package org.apache.flink.runtime.rpc.akka;

import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.FunctionWithException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.class */
/**
 * Tests how RPC responses are treated with respect to the configured frame size.
 *
 * <p>Two remote RPC services are started with a frame size of {@link #FRAMESIZE} bytes. The tests
 * expect that a response obtained through a different RPC service fails with an {@link
 * AkkaRpcException} mentioning the frame size when the payload is oversized, while the same
 * payload requested through the RPC service that hosts the endpoint is returned unchanged.
 */
class AkkaRpcActorOversizedResponseMessageTest {

    /** Frame size, in bytes, configured for both RPC services. */
    private static final int FRAMESIZE = 32000;

    /** Response built from {@link #FRAMESIZE} zero bytes; used as the "too large" payload. */
    private static final String OVERSIZED_PAYLOAD = new String(new byte[FRAMESIZE]);

    /** Small response used by the tests that expect a successful remote round trip. */
    private static final String PAYLOAD = "Hello";

    /** RPC service that hosts the {@link MessageRpcEndpoint} in every test. */
    private static RpcService rpcService1;

    /** Second RPC service, used to connect to the endpoint from another service. */
    private static RpcService rpcService2;

    /** Gateway exposing the endpoint's message through a future and through a direct return. */
    public interface MessageRpcGateway extends RpcGateway {

        /** Returns the endpoint's message wrapped in a future. */
        CompletableFuture<String> messageAsync();

        /**
         * Returns the endpoint's message directly.
         *
         * @throws RpcException declared so that RPC failures can be reported to the caller
         */
        String messageSync() throws RpcException;
    }

    /** Endpoint that answers every request with the message given at construction time. */
    public static class MessageRpcEndpoint extends RpcEndpoint implements MessageRpcGateway {

        @Nonnull
        private final String message;

        MessageRpcEndpoint(RpcService rpcService, @Nonnull String str) {
            super(rpcService);
            this.message = str;
        }

        @Override
        public CompletableFuture<String> messageAsync() {
            return CompletableFuture.completedFuture(message);
        }

        @Override
        public String messageSync() throws RpcException {
            return message;
        }
    }

    /** Starts the two RPC services shared by all tests. */
    @BeforeAll
    static void setupClass() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(AkkaOptions.FORCE_RPC_INVOCATION_SERIALIZATION, false);
        // Derive the configured value from the constant so that the assertions on the frame size
        // can never drift from what the services were actually started with.
        configuration.setString(AkkaOptions.FRAMESIZE, FRAMESIZE + " b");

        rpcService1 =
                AkkaRpcServiceUtils.remoteServiceBuilder(configuration, "localhost", 0)
                        .createAndStart();
        rpcService2 =
                AkkaRpcServiceUtils.remoteServiceBuilder(configuration, "localhost", 0)
                        .createAndStart();
    }

    /** Shuts down both RPC services. */
    @AfterAll
    static void teardownClass() throws Exception {
        RpcUtils.terminateRpcService(new RpcService[] {rpcService1, rpcService2});
    }

    @Test
    void testOverSizedResponseMsgAsync() throws Exception {
        // The call under test must run inside the lambda; otherwise nothing can be thrown and
        // assertThatThrownBy fails unconditionally.
        Assertions.assertThatThrownBy(
                        () ->
                                runRemoteMessageResponseTest(
                                        OVERSIZED_PAYLOAD, this::requestMessageAsync))
                .hasCauseInstanceOf(AkkaRpcException.class)
                .extracting(ExceptionUtils::stripExecutionException)
                .isInstanceOf(AkkaRpcException.class)
                .extracting(Throwable::getMessage)
                .satisfies(
                        message ->
                                Assertions.assertThat(message)
                                        .contains(String.valueOf(FRAMESIZE)));
    }

    @Test
    void testNormalSizedResponseMsgAsync() throws Exception {
        String response = runRemoteMessageResponseTest(PAYLOAD, this::requestMessageAsync);

        Assertions.assertThat(response).isEqualTo(PAYLOAD);
    }

    @Test
    void testNormalSizedResponseMsgSync() throws Exception {
        String response = runRemoteMessageResponseTest(PAYLOAD, MessageRpcGateway::messageSync);

        Assertions.assertThat(response).isEqualTo(PAYLOAD);
    }

    @Test
    void testOverSizedResponseMsgSync() throws Exception {
        // As in the async variant, the remote call has to happen inside the lambda.
        Assertions.assertThatThrownBy(
                        () ->
                                runRemoteMessageResponseTest(
                                        OVERSIZED_PAYLOAD, MessageRpcGateway::messageSync))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                AkkaRpcException.class, String.valueOf(FRAMESIZE)));
    }

    @Test
    void testLocalOverSizedResponseMsgSync() throws Exception {
        String response =
                runLocalMessageResponseTest(OVERSIZED_PAYLOAD, MessageRpcGateway::messageSync);

        Assertions.assertThat(response).isEqualTo(OVERSIZED_PAYLOAD);
    }

    @Test
    void testLocalOverSizedResponseMsgAsync() throws Exception {
        String response = runLocalMessageResponseTest(OVERSIZED_PAYLOAD, this::requestMessageAsync);

        Assertions.assertThat(response).isEqualTo(OVERSIZED_PAYLOAD);
    }

    /** Requests the message through the future-returning method and waits for the result. */
    private String requestMessageAsync(MessageRpcGateway messageRpcGateway) throws Exception {
        return messageRpcGateway.messageAsync().get();
    }

    /**
     * Hosts an endpoint on {@code rpcService1} and invokes {@code rpcCall} on a gateway obtained
     * through {@code rpcService2}.
     *
     * @param payload message the endpoint responds with
     * @param rpcCall call to perform on the connected gateway
     * @return whatever {@code rpcCall} returns
     */
    private <T> T runRemoteMessageResponseTest(
            String payload, FunctionWithException<MessageRpcGateway, T, Exception> rpcCall)
            throws Exception {
        return runMessageResponseTest(rpcService2, payload, rpcCall);
    }

    /**
     * Hosts an endpoint on {@code rpcService1} and invokes {@code rpcCall} on a gateway obtained
     * through that same service.
     *
     * @param payload message the endpoint responds with
     * @param rpcCall call to perform on the connected gateway
     * @return whatever {@code rpcCall} returns
     */
    private <T> T runLocalMessageResponseTest(
            String payload, FunctionWithException<MessageRpcGateway, T, Exception> rpcCall)
            throws Exception {
        return runMessageResponseTest(rpcService1, payload, rpcCall);
    }

    /**
     * Starts a {@link MessageRpcEndpoint} on {@code rpcService1}, connects to it through {@code
     * clientService}, applies {@code rpcCall} and always terminates the endpoint afterwards.
     *
     * @param clientService RPC service used to connect to the endpoint
     * @param payload message the endpoint responds with
     * @param rpcCall call to perform on the connected gateway
     * @return whatever {@code rpcCall} returns
     */
    private <T> T runMessageResponseTest(
            RpcService clientService,
            String payload,
            FunctionWithException<MessageRpcGateway, T, Exception> rpcCall)
            throws Exception {
        MessageRpcEndpoint rpcEndpoint = new MessageRpcEndpoint(rpcService1, payload);

        try {
            rpcEndpoint.start();

            MessageRpcGateway rpcGateway =
                    clientService.connect(rpcEndpoint.getAddress(), MessageRpcGateway.class).get();

            return rpcCall.apply(rpcGateway);
        } finally {
            // Runs on success and on failure, so the endpoint never outlives the test.
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {rpcEndpoint});
        }
    }
}
