/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum;

import io.hops.hadoop.shaded.org.apache.jute.BinaryOutputArchive;
import io.hops.hadoop.shaded.org.apache.zookeeper.ZKTestCase;
import io.hops.hadoop.shaded.org.apache.zookeeper.ZooDefs;
import io.hops.hadoop.shaded.org.apache.zookeeper.data.Id;
import io.hops.hadoop.shaded.org.apache.zookeeper.proto.CreateRequest;
import io.hops.hadoop.shaded.org.apache.zookeeper.proto.GetDataRequest;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.FinalRequestProcessor;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.PrepRequestProcessor;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.Request;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.RequestProcessor;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.ZooKeeperServer;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.CommitProcessor;
import io.hops.hadoop.shaded.org.apache.zookeeper.test.ClientBase;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitProcessorTest
extends ZKTestCase {
    protected static final Logger LOG = LoggerFactory.getLogger(CommitProcessorTest.class);
    static final int TEST_RUN_TIME_IN_MS = 5000;
    private AtomicInteger processedReadRequests = new AtomicInteger(0);
    private AtomicInteger processedWriteRequests = new AtomicInteger(0);
    boolean stopped;
    TestZooKeeperServer zks;
    File tmpDir;
    ArrayList<TestClientThread> testClients = new ArrayList();
    CommitProcessor commitProcessor;
    volatile boolean fail = false;

    public void setUp(int numCommitThreads, int numClientThreads, int writePercent) throws Exception {
        this.stopped = false;
        System.setProperty("zookeeper.commitProcessor.numWorkerThreads", Integer.toString(numCommitThreads));
        this.tmpDir = ClientBase.createTmpDir();
        ClientBase.setupTestEnv();
        this.zks = new TestZooKeeperServer(this.tmpDir, this.tmpDir, 4000);
        this.zks.startup();
        for (int i = 0; i < numClientThreads; ++i) {
            TestClientThread client = new TestClientThread(writePercent);
            this.testClients.add(client);
            client.start();
        }
    }

    public void setUp(int numCommitThreads, int numReadOnlyClientThreads, int mixWorkloadClientThreads, int writePercent) throws Exception {
        TestClientThread client;
        int i;
        this.stopped = false;
        System.setProperty("zookeeper.commitProcessor.numWorkerThreads", Integer.toString(numCommitThreads));
        this.tmpDir = ClientBase.createTmpDir();
        ClientBase.setupTestEnv();
        this.zks = new TestZooKeeperServer(this.tmpDir, this.tmpDir, 4000);
        this.zks.startup();
        for (i = 0; i < mixWorkloadClientThreads; ++i) {
            client = new TestClientThread(writePercent);
            this.testClients.add(client);
            client.start();
        }
        for (i = 0; i < numReadOnlyClientThreads; ++i) {
            client = new TestClientThread(0);
            this.testClients.add(client);
            client.start();
        }
    }

    @After
    public void tearDown() throws Exception {
        LOG.info("tearDown starting");
        this.stopped = true;
        this.zks.shutdown();
        for (TestClientThread client : this.testClients) {
            client.interrupt();
            client.join();
        }
        if (this.tmpDir != null) {
            Assert.assertTrue((String)("delete " + this.tmpDir.toString()), (boolean)ClientBase.recursiveDelete(this.tmpDir));
        }
        this.processedReadRequests.set(0);
        this.processedWriteRequests.set(0);
        this.testClients.clear();
        this.commitProcessor.join();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testNoCommitWorkersReadOnlyWorkload() throws Exception {
        int numClients = 10;
        LOG.info("testNoCommitWorkersReadOnlyWorkload");
        this.setUp(0, numClients, 0);
        CommitProcessorTest commitProcessorTest = this;
        synchronized (commitProcessorTest) {
            this.wait(5000L);
        }
        Assert.assertFalse((boolean)this.fail);
        Assert.assertTrue((String)"No read requests processed", (this.processedReadRequests.get() > 0 ? 1 : 0) != 0);
        Assert.assertTrue((String)"Write requests processed", (this.processedWriteRequests.get() == numClients ? 1 : 0) != 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testNoCommitWorkersMixedWorkload() throws Exception {
        int numClients = 10;
        LOG.info("testNoCommitWorkersMixedWorkload 25w/75r workload test");
        this.setUp(0, numClients, 25);
        CommitProcessorTest commitProcessorTest = this;
        synchronized (commitProcessorTest) {
            this.wait(5000L);
        }
        Assert.assertFalse((boolean)this.fail);
        this.checkProcessedRequest();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testOneCommitWorkerReadOnlyWorkload() throws Exception {
        int numClients = 10;
        LOG.info("testOneCommitWorkerReadOnlyWorkload");
        this.setUp(1, numClients, 0);
        CommitProcessorTest commitProcessorTest = this;
        synchronized (commitProcessorTest) {
            this.wait(5000L);
        }
        Assert.assertFalse((boolean)this.fail);
        Assert.assertTrue((String)"No read requests processed", (this.processedReadRequests.get() > 0 ? 1 : 0) != 0);
        Assert.assertTrue((String)"Write requests processed", (this.processedWriteRequests.get() == numClients ? 1 : 0) != 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testOneCommitWorkerMixedWorkload() throws Exception {
        this.setUp(1, 10, 25);
        LOG.info("testOneCommitWorkerMixedWorkload 25w/75r workload test");
        CommitProcessorTest commitProcessorTest = this;
        synchronized (commitProcessorTest) {
            this.wait(5000L);
        }
        Assert.assertFalse((boolean)this.fail);
        this.checkProcessedRequest();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testManyCommitWorkersReadOnly() throws Exception {
        int numClients = 10;
        LOG.info("testManyCommitWorkersReadOnly");
        this.setUp(10, numClients, 0);
        CommitProcessorTest commitProcessorTest = this;
        synchronized (commitProcessorTest) {
            this.wait(5000L);
        }
        Assert.assertFalse((boolean)this.fail);
        Assert.assertTrue((String)"No read requests processed", (this.processedReadRequests.get() > 0 ? 1 : 0) != 0);
        Assert.assertTrue((String)"Write requests processed", (this.processedWriteRequests.get() == numClients ? 1 : 0) != 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testManyCommitWorkersMixedWorkload() throws Exception {
        this.setUp(16, 8, 8, 25);
        LOG.info("testManyCommitWorkersMixedWorkload 8X0w/100r + 8X25w/75r workload test");
        CommitProcessorTest commitProcessorTest = this;
        synchronized (commitProcessorTest) {
            this.wait(5000L);
        }
        Assert.assertFalse((boolean)this.fail);
        this.checkProcessedRequest();
    }

    private void checkProcessedRequest() {
        Assert.assertTrue((String)"No read requests processed", (this.processedReadRequests.get() > 0 ? 1 : 0) != 0);
        Assert.assertTrue((String)"No write requests processed", (this.processedWriteRequests.get() > 0 ? 1 : 0) != 0);
    }

    private synchronized void failTest(String reason) {
        this.fail = true;
        this.notifyAll();
        Assert.fail((String)reason);
    }

    private class ValidateProcessor
    implements RequestProcessor {
        Random rand = new Random(Thread.currentThread().getId());
        RequestProcessor nextProcessor;
        CommitProcessor commitProcessor;
        AtomicLong expectedZxid = new AtomicLong(1L);
        ConcurrentHashMap<Long, AtomicInteger> cxidMap = new ConcurrentHashMap();
        AtomicInteger outstandingReadRequests = new AtomicInteger(0);
        AtomicInteger outstandingWriteRequests = new AtomicInteger(0);

        public ValidateProcessor(RequestProcessor nextProcessor) {
            this.nextProcessor = nextProcessor;
        }

        public void setCommitProcessor(CommitProcessor commitProcessor) {
            this.commitProcessor = commitProcessor;
        }

        @Override
        public void processRequest(Request request) throws RequestProcessor.RequestProcessorException {
            if (CommitProcessorTest.this.stopped) {
                return;
            }
            if (request.type == -11) {
                LOG.debug("ValidateProcessor got closeSession request=" + request);
                this.nextProcessor.processRequest(request);
                return;
            }
            boolean isWriteRequest = this.commitProcessor.needCommit(request);
            if (isWriteRequest) {
                this.outstandingWriteRequests.incrementAndGet();
                this.validateWriteRequestVariant(request);
                LOG.debug("Starting write request zxid={}", (Object)request.zxid);
            } else {
                LOG.debug("Starting read request cxid={} for session 0x{}", (Object)request.cxid, (Object)Long.toHexString(request.sessionId));
                this.outstandingReadRequests.incrementAndGet();
                this.validateReadRequestVariant(request);
            }
            try {
                Thread.sleep(5 + this.rand.nextInt(25));
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            this.nextProcessor.processRequest(request);
            if (isWriteRequest) {
                this.outstandingWriteRequests.decrementAndGet();
                LOG.debug("Done write request zxid={}", (Object)request.zxid);
                CommitProcessorTest.this.processedWriteRequests.incrementAndGet();
            } else {
                this.outstandingReadRequests.decrementAndGet();
                LOG.debug("Done read request cxid={} for session 0x{}", (Object)request.cxid, (Object)Long.toHexString(request.sessionId));
                CommitProcessorTest.this.processedReadRequests.incrementAndGet();
            }
            this.validateRequest(request);
        }

        private void validateWriteRequestVariant(Request request) {
            int writeRequests;
            if (CommitProcessorTest.this.stopped) {
                return;
            }
            long zxid = request.getHdr().getZxid();
            int readRequests = this.outstandingReadRequests.get();
            if (readRequests != 0) {
                CommitProcessorTest.this.failTest("There are " + readRequests + " outstanding read requests while issuing a write request zxid=" + zxid);
            }
            if ((writeRequests = this.outstandingWriteRequests.get()) > 1) {
                CommitProcessorTest.this.failTest("There are " + writeRequests + " outstanding write requests while issuing a write request zxid=" + zxid + " (expected one)");
            }
        }

        private void validateReadRequestVariant(Request request) {
            int writeRequests = this.outstandingWriteRequests.get();
            if (writeRequests != 0) {
                CommitProcessorTest.this.failTest("There are " + writeRequests + " outstanding write requests while issuing a read request cxid=" + request.cxid + " for session 0x" + Long.toHexString(request.sessionId));
            }
        }

        private void validateRequest(Request request) {
            AtomicInteger sessionCxid;
            long zxid;
            LOG.debug("Got request {}", (Object)request);
            if (request.getHdr() != null && !this.expectedZxid.compareAndSet(zxid = request.getHdr().getZxid(), zxid + 1L)) {
                CommitProcessorTest.this.failTest("Write request, expected_zxid=" + this.expectedZxid.get() + "; req_zxid=" + zxid);
            }
            if ((sessionCxid = this.cxidMap.get(request.sessionId)) == null) {
                sessionCxid = new AtomicInteger(request.cxid + 1);
                AtomicInteger existingSessionCxid = this.cxidMap.putIfAbsent(request.sessionId, sessionCxid);
                if (existingSessionCxid != null) {
                    CommitProcessorTest.this.failTest("Race condition adding cxid=" + request.cxid + " for session 0x" + Long.toHexString(request.sessionId) + " with other_cxid=" + existingSessionCxid.get());
                }
            } else if (!sessionCxid.compareAndSet(request.cxid, request.cxid + 1)) {
                CommitProcessorTest.this.failTest("Expected_cxid=" + sessionCxid.get() + "; req_cxid=" + request.cxid);
            }
        }

        @Override
        public void shutdown() {
            LOG.info("shutdown validateReadRequestVariant");
            this.cxidMap.clear();
            this.expectedZxid = new AtomicLong(1L);
            if (this.nextProcessor != null) {
                this.nextProcessor.shutdown();
            }
        }
    }

    private class MockProposalRequestProcessor
    extends Thread
    implements RequestProcessor {
        private final CommitProcessor commitProcessor;
        private final LinkedBlockingQueue<Request> proposals = new LinkedBlockingQueue();

        public MockProposalRequestProcessor(CommitProcessor commitProcessor) {
            this.commitProcessor = commitProcessor;
        }

        @Override
        public void run() {
            Random rand = new Random(Thread.currentThread().getId());
            try {
                while (true) {
                    if (this.proposals.isEmpty()) {
                        continue;
                    }
                    Request request = this.proposals.take();
                    Thread.sleep(5 + rand.nextInt(95));
                    this.commitProcessor.commit(request);
                }
            }
            catch (InterruptedException interruptedException) {
                return;
            }
        }

        @Override
        public void processRequest(Request request) throws RequestProcessor.RequestProcessorException {
            this.commitProcessor.processRequest(request);
            if (request.getHdr() != null) {
                this.proposals.add(request);
            }
        }

        @Override
        public void shutdown() {
            LOG.info("shutdown MockProposalRequestProcessor");
            this.proposals.clear();
            if (this.commitProcessor != null) {
                this.commitProcessor.shutdown();
            }
        }
    }

    private class TestZooKeeperServer
    extends ZooKeeperServer {
        public TestZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
            super(snapDir, logDir, tickTime);
        }

        public PrepRequestProcessor getFirstProcessor() {
            return (PrepRequestProcessor)this.firstProcessor;
        }

        @Override
        protected void setupRequestProcessors() {
            FinalRequestProcessor finalProcessor = new FinalRequestProcessor(CommitProcessorTest.this.zks);
            ValidateProcessor validateProcessor = new ValidateProcessor(finalProcessor);
            CommitProcessorTest.this.commitProcessor = new CommitProcessor(validateProcessor, "1", true, null);
            validateProcessor.setCommitProcessor(CommitProcessorTest.this.commitProcessor);
            CommitProcessorTest.this.commitProcessor.start();
            MockProposalRequestProcessor proposalProcessor = new MockProposalRequestProcessor(CommitProcessorTest.this.commitProcessor);
            proposalProcessor.start();
            this.firstProcessor = new PrepRequestProcessor(CommitProcessorTest.this.zks, proposalProcessor);
            this.getFirstProcessor().start();
        }
    }

    private class TestClientThread
    extends Thread {
        long sessionId;
        int cxid;
        int nodeId;
        int writePercent;

        public TestClientThread(int writePercent) {
            this.sessionId = CommitProcessorTest.this.zks.getSessionTracker().createSession(5000);
            this.writePercent = writePercent;
        }

        public void sendWriteRequest() throws Exception {
            ByteArrayOutputStream boas = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
            CreateRequest createReq = new CreateRequest("/session" + Long.toHexString(this.sessionId) + "-" + ++this.nodeId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 1);
            createReq.serialize(boa, "request");
            ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
            Request req = new Request(null, this.sessionId, ++this.cxid, 1, bb, new ArrayList<Id>());
            CommitProcessorTest.this.zks.getFirstProcessor().processRequest(req);
        }

        public void sendReadRequest() throws Exception {
            ByteArrayOutputStream boas = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
            GetDataRequest getDataRequest = new GetDataRequest("/session" + Long.toHexString(this.sessionId) + "-" + this.nodeId, false);
            getDataRequest.serialize(boa, "request");
            ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
            Request req = new Request(null, this.sessionId, ++this.cxid, 4, bb, new ArrayList<Id>());
            CommitProcessorTest.this.zks.getFirstProcessor().processRequest(req);
        }

        @Override
        public void run() {
            Random rand = new Random(Thread.currentThread().getId());
            try {
                this.sendWriteRequest();
                while (!CommitProcessorTest.this.stopped) {
                    if (rand.nextInt(100) < this.writePercent) {
                        this.sendWriteRequest();
                    } else {
                        this.sendReadRequest();
                    }
                    Thread.sleep(5 + rand.nextInt(95));
                }
            }
            catch (Exception e) {
                LOG.error("Uncaught exception in test: ", (Throwable)e);
            }
        }
    }
}

