/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hadoop.shaded.org.apache.zookeeper.server;

import io.hops.hadoop.shaded.org.apache.zookeeper.AsyncCallback;
import io.hops.hadoop.shaded.org.apache.zookeeper.CreateMode;
import io.hops.hadoop.shaded.org.apache.zookeeper.KeeperException;
import io.hops.hadoop.shaded.org.apache.zookeeper.PortAssignment;
import io.hops.hadoop.shaded.org.apache.zookeeper.ZKTestCase;
import io.hops.hadoop.shaded.org.apache.zookeeper.ZooDefs;
import io.hops.hadoop.shaded.org.apache.zookeeper.ZooKeeper;
import io.hops.hadoop.shaded.org.apache.zookeeper.metrics.MetricsUtils;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.FinalRequestProcessor;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.PrepRequestProcessor;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.Request;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.RequestProcessor;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.RequestThrottler;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.ServerCnxn;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.ServerCnxnFactory;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.ServerMetrics;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.SyncRequestProcessor;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.ZooKeeperServer;
import io.hops.hadoop.shaded.org.apache.zookeeper.test.ClientBase;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RequestThrottlerTest
extends ZKTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(RequestThrottlerTest.class);

    /** Loopback address of the server under test; the port is obtained from PortAssignment.unique(). */
    private static final String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();

    /** Value stored in the "zookeeper.globalOutstandingLimit" system property by the global-limit test. */
    private static final String GLOBAL_OUTSTANDING_LIMIT = "1";

    /** Matches the number of create requests issued by the throttling tests. */
    private static final int TOTAL_REQUESTS = 5;

    /** Matches the stall time the tests configure on RequestThrottler (presumably milliseconds - confirm). */
    private static final int STALL_TIME = 5000;

    /*
     * The latches and the counter below are assigned by the test method but are also touched by
     * the server hooks (TestPrepRequestProcessor / TestZooKeeperServer) and by the create callback,
     * which presumably run on server and client threads. They are volatile so that a freshly
     * installed latch, or an updated count, is visible to whichever thread reads it next.
     */

    /** Gate awaited in TestPrepRequestProcessor.pRequest; counting it down lets held requests proceed. */
    volatile CountDownLatch resumeProcess = null;

    /** Counted down once per call to TestZooKeeperServer.submitRequest. */
    volatile CountDownLatch submitted = null;

    /** Counted down in TestPrepRequestProcessor.pRequest after the gate wait, before delegating to super. */
    volatile CountDownLatch entered = null;

    /** Counted down once per call to TestZooKeeperServer.requestFinished. */
    volatile CountDownLatch finished = null;

    /** Counted down by the create callback each time it observes CONNECTIONLOSS. */
    volatile CountDownLatch disconnected = null;

    /** Server under test, created in setup(). */
    ZooKeeperServer zks = null;

    /** Connection factory the server is started on, created in setup(). */
    ServerCnxnFactory f = null;

    /** Client connected to HOSTPORT, created in setup(). */
    ZooKeeper zk = null;

    /** Number of CONNECTIONLOSS results seen by the create callback. */
    volatile int connectionLossCount = 0;

    /**
     * Starts a {@link TestZooKeeperServer} on the port encoded in HOSTPORT, waits for it to
     * report as up, clears the resume/submitted latches and connects a client.
     *
     * @throws Exception if the server or the client cannot be started
     */
    @Before
    public void setup() throws Exception {
        // The same temporary directory is used for both the snapshot and the log location.
        final File dataDir = ClientBase.createTmpDir();
        ClientBase.setupTestEnv();
        zks = new TestZooKeeperServer(dataDir, dataDir, 3000);

        final String portText = HOSTPORT.split(":")[1];
        f = ServerCnxnFactory.createFactory(Integer.parseInt(portText), -1);
        f.startup(zks);
        LOG.info("starting up the zookeeper server .. waiting");

        final boolean serverUp = ClientBase.waitForServerUp(HOSTPORT, ClientBase.CONNECTION_TIMEOUT);
        Assert.assertTrue("waiting for server being up", serverUp);

        // Latches are reset before the client connects so the server hooks do not act on them
        // while the connection is being established.
        resumeProcess = null;
        submitted = null;
        zk = ClientBase.createZKClient(HOSTPORT);
    }

    /**
     * Releases the client, the connection factory and the server, in that order.
     *
     * <p>Each step runs inside a {@code finally} so that a failure while closing the client (or
     * shutting down the factory) no longer prevents the remaining resources from being released.
     * If more than one step throws, the exception from the last failing step is the one that
     * propagates.
     *
     * @throws Exception if any of the shutdown steps fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (null != this.zk) {
                this.zk.close();
            }
        } finally {
            try {
                if (null != this.f) {
                    this.f.shutdown();
                }
            } finally {
                if (null != this.zks) {
                    this.zks.shutdown();
                }
            }
        }
    }

    /**
     * Issues TOTAL_REQUESTS asynchronous creates with the throttler capped at two requests and
     * stale-request dropping off, then checks the "prep_processor_request_queued" and
     * "request_throttle_wait_count" metrics before and after the prep processor gate is opened.
     *
     * @throws Exception if the test thread is interrupted or the client call fails
     */
    @Test
    public void testRequestThrottler() throws Exception {
        ServerMetrics.getMetrics().resetAll();

        RequestThrottler.setMaxRequests(2);
        RequestThrottler.setStallTime(STALL_TIME);
        RequestThrottler.setDropStaleRequests(false);

        // Install the gate and the counters before any request is sent.
        resumeProcess = new CountDownLatch(1);
        submitted = new CountDownLatch(TOTAL_REQUESTS);
        entered = new CountDownLatch(TOTAL_REQUESTS);

        final AsyncCallback.StringCallback ignoreResult = (rc, path, ctx, name) -> {};
        for (int n = 0; n < TOTAL_REQUESTS; n++) {
            final String node = "/request_throttle_test- " + n;
            zk.create(node, node.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, ignoreResult, null);
        }

        // Best-effort wait: the result of await() is intentionally not asserted.
        submitted.await(5L, TimeUnit.SECONDS);

        // While the gate is closed, the queued count is expected to equal the configured maximum.
        Map<String, Object> snapshot = MetricsUtils.currentServerMetrics();
        final long queuedWhileHeld = (Long) snapshot.get("prep_processor_request_queued");
        Assert.assertEquals(2L, queuedWhileHeld);
        final long throttleWaits = (Long) snapshot.get("request_throttle_wait_count");
        Assert.assertEquals(1L, throttleWaits);

        // Open the gate and give every request a chance to enter the prep processor.
        resumeProcess.countDown();
        entered.await(STALL_TIME, TimeUnit.MILLISECONDS);

        snapshot = MetricsUtils.currentServerMetrics();
        final long queuedAfterResume = (Long) snapshot.get("prep_processor_request_queued");
        Assert.assertEquals((long) TOTAL_REQUESTS, queuedAfterResume);
    }

    /**
     * Issues TOTAL_REQUESTS asynchronous creates with the throttler capped at two requests and
     * stale-request dropping enabled, marks every server connection stale while the prep
     * processor gate is closed, then opens the gate and checks that the queued count stays at
     * two and that "stale_requests_dropped" is one.
     *
     * <p>The wait for in-flight requests to drain is bounded: if the server still reports
     * in-flight requests after 60 seconds the test fails with a message instead of hanging.
     *
     * @throws Exception if the test thread is interrupted or the client call fails
     */
    @Test
    public void testDropStaleRequests() throws Exception {
        ServerMetrics.getMetrics().resetAll();
        RequestThrottler.setMaxRequests(2);
        RequestThrottler.setStallTime(STALL_TIME);
        RequestThrottler.setDropStaleRequests(true);
        this.resumeProcess = new CountDownLatch(1);
        this.submitted = new CountDownLatch(TOTAL_REQUESTS);
        for (int i = 0; i < TOTAL_REQUESTS; ++i) {
            this.zk.create("/request_throttle_test- " + i, ("/request_throttle_test- " + i).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> {}, null);
        }
        // Best-effort wait: the result of await() is intentionally not asserted.
        this.submitted.await(5L, TimeUnit.SECONDS);
        Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
        Assert.assertEquals(2L, (long) (Long) metrics.get("prep_processor_request_queued"));
        Assert.assertEquals(1L, (long) (Long) metrics.get("request_throttle_wait_count"));
        for (ServerCnxn cnxn : this.f.cnxns) {
            cnxn.setStale();
        }
        // NOTE(review): the client reference is dropped without close(), presumably so that
        // tearDown() does not try to close a client whose connection was just marked stale -
        // confirm. The client object itself is not closed by this test.
        this.zk = null;
        this.resumeProcess.countDown();
        LOG.info("raise the latch");

        // Poll until nothing is in flight, but never longer than the deadline.
        final long drainDeadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(60L);
        while (this.zks.getInflight() > 0) {
            Assert.assertTrue("timed out waiting for in-flight requests to drain", System.nanoTime() - drainDeadline < 0L);
            Thread.sleep(50L);
        }
        metrics = MetricsUtils.currentServerMetrics();
        Assert.assertEquals(2L, (long) (Long) metrics.get("prep_processor_request_queued"));
        Assert.assertEquals(1L, (long) (Long) metrics.get("stale_requests_dropped"));
    }

    /**
     * Issues TOTAL_REQUESTS asynchronous creates carrying 100 bytes of data each, with the
     * large-request threshold set to 150 and the large-request byte budget set to 400, and
     * checks the "prep_processor_request_queued", "large_requests_rejected" and
     * "stale_replies" metrics as well as the number of CONNECTIONLOSS callbacks.
     *
     * @throws Exception if the test thread is interrupted or the client call fails
     */
    @Test
    public void testLargeRequestThrottling() throws Exception {
        ServerMetrics.getMetrics().resetAll();
        AsyncCallback.StringCallback createCallback = (rc, path, ctx, name) -> {
            if (KeeperException.Code.get(rc) == KeeperException.Code.CONNECTIONLOSS) {
                // Update the counter BEFORE releasing the latch: countDown() happens-before a
                // successful await(), so the test thread is guaranteed to observe the final
                // count once it wakes up. The reverse order let the test thread read the
                // counter before the last increment had been applied.
                ++this.connectionLossCount;
                this.disconnected.countDown();
            }
        };
        RequestThrottler.setMaxRequests(TOTAL_REQUESTS);
        this.zks.setLargeRequestThreshold(150);
        this.zks.setLargeRequestMaxBytes(400);
        this.resumeProcess = new CountDownLatch(1);
        this.disconnected = new CountDownLatch(TOTAL_REQUESTS);
        byte[] data = new byte[100];
        for (int i = 0; i < TOTAL_REQUESTS; ++i) {
            this.zk.create("/request_throttle_test- " + i, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createCallback, null);
        }
        // Best-effort wait: the result of await() is intentionally not asserted; the
        // assertions below report the actual values if the callbacks did not all arrive.
        this.disconnected.await(5L, TimeUnit.SECONDS);
        Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
        Assert.assertEquals(2L, (long) (Long) metrics.get("prep_processor_request_queued"));
        Assert.assertEquals(1L, (long) (Long) metrics.get("large_requests_rejected"));
        Assert.assertEquals((long) TOTAL_REQUESTS, (long) this.connectionLossCount);
        // Count the requests that finish once the prep processor gate is opened.
        this.finished = new CountDownLatch(2);
        this.resumeProcess.countDown();
        this.finished.await(5L, TimeUnit.SECONDS);
        metrics = MetricsUtils.currentServerMetrics();
        Assert.assertEquals(2L, (long) (Long) metrics.get("stale_replies"));
    }

    /**
     * Sets the "zookeeper.globalOutstandingLimit" system property to GLOBAL_OUTSTANDING_LIMIT,
     * sets the RequestThrottler maximum to zero, issues ten asynchronous creates and checks
     * that "prep_processor_request_queued" equals the limit plus two. The system property is
     * cleared again whether or not the test passes.
     *
     * @throws Exception if the test thread is interrupted or the client call fails
     */
    @Test
    public void testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled() throws Exception {
        final String limitProperty = "zookeeper.globalOutstandingLimit";
        try {
            System.setProperty(limitProperty, GLOBAL_OUTSTANDING_LIMIT);
            ServerMetrics.getMetrics().resetAll();
            // Presumably a maximum of zero switches the request throttler off - confirm
            // against RequestThrottler.
            RequestThrottler.setMaxRequests(0);
            resumeProcess = new CountDownLatch(1);

            final int requestCount = 10;
            submitted = new CountDownLatch(requestCount);
            final AsyncCallback.StringCallback ignoreResult = (rc, path, ctx, name) -> {};
            for (int n = 0; n < requestCount; n++) {
                final String node = "/request_throttle_test- " + n;
                zk.create(node, node.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, ignoreResult, null);
            }

            // Best-effort wait: the result of await() is intentionally not asserted.
            submitted.await(5L, TimeUnit.SECONDS);
            resumeProcess.countDown();

            // NOTE(review): the "+ 2" presumably reflects how many requests the pipeline admits
            // beyond the limit before receiving is disabled - confirm against the server code.
            final long expectedQueued = Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2;
            final long actualQueued = (Long) MetricsUtils.currentServerMetrics().get("prep_processor_request_queued");
            Assert.assertEquals(expectedQueued, actualQueued);
        } finally {
            System.clearProperty(limitProperty);
        }
    }

    /**
     * PrepRequestProcessor that lets the enclosing test hold requests at the start of
     * {@code pRequest} until the test's {@code resumeProcess} latch is released, and that
     * reports each request that got past the gate through the {@code entered} latch.
     */
    class TestPrepRequestProcessor
    extends PrepRequestProcessor {
        public TestPrepRequestProcessor(ZooKeeperServer zks, RequestProcessor syncProcessor) {
            super(zks, syncProcessor);
        }

        /**
         * Waits on the test's gate (if one is installed), counts down {@code entered} (if one
         * is installed) and then delegates to the superclass implementation.
         */
        @Override
        protected void pRequest(Request request) throws RequestProcessor.RequestProcessorException {
            blockUntilTestResumes();
            if (null != RequestThrottlerTest.this.entered) {
                RequestThrottlerTest.this.entered.countDown();
            }
            super.pRequest(request);
        }

        /**
         * Waits up to 20 seconds on {@code resumeProcess} when that latch is set. Any exception
         * raised while waiting is ignored so that the request is still processed afterwards.
         */
        private void blockUntilTestResumes() {
            if (RequestThrottlerTest.this.resumeProcess == null) {
                return;
            }
            try {
                RequestThrottlerTest.this.resumeProcess.await(20L, TimeUnit.SECONDS);
            }
            catch (Exception ignored) {
                // Deliberately swallowed: the request proceeds whether or not the wait completed.
            }
        }
    }

    /**
     * ZooKeeperServer whose request pipeline starts with a {@link TestPrepRequestProcessor} and
     * which reports submitted and finished requests to the enclosing test through the
     * {@code submitted} and {@code finished} latches.
     */
    class TestZooKeeperServer
    extends ZooKeeperServer {
        public TestZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
            super(snapDir, logDir, tickTime);
        }

        /**
         * Builds the chain prep -> sync -> final. The sync processor is started before the
         * prep processor is installed as {@code firstProcessor} and started.
         */
        @Override
        protected void setupRequestProcessors() {
            final FinalRequestProcessor terminal = new FinalRequestProcessor(this);
            final SyncRequestProcessor sync = new SyncRequestProcessor(this, terminal);
            sync.start();

            final TestPrepRequestProcessor prep = new TestPrepRequestProcessor(this, sync);
            this.firstProcessor = prep;
            prep.start();
        }

        /** Counts down {@code submitted} (when installed) and then submits the request as usual. */
        @Override
        public void submitRequest(Request si) {
            if (RequestThrottlerTest.this.submitted != null) {
                RequestThrottlerTest.this.submitted.countDown();
            }
            super.submitRequest(si);
        }

        /** Counts down {@code finished} (when installed) and then delegates to the superclass. */
        @Override
        public void requestFinished(Request request) {
            if (RequestThrottlerTest.this.finished != null) {
                RequestThrottlerTest.this.finished.countDown();
            }
            super.requestFinished(request);
        }
    }
}

