package org.apache.flink.runtime.io.network.netty;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.util.NetUtils;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.class */
/**
 * Tests which outbound Netty messages a {@link NettyPartitionRequestClient} writes to its channel
 * when a {@link RemoteInputChannel} requests a subpartition, has its request re-triggered, resumes
 * consumption, or acknowledges that all records were processed.
 *
 * <p>Each test wires the client to an {@link EmbeddedChannel} and inspects that channel's outbound
 * messages via {@code readOutbound()}.
 */
public class NettyPartitionRequestClientTest {

    /** Upper bound, in milliseconds, on how long a test keeps running scheduled channel tasks. */
    private static final long SCHEDULED_TASKS_TIMEOUT_MILLIS = 30_000L;

    /**
     * Credit every {@code PartitionRequest} observed in these tests is expected to carry.
     * NOTE(review): presumably the number of exclusive buffers the test gate assigns per channel
     * -- confirm against {@code InputChannelTestUtils}.
     */
    private static final long EXPECTED_CREDIT = 2L;

    /**
     * Verifies that the initial request and each of two re-triggered requests produce exactly one
     * {@code PartitionRequest} for the same input channel, and that nothing else is sent.
     */
    @Test
    public void testRetriggerPartitionRequest() throws Exception {
        // Fix the deadline up front so both re-trigger rounds share one overall time budget.
        final long deadlineMillis = System.currentTimeMillis() + SCHEDULED_TASKS_TIMEOUT_MILLIS;

        final CreditBasedPartitionRequestClientHandler handler =
                new CreditBasedPartitionRequestClientHandler();
        final EmbeddedChannel channel = new EmbeddedChannel(handler);
        final PartitionRequestClient client = createPartitionRequestClient(channel, handler);

        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        final SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        final RemoteInputChannel inputChannel =
                InputChannelBuilder.newBuilder()
                        .setConnectionManager(
                                InputChannelTestUtils
                                        .mockConnectionManagerWithPartitionRequestClient(client))
                        .setInitialBackoff(1)
                        .setMaxBackoff(2)
                        .buildRemoteChannel(inputGate);

        try {
            inputGate.setInputChannels(inputChannel);
            inputGate.setBufferPool(networkBufferPool.createBufferPool(6, 6));
            inputGate.setupChannels();
            inputChannel.requestSubpartition(0);

            // First request.
            Assert.assertTrue(channel.isWritable());
            assertPartitionRequest(channel.readOutbound(), inputChannel);

            // First re-trigger: the request is only visible after scheduled tasks have run.
            inputGate.retriggerPartitionRequest(inputChannel.getPartitionId().getPartitionId());
            runAllScheduledPendingTasks(channel, deadlineMillis);
            assertPartitionRequest(channel.readOutbound(), inputChannel);

            // Second re-trigger.
            inputGate.retriggerPartitionRequest(inputChannel.getPartitionId().getPartitionId());
            runAllScheduledPendingTasks(channel, deadlineMillis);
            assertPartitionRequest(channel.readOutbound(), inputChannel);

            // Nothing beyond the three requests may have been written.
            Assert.assertNull(channel.readOutbound());
        } finally {
            releaseResources(inputGate, networkBufferPool);
        }
    }

    /**
     * Verifies that after gate setup followed by an explicit {@code requestSubpartition(0)} the
     * channel holds exactly one {@code PartitionRequest}.
     */
    @Test
    public void testDoublePartitionRequest() throws Exception {
        final CreditBasedPartitionRequestClientHandler handler =
                new CreditBasedPartitionRequestClientHandler();
        final EmbeddedChannel channel = new EmbeddedChannel(handler);
        final PartitionRequestClient client = createPartitionRequestClient(channel, handler);

        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        final SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        final RemoteInputChannel inputChannel =
                InputChannelTestUtils.createRemoteInputChannel(inputGate, client);

        try {
            inputGate.setInputChannels(inputChannel);
            inputGate.setBufferPool(networkBufferPool.createBufferPool(6, 6));
            inputGate.setupChannels();
            inputChannel.requestSubpartition(0);

            // Only a single partition request may have been sent.
            Assert.assertTrue(channel.isWritable());
            assertPartitionRequest(channel.readOutbound(), inputChannel);
            Assert.assertNull(channel.readOutbound());
        } finally {
            releaseResources(inputGate, networkBufferPool);
        }
    }

    /**
     * Verifies that {@code resumeConsumption()} sends a {@code ResumeConsumption} message carrying
     * the input channel's id, directly after the initial {@code PartitionRequest}.
     */
    @Test
    public void testResumeConsumption() throws Exception {
        final CreditBasedPartitionRequestClientHandler handler =
                new CreditBasedPartitionRequestClientHandler();
        final EmbeddedChannel channel = new EmbeddedChannel(handler);
        final PartitionRequestClient client = createPartitionRequestClient(channel, handler);

        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        final SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        final RemoteInputChannel inputChannel =
                InputChannelTestUtils.createRemoteInputChannel(inputGate, client);

        try {
            inputGate.setBufferPool(networkBufferPool.createBufferPool(6, 6));
            inputGate.setupChannels();
            inputChannel.requestSubpartition(0);

            inputChannel.resumeConsumption();
            channel.runPendingTasks();

            final Object firstMessage = channel.readOutbound();
            Assert.assertThat(
                    firstMessage, Matchers.instanceOf(NettyMessage.PartitionRequest.class));

            final Object secondMessage = channel.readOutbound();
            Assert.assertThat(
                    secondMessage, Matchers.instanceOf(NettyMessage.ResumeConsumption.class));
            Assert.assertEquals(
                    inputChannel.getInputChannelId(),
                    ((NettyMessage.ResumeConsumption) secondMessage).receiverId);

            Assert.assertNull(channel.readOutbound());
        } finally {
            releaseResources(inputGate, networkBufferPool);
        }
    }

