package io.hops.hadoop.shaded.org.apache.zookeeper.server;

import io.hops.hadoop.shaded.io.netty.channel.Channel;
import io.hops.hadoop.shaded.io.netty.channel.ChannelFuture;
import io.hops.hadoop.shaded.io.netty.channel.ChannelHandlerContext;
import io.hops.hadoop.shaded.io.netty.channel.ChannelId;
import io.hops.hadoop.shaded.io.netty.channel.ChannelPipeline;
import io.hops.hadoop.shaded.io.netty.util.Attribute;
import io.hops.hadoop.shaded.io.netty.util.AttributeKey;
import io.hops.hadoop.shaded.org.apache.zookeeper.AsyncCallback;
import io.hops.hadoop.shaded.org.apache.zookeeper.CreateMode;
import io.hops.hadoop.shaded.org.apache.zookeeper.KeeperException;
import io.hops.hadoop.shaded.org.apache.zookeeper.TestableZooKeeper;
import io.hops.hadoop.shaded.org.apache.zookeeper.Watcher;
import io.hops.hadoop.shaded.org.apache.zookeeper.ZooDefs;
import io.hops.hadoop.shaded.org.apache.zookeeper.common.ClientX509Util;
import io.hops.hadoop.shaded.org.apache.zookeeper.data.Stat;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.ServerStats;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.BufferStats;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
import io.hops.hadoop.shaded.org.apache.zookeeper.test.ClientBase;
import io.hops.hadoop.shaded.org.apache.zookeeper.test.SSLAuthTest;
import io.hops.hadoop.shaded.org.apache.zookeeper.test.TestByteBufAllocator;
import io.hops.hadoop.shaded.org.hamcrest.Matchers;
import io.hops.hadoop.shaded.org.mockito.Mockito;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ProtocolException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.cli.CLITestHelper;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link NettyServerCnxn} run against a server whose connection factory is forced to
 * {@link NettyServerCnxnFactory}.
 *
 * <p>{@link #setUp()} installs {@link TestByteBufAllocator} as the factory's allocator and
 * {@link #tearDown()} asks it to check for leaks, so every test in this class must release the
 * clients it creates. All clients are therefore opened in try-with-resources blocks.
 */
public class NettyServerCnxnTest extends ClientBase {
    private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxnTest.class);

    /** Znode used by the throttling stress test. */
    private static final String THROTTLING_PATH = "/testEnableDisableThrottling";

    /** Number of asynchronous reads issued by the throttling stress test. */
    private static final int TOTAL_REQUESTS = 100000;

    /**
     * Selects the Netty connection factory, installs the test allocator and configures the base
     * class (one allowed connection, exceptions on failed connects) before starting the server.
     */
    @Override
    public void setUp() throws Exception {
        System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, "io.hops.hadoop.shaded.org.apache.zookeeper.server.NettyServerCnxnFactory");
        NettyServerCnxnFactory.setTestAllocator(TestByteBufAllocator.getInstance());
        this.maxCnxns = 1;
        this.exceptionOnFailedConnect = true;
        super.setUp();
    }

    /**
     * Shuts the base fixture down, then removes the test allocator and checks it for leaks.
     * The allocator is cleared even when the base tear-down fails so that the static test
     * allocator does not stay installed for whatever runs next in the same JVM.
     */
    @Override
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            NettyServerCnxnFactory.clearTestAllocator();
        }
        TestByteBufAllocator.checkForLeaks();
    }

    /**
     * Sends a close-session to every server-side connection and verifies that the connection
     * count drops to zero and that the watch registered by the client is gone afterwards.
     */
    @Test(timeout = 40000)
    public void testSendCloseSession() throws Exception {
        Assert.assertTrue("Didn't instantiate ServerCnxnFactory with NettyServerCnxnFactory!", serverFactory instanceof NettyServerCnxnFactory);
        final ZooKeeperServer zkServer = serverFactory.getZooKeeperServer();
        try (TestableZooKeeper zk = createClient()) {
            // Make sure the client works, then register a watch on the new znode.
            zk.create("/a", CLITestHelper.TESTMODE_TEST.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Assert.assertNotNull("Didn't create znode:/a", zk.exists("/a", true));
            Assert.assertEquals(1L, zkServer.getZKDatabase().getDataTree().getWatchCount());

            Iterable<ServerCnxn> connections = serverFactory.getConnections();
            Assert.assertEquals("Mismatch in number of live connections!", 1L, serverFactory.getNumAliveConnections());
            for (ServerCnxn cnxn : connections) {
                cnxn.sendCloseSession();
            }

            LOG.info("Waiting for the channel disconnected event");
            int waitedMs = 0;
            while (serverFactory.getNumAliveConnections() != 0) {
                Thread.sleep(1000L);
                waitedMs += 1000;
                if (waitedMs > CONNECTION_TIMEOUT) {
                    Assert.fail("The number of live connections should be 0");
                }
            }

            // The watch must have been dropped together with the connection.
            Assert.assertEquals(0L, zkServer.getZKDatabase().getDataTree().getWatchCount());
        }
    }

    /**
     * Opens two clients while {@code maxCnxns} is 1; the test passes only if a
     * {@link ProtocolException} is thrown (see the {@code expected} attribute).
     */
    @Test(timeout = 40000, expected = ProtocolException.class)
    public void testMaxConnectionPerIpSurpased() throws Exception {
        Assert.assertTrue("Did not instantiate ServerCnxnFactory with NettyServerCnxnFactory!", serverFactory instanceof NettyServerCnxnFactory);
        try (TestableZooKeeper first = createClient(); TestableZooKeeper second = createClient()) {
            // Nothing to do: creating the clients is the whole test.
        }
    }

    /**
     * Verifies that the client-response buffer stats start at -1 and become positive once a
     * request has been answered.
     */
    @Test
    public void testClientResponseStatsUpdate() throws IOException, InterruptedException, KeeperException {
        try (TestableZooKeeper zk = createClient()) {
            BufferStats responseStats = serverFactory.getZooKeeperServer().serverStats().getClientResponseStats();
            Assert.assertThat("Last client response size should be initialized with INIT_VALUE", responseStats.getLastBufferSize(), Matchers.equalTo(-1));

            zk.create("/a", CLITestHelper.TESTMODE_TEST.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            Assert.assertThat("Last client response size should be greater than 0 after client request was performed", responseStats.getLastBufferSize(), Matchers.greaterThan(0));
            Assert.assertArrayEquals("unexpected data", CLITestHelper.TESTMODE_TEST.getBytes(StandardCharsets.UTF_8), zk.getData("/a", null, null));
        }
    }

    /**
     * Checks the non-mTLS connection counters after one local client has connected.
     */
    @Test
    public void testNonMTLSLocalConn() throws IOException, InterruptedException, KeeperException {
        try (TestableZooKeeper zk = createClient()) {
            ServerStats serverStats = serverFactory.getZooKeeperServer().serverStats();
            // NOTE(review): 2 presumably counts this client plus a connection made by the
            // ClientBase fixture during start-up - confirm against ClientBase.
            Assert.assertEquals(2L, serverStats.getNonMTLSLocalConnCount());
            Assert.assertEquals(0L, serverStats.getNonMTLSRemoteConnCount());
        }
    }

    /**
     * Drives {@code channelActive} on a fresh factory with a mocked channel whose remote address
     * is {@code new InetSocketAddress(0)} and expects it to be counted as one remote, zero local
     * non-mTLS connections.
     */
    @Test
    @SuppressWarnings("unchecked") // generic Attribute mock
    public void testNonMTLSRemoteConn() throws Exception {
        Channel channel = Mockito.mock(Channel.class);
        ChannelId channelId = Mockito.mock(ChannelId.class);
        ChannelFuture closeFuture = Mockito.mock(ChannelFuture.class);
        ChannelHandlerContext context = Mockito.mock(ChannelHandlerContext.class);
        ChannelPipeline pipeline = Mockito.mock(ChannelPipeline.class);

        Mockito.when(context.channel()).thenReturn(channel);
        Mockito.when(channel.pipeline()).thenReturn(pipeline);
        Mockito.when(closeFuture.channel()).thenReturn(channel);
        Mockito.when(channel.closeFuture()).thenReturn(closeFuture);
        Mockito.when(channel.remoteAddress()).thenReturn(new InetSocketAddress(0));
        Mockito.when(channel.id()).thenReturn(channelId);

        NettyServerCnxnFactory factory = new NettyServerCnxnFactory();
        LeaderZooKeeperServer zkServer = Mockito.mock(LeaderZooKeeperServer.class);
        factory.setZooKeeperServer(zkServer);

        Attribute<Object> attribute = Mockito.mock(Attribute.class);
        Mockito.doReturn(attribute).when(channel).attr(Mockito.<AttributeKey<Object>>any());
        Mockito.doNothing().when(attribute).set(Mockito.any());
        Mockito.when(zkServer.serverStats()).thenReturn(new ServerStats(Mockito.mock(ServerStats.Provider.class)));

        factory.channelHandler.channelActive(context);

        Assert.assertEquals(0L, zkServer.serverStats().getNonMTLSLocalConnCount());
        Assert.assertEquals(1L, zkServer.serverStats().getNonMTLSRemoteConnCount());
    }

    /**
     * Verifies that reads still complete when receiving is disabled on every server connection
     * and re-enabled later from the channel's event loop.
     */
    @Test
    public void testServerSideThrottling() throws IOException, InterruptedException, KeeperException {
        try (TestableZooKeeper zk = createClient()) {
            BufferStats responseStats = serverFactory.getZooKeeperServer().serverStats().getClientResponseStats();
            Assert.assertThat("Last client response size should be initialized with INIT_VALUE", responseStats.getLastBufferSize(), Matchers.equalTo(-1));

            zk.create("/a", CLITestHelper.TESTMODE_TEST.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            Assert.assertThat("Last client response size should be greater than 0 after client request was performed", responseStats.getLastBufferSize(), Matchers.greaterThan(0));

            // Round 1: disable receiving, trigger an explicit channel read after 1s while still
            // disabled, and re-enable after 2s.
            for (ServerCnxn cnxn : serverFactory.cnxns) {
                final NettyServerCnxn nettyCnxn = (NettyServerCnxn) cnxn;
                nettyCnxn.disableRecv();
                nettyCnxn.getChannel().eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        nettyCnxn.getChannel().read();
                    }
                }, 1L, TimeUnit.SECONDS);
                nettyCnxn.getChannel().eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        nettyCnxn.enableRecv();
                    }
                }, 2L, TimeUnit.SECONDS);
            }
            Assert.assertArrayEquals("unexpected data", CLITestHelper.TESTMODE_TEST.getBytes(StandardCharsets.UTF_8), zk.getData("/a", null, null));

            // Round 2: disable receiving and simply re-enable after 2s, with no forced read.
            for (ServerCnxn cnxn : serverFactory.cnxns) {
                final NettyServerCnxn nettyCnxn = (NettyServerCnxn) cnxn;
                nettyCnxn.disableRecv();
                nettyCnxn.getChannel().eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        nettyCnxn.enableRecv();
                    }
                }, 2L, TimeUnit.SECONDS);
            }
            Assert.assertArrayEquals("unexpected data", CLITestHelper.TESTMODE_TEST.getBytes(StandardCharsets.UTF_8), zk.getData("/a", null, null));
        }
    }

    @Test
    public void testEnableDisableThrottling_secure_random() throws Exception {
        runEnableDisableThrottling(true, true);
    }

    @Test
    public void testEnableDisableThrottling_secure_sequentially() throws Exception {
        runEnableDisableThrottling(true, false);
    }

    @Test
    public void testEnableDisableThrottling_nonSecure_random() throws Exception {
        runEnableDisableThrottling(false, true);
    }

    @Test
    public void testEnableDisableThrottling_nonSecure_sequentially() throws Exception {
        runEnableDisableThrottling(false, false);
    }

    /**
     * Issues {@link #TOTAL_REQUESTS} asynchronous reads while a background thread keeps toggling
     * receive on every server connection, and asserts that all of them succeed.
     *
     * @param secure              when true, the secure settings from {@link SSLAuthTest} are set
     *                            up (and cleared afterwards) and the factory is marked secure
     * @param randomDisableEnable when true, the toggler randomly enables or disables each
     *                            connection every 10 ms; otherwise it disables and re-enables
     *                            each connection in turn
     * @throws Exception if the client cannot be created or a wait is interrupted
     */
    private void runEnableDisableThrottling(boolean secure, boolean randomDisableEnable) throws Exception {
        ClientX509Util x509Util = null;
        if (secure) {
            x509Util = SSLAuthTest.setUpSecure();
        }
        try {
            NettyServerCnxnFactory factory = (NettyServerCnxnFactory) serverFactory;
            factory.setAdvancedFlowControlEnabled(true);
            if (secure) {
                factory.setSecure(true);
            }

            try (final TestableZooKeeper zk = createClient()) {
                zk.create(THROTTLING_PATH, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

                final AtomicBoolean stopped = new AtomicBoolean(false);
                final AtomicInteger successResponses = new AtomicInteger();
                final CountDownLatch responsesLeft = new CountDownLatch(TOTAL_REQUESTS);

                Thread toggler = randomDisableEnable ? newRandomToggler(stopped) : newSequentialToggler(stopped);
                toggler.start();
                LOG.info("started thread to enable and disable recv");

                try {
                    new Thread() {
                        @Override
                        public void run() {
                            for (int sent = 0; sent < TOTAL_REQUESTS; sent++) {
                                zk.getData(THROTTLING_PATH, (Watcher) null, new AsyncCallback.DataCallback() {
                                    @Override
                                    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                                        if (rc == 0) {
                                            successResponses.addAndGet(1);
                                        } else {
                                            LOG.info("failed response is {}", Integer.valueOf(rc));
                                        }
                                        responsesLeft.countDown();
                                    }
                                }, (Object) null);
                            }
                        }
                    }.start();
                    LOG.info("started thread to issue {} async requests", TOTAL_REQUESTS);

                    Assert.assertTrue(responsesLeft.await(60L, TimeUnit.SECONDS));
                    LOG.info("received all {} responses", TOTAL_REQUESTS);
                } finally {
                    // Always stop the toggler, otherwise a failed wait leaves it spinning
                    // for the rest of the JVM's life.
                    stopped.set(true);
                    toggler.join();
                }
                LOG.info("enable and disable recv thread exited");

                LOG.info("waiting another 1s for the requests to go through");
                Thread.sleep(1000L);
                Assert.assertEquals(TOTAL_REQUESTS, successResponses.get());
            }
        } finally {
            if (secure) {
                SSLAuthTest.clearSecureSetting(x509Util);
            }
        }
    }

    /**
     * Builds a thread that, until {@code stopped} is set, randomly disables or enables receive on
     * every server connection every 10 ms, and enables receive on all of them before exiting.
     */
    private Thread newRandomToggler(final AtomicBoolean stopped) {
        final Random random = new Random();
        return new Thread() {
            @Override
            public void run() {
                while (!stopped.get()) {
                    for (ServerCnxn cnxn : serverFactory.cnxns) {
                        if (random.nextBoolean()) {
                            cnxn.disableRecv();
                        } else {
                            cnxn.enableRecv();
                        }
                    }
                    try {
                        Thread.sleep(10L);
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: the loop only ends through the stop flag.
                    }
                }
                // Leave every connection readable so outstanding requests can drain.
                for (ServerCnxn cnxn : serverFactory.cnxns) {
                    cnxn.enableRecv();
                }
            }
        };
    }

    /**
     * Builds a thread that, until {@code stopped} is set, walks the server connections and for
     * each one disables receive, waits 10 ms, enables it again and waits another 10 ms.
     */
    private Thread newSequentialToggler(final AtomicBoolean stopped) {
        return new Thread() {
            @Override
            public void run() {
                while (!stopped.get()) {
                    for (ServerCnxn cnxn : serverFactory.cnxns) {
                        try {
                            cnxn.disableRecv();
                            Thread.sleep(10L);
                            cnxn.enableRecv();
                            Thread.sleep(10L);
                        } catch (InterruptedException ignored) {
                            // Deliberately ignored: the loop only ends through the stop flag.
                        }
                    }
                }
            }
        };
    }
}