    /**
     * Verifies that {@code acknowledgeAllRecordsProcessed()} sends an
     * {@code AckAllUserRecordsProcessed} message carrying the input channel's id, directly after
     * the initial {@code PartitionRequest}.
     */
    @Test
    public void testAcknowledgeAllRecordsProcessed() throws Exception {
        final CreditBasedPartitionRequestClientHandler handler =
                new CreditBasedPartitionRequestClientHandler();
        final EmbeddedChannel channel = new EmbeddedChannel(handler);
        final PartitionRequestClient client = createPartitionRequestClient(channel, handler);

        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        final SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        final RemoteInputChannel inputChannel =
                InputChannelTestUtils.createRemoteInputChannel(inputGate, client);

        try {
            inputGate.setBufferPool(networkBufferPool.createBufferPool(6, 6));
            inputGate.setupChannels();
            inputChannel.requestSubpartition(0);

            inputChannel.acknowledgeAllRecordsProcessed();
            channel.runPendingTasks();

            final Object firstMessage = channel.readOutbound();
            Assert.assertThat(
                    firstMessage, Matchers.instanceOf(NettyMessage.PartitionRequest.class));

            final Object secondMessage = channel.readOutbound();
            Assert.assertThat(
                    secondMessage,
                    Matchers.instanceOf(NettyMessage.AckAllUserRecordsProcessed.class));
            Assert.assertEquals(
                    inputChannel.getInputChannelId(),
                    ((NettyMessage.AckAllUserRecordsProcessed) secondMessage).receiverId);

            Assert.assertNull(channel.readOutbound());
        } finally {
            releaseResources(inputGate, networkBufferPool);
        }
    }

    /**
     * Builds a {@link NettyPartitionRequestClient} on top of the given channel and handler.
     *
     * <p>The same free port number is used for the {@link ConnectionID} address and for the
     * {@link NettyConfig} handed to the client factory.
     *
     * @param channel channel the client writes its messages to
     * @param networkClientHandler client handler passed to the client
     * @return the newly created client
     * @throws Exception if the local host address cannot be resolved or client creation fails
     */
    private NettyPartitionRequestClient createPartitionRequestClient(Channel channel, NetworkClientHandler networkClientHandler) throws Exception {
        final int port = NetUtils.getAvailablePort();
        final ConnectionID connectionId =
                new ConnectionID(new InetSocketAddress("localhost", port), 0);
        final NettyConfig nettyConfig =
                new NettyConfig(InetAddress.getLocalHost(), port, 1024, 1, new Configuration());
        final PartitionRequestClientFactory clientFactory =
                new PartitionRequestClientFactory(new NettyClient(nettyConfig));
        return new NettyPartitionRequestClient(
                channel, networkClientHandler, connectionId, clientFactory);
    }

    /**
     * Runs the channel's scheduled tasks over and over, sleeping 1 ms between rounds, until
     * {@code runScheduledPendingTasks()} returns {@code -1} or the deadline is reached.
     *
     * <p>Reaching the deadline is not reported as an error; the method simply returns and leaves
     * it to the caller's assertions to notice missing messages.
     *
     * @param embeddedChannel channel whose scheduled tasks should be run
     * @param deadlineMillis absolute time, on the {@link System#currentTimeMillis()} clock, after
     *     which the loop gives up
     * @throws InterruptedException if the thread is interrupted while sleeping between rounds
     */
    void runAllScheduledPendingTasks(EmbeddedChannel embeddedChannel, long deadlineMillis) throws InterruptedException {
        while (embeddedChannel.runScheduledPendingTasks() != -1
                && System.currentTimeMillis() < deadlineMillis) {
            Thread.sleep(1L);
        }
    }

    /**
     * Asserts that {@code message} is a {@code PartitionRequest} addressed to the given input
     * channel and carrying {@link #EXPECTED_CREDIT}.
     */
    private static void assertPartitionRequest(Object message, RemoteInputChannel inputChannel) {
        Assert.assertThat(message, Matchers.instanceOf(NettyMessage.PartitionRequest.class));
        final NettyMessage.PartitionRequest request = (NettyMessage.PartitionRequest) message;
        Assert.assertEquals(inputChannel.getInputChannelId(), request.receiverId);
        Assert.assertEquals(EXPECTED_CREDIT, request.credit);
    }

    /** Closes the input gate, then destroys every buffer pool and the network buffer pool. */
    private static void releaseResources(SingleInputGate inputGate, NetworkBufferPool networkBufferPool) throws Exception {
        inputGate.close();
        networkBufferPool.destroyAllBufferPools();
        networkBufferPool.destroy();
    }
}
