/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hadoop.shaded.org.apache.zookeeper.server;

import io.hops.hadoop.shaded.org.apache.jute.Record;
import io.hops.hadoop.shaded.org.apache.zookeeper.CreateMode;
import io.hops.hadoop.shaded.org.apache.zookeeper.PortAssignment;
import io.hops.hadoop.shaded.org.apache.zookeeper.WatchedEvent;
import io.hops.hadoop.shaded.org.apache.zookeeper.Watcher;
import io.hops.hadoop.shaded.org.apache.zookeeper.ZKTestCase;
import io.hops.hadoop.shaded.org.apache.zookeeper.ZooDefs;
import io.hops.hadoop.shaded.org.apache.zookeeper.ZooKeeper;
import io.hops.hadoop.shaded.org.apache.zookeeper.proto.ReplyHeader;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.FinalRequestProcessor;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.NettyServerCnxn;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.NettyServerCnxnFactory;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.PrepRequestProcessor;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.Request;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.RequestProcessor;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.ServerCnxnFactory;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.SyncRequestProcessor;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.ZKDatabase;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.ZooKeeperServer;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.ZooKeeperServerMain;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.persistence.Util;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.util.ZxidUtils;
import io.hops.hadoop.shaded.org.apache.zookeeper.test.ClientBase;
import io.hops.hadoop.shaded.org.apache.zookeeper.txn.SetDataTxn;
import io.hops.hadoop.shaded.org.apache.zookeeper.txn.TxnHeader;
import io.hops.hadoop.shaded.org.jboss.netty.channel.Channel;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZooKeeperServerMainTest
extends ZKTestCase
implements Watcher {
    protected static final Logger LOG = LoggerFactory.getLogger(ZooKeeperServerMainTest.class);

    /**
     * Checks that a standalone server stops serving once committing to the transaction log fails.
     *
     * <p>The test starts a server, confirms that a plain create/read round trip works, then
     * replaces the server's database with one backed by a {@link FileTxnSnapLog} whose
     * {@code commit()} always throws. A second create is issued and the test waits for the
     * client port to stop answering.
     */
    @Test(timeout=30000L)
    public void testNonRecoverableError() throws Exception {
        ClientBase.setupTestEnv();
        int clientPort = PortAssignment.unique();
        String hostPort = "127.0.0.1:" + clientPort;

        MainThread server = new MainThread(clientPort, true);
        server.start();
        boolean serverUp = ClientBase.waitForServerUp(hostPort, ClientBase.CONNECTION_TIMEOUT);
        Assert.assertTrue("waiting for server being up", serverUp);

        // Sanity check: the healthy server accepts a write and returns it on read.
        ZooKeeper client = new ZooKeeper(hostPort, ClientBase.CONNECTION_TIMEOUT, this);
        client.create("/foo1", "foobar".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        String stored = new String(client.getData("/foo1", null, null));
        Assert.assertEquals(stored, "foobar");

        // Inject the failure: same data/snap directories, but commit() always throws.
        ZooKeeperServer runningServer = server.getCnxnFactory().getZooKeeperServer();
        FileTxnSnapLog healthyLog = runningServer.getTxnLogFactory();
        FileTxnSnapLog failingLog = new FileTxnSnapLog(healthyLog.getDataDir(), healthyLog.getSnapDir()){

            @Override
            public void commit() throws IOException {
                throw new IOException("Input/output error");
            }
        };
        runningServer.setZKDatabase(new ZKDatabase(failingLog));

        try {
            client.create("/foo2", "foobar".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Assert.fail("IOException is expected as error is injected in transaction log commit funtionality");
        }
        catch (Exception expected) {
            // The create is required to fail here; the exception itself is not inspected.
        }
        client.close();

        boolean serverDown = ClientBase.waitForServerDown(hostPort, ClientBase.CONNECTION_TIMEOUT);
        Assert.assertTrue("waiting for server down", serverDown);
        failingLog.close();
        server.shutdown();
        server.deleteDirs();
    }

    /**
     * Checks that the server does not come up when its snapshot directory is not writable.
     *
     * <p>A first server run populates the directories and is shut down. The {@code version-2}
     * directory under the data dir is then made read-only and a second server is started on
     * the same port and temp dir; it is expected NOT to become reachable.
     *
     * <p>The write permission is restored in a {@code finally} block so that a failing
     * assertion (or a failure while starting the second server) does not leave a read-only
     * directory behind on disk.
     */
    @Test(timeout=30000L)
    public void testReadOnlySnapshotDir() throws Exception {
        ClientBase.setupTestEnv();
        int CLIENT_PORT = PortAssignment.unique();
        MainThread main = new MainThread(CLIENT_PORT, true);
        File tmpDir = main.tmpDir;
        main.start();
        Assert.assertTrue("waiting for server being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT / 2));
        main.shutdown();

        File snapDir = new File(main.dataDir, "version-2");
        snapDir.setWritable(false);
        try {
            // Restart against the same directories; preCreateDirs is false because they already exist.
            main = new MainThread(CLIENT_PORT, false, tmpDir);
            main.start();
            Assert.assertFalse("waiting for server being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT / 2));
            main.shutdown();
        }
        finally {
            // Always undo the permission change, even when the checks above fail.
            snapDir.setWritable(true);
        }
        main.deleteDirs();
    }

    /**
     * Checks that the server does not come up when its transaction log directory is not writable.
     *
     * <p>A first server run populates the directories and is shut down. The {@code version-2}
     * directory under the txn-log dir is then made read-only and a second server is started on
     * the same port and temp dir; it is expected NOT to become reachable.
     *
     * <p>The write permission is restored in a {@code finally} block so that a failing
     * assertion (or a failure while starting the second server) does not leave a read-only
     * directory behind on disk.
     */
    @Test(timeout=30000L)
    public void testReadOnlyTxnLogDir() throws Exception {
        ClientBase.setupTestEnv();
        int CLIENT_PORT = PortAssignment.unique();
        MainThread main = new MainThread(CLIENT_PORT, true);
        File tmpDir = main.tmpDir;
        main.start();
        Assert.assertTrue("waiting for server being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT / 2));
        main.shutdown();

        File logDir = new File(main.logDir, "version-2");
        logDir.setWritable(false);
        try {
            // Restart against the same directories; preCreateDirs is false because they already exist.
            main = new MainThread(CLIENT_PORT, false, tmpDir);
            main.start();
            Assert.assertFalse("waiting for server being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT / 2));
            main.shutdown();
        }
        finally {
            // Always undo the permission change, even when the checks above fail.
            logDir.setWritable(true);
        }
        main.deleteDirs();
    }

    /**
     * Smoke test for a standalone server with pre-created data and log directories:
     * start it, write and read back one znode, shut it down, and confirm the port goes quiet.
     */
    @Test
    public void testStandalone() throws Exception {
        ClientBase.setupTestEnv();
        int clientPort = PortAssignment.unique();
        String hostPort = "127.0.0.1:" + clientPort;

        MainThread server = new MainThread(clientPort, true);
        server.start();
        boolean serverUp = ClientBase.waitForServerUp(hostPort, ClientBase.CONNECTION_TIMEOUT);
        Assert.assertTrue("waiting for server being up", serverUp);

        ZooKeeper client = new ZooKeeper(hostPort, ClientBase.CONNECTION_TIMEOUT, this);
        client.create("/foo", "foobar".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        byte[] stored = client.getData("/foo", null, null);
        Assert.assertEquals(new String(stored), "foobar");
        client.close();

        // Stop the server and wait for its thread to finish before removing its files.
        server.shutdown();
        server.join();
        server.deleteDirs();

        boolean serverDown = ClientBase.waitForServerDown(hostPort, ClientBase.CONNECTION_TIMEOUT);
        Assert.assertTrue("waiting for server down", serverDown);
    }

    /**
     * Checks that the server starts and serves requests when the configured data and
     * transaction-log directories were NOT created beforehand ({@code preCreateDirs == false}).
     *
     * <p>After a create/read round trip the server is shut down, its thread joined, its
     * directories removed, and the client port is verified to be down.
     */
    @Test(timeout=30000L)
    public void testAutoCreateDataLogDir() throws Exception {
        ClientBase.setupTestEnv();
        int CLIENT_PORT = PortAssignment.unique();
        // false: leave dataDir / dataLogDir missing so the server has to create them itself.
        MainThread main = new MainThread(CLIENT_PORT, false);
        main.start();
        Assert.assertTrue("waiting for server being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT));
        ZooKeeper zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT, this);
        zk.create("/foo", "foobar".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Assert.assertEquals(new String(zk.getData("/foo", null, null)), "foobar");
        zk.close();
        main.shutdown();
        main.join();
        main.deleteDirs();
        Assert.assertTrue("waiting for server down", ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT));
    }

    /**
     * Starts two servers in the same JVM and relies on {@code startServer} to assert that
     * each one registered its JMX bean, then shuts both down and removes their directories.
     *
     * <p>This test does not set {@code zookeeper.serverCnxnFactory}, so the connection factory
     * is whatever {@link ServerCnxnFactory#createFactory} picks by default (presumably the NIO
     * implementation, per the test name; confirm against ServerCnxnFactory).
     */
    @Test
    public void testJMXRegistrationWithNIO() throws Exception {
        ClientBase.setupTestEnv();

        File firstDir = ClientBase.createTmpDir();
        ServerCnxnFactory firstServer = this.startServer(firstDir);

        File secondDir = ClientBase.createTmpDir();
        ServerCnxnFactory secondServer = this.startServer(secondDir);

        // Both servers are up at this point; tear them down in start order.
        firstServer.shutdown();
        secondServer.shutdown();

        this.deleteFile(firstDir);
        this.deleteFile(secondDir);
    }

    /**
     * Same scenario as the NIO JMX registration test, but with the
     * {@code zookeeper.serverCnxnFactory} system property pointing at
     * {@link NettyServerCnxnFactory} while the two servers are started.
     *
     * <p>The property's previous value is captured up front and put back in a
     * {@code finally} block (cleared if it was unset or empty).
     */
    @Test
    public void testJMXRegistrationWithNetty() throws Exception {
        final String factoryKey = "zookeeper.serverCnxnFactory";
        String previousFactory = System.getProperty(factoryKey);
        System.setProperty(factoryKey, NettyServerCnxnFactory.class.getName());
        try {
            ClientBase.setupTestEnv();

            File firstDir = ClientBase.createTmpDir();
            ServerCnxnFactory firstServer = this.startServer(firstDir);

            File secondDir = ClientBase.createTmpDir();
            ServerCnxnFactory secondServer = this.startServer(secondDir);

            firstServer.shutdown();
            secondServer.shutdown();

            this.deleteFile(firstDir);
            this.deleteFile(secondDir);
        }
        finally {
            // Put the JVM-wide property back exactly as it was found.
            if (previousFactory != null && !previousFactory.isEmpty()) {
                System.setProperty(factoryKey, previousFactory);
            } else {
                System.clearProperty(factoryKey);
            }
        }
    }

    /**
     * Exercises a server shutdown that overlaps with a request still being handled by the
     * final request processor.
     *
     * <p>Sequence: start a {@code SimpleZooKeeperServer} (which queues one request during
     * setup), wait until the final processor has been entered, start {@code shutdown()} on a
     * separate thread, wait until the sync processor's {@code shutdown()} has been invoked,
     * then release the blocked final processor and wait for the shutdown thread to finish.
     */
    @Test(timeout=30000L)
    public void testRaceBetweenSyncFlushAndZKShutdown() throws Exception {
        File baseDir = ClientBase.createTmpDir();
        // Only a unique path is needed; the placeholder file itself is removed right away.
        File dataDir = File.createTempFile("test", ".dir", baseDir);
        dataDir.delete();

        FileTxnSnapLog snapLog = new FileTxnSnapLog(dataDir, dataDir);
        final SimpleZooKeeperServer server = new SimpleZooKeeperServer(snapLog);
        server.startup();
        server.waitForFinalProcessRequest();

        Thread stopper = new Thread(new Runnable(){

            @Override
            public void run() {
                server.shutdown();
            }
        });
        stopper.start();

        server.waitForSyncReqProcessorShutdown();
        server.resumeFinalProcessRequest();
        stopper.join();
    }

    /**
     * Recursively deletes {@code f}: children first when it is a directory, then {@code f} itself.
     *
     * @param f file or directory to remove
     * @throws IOException if {@code f} (or any descendant) could not be deleted and still exists
     */
    private void deleteFile(File f) throws IOException {
        if (f.isDirectory()) {
            // listFiles() returns null when the directory cannot be read; skip the recursion
            // in that case so the failure is reported by the IOException below, not an NPE.
            File[] children = f.listFiles();
            if (children != null) {
                for (File child : children) {
                    this.deleteFile(child);
                }
            }
        }
        // A failed delete() is only an error if the file is actually still there.
        if (!f.delete() && f.exists()) {
            throw new IOException("Failed to delete file: " + f);
        }
    }

    /**
     * Starts a {@link ZooKeeperServer} on a fresh port, using {@code tmpDir} for both of the
     * directory arguments, and asserts that it registered a JMX bean and became reachable.
     *
     * @param tmpDir directory handed to the server for its on-disk state
     * @return the connection factory the server was started through; callers shut it down
     * @throws IOException propagated from server or factory creation
     * @throws InterruptedException propagated from factory startup
     */
    private ServerCnxnFactory startServer(File tmpDir) throws IOException, InterruptedException {
        int port = PortAssignment.unique();
        ZooKeeperServer server = new ZooKeeperServer(tmpDir, tmpDir, 3000);
        ServerCnxnFactory factory = ServerCnxnFactory.createFactory(port, -1);
        factory.startup(server);

        Assert.assertNotNull("JMX initialization failed!", server.jmxServerBean);

        boolean reachable = ClientBase.waitForServerUp("127.0.0.1:" + port, ClientBase.CONNECTION_TIMEOUT);
        Assert.assertTrue("waiting for server being up", reachable);
        return factory;
    }

    /**
     * {@link Watcher} callback. Intentionally empty: the tests in this class pass
     * {@code this} as the watcher when constructing {@link ZooKeeper} clients but never
     * act on the events they receive.
     *
     * @param event the event delivered by the client; ignored
     */
    @Override
    public void process(WatchedEvent event) {
    }

    /**
     * {@link SyncRequestProcessor} that lets a test observe the moment {@code shutdown()} is
     * invoked on it.
     */
    private class SimpleSyncRequestProcessor
    extends SyncRequestProcessor {
        /** Released as soon as {@link #shutdown()} is entered. */
        private final CountDownLatch shutdownCalled = new CountDownLatch(1);

        public SimpleSyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
            super(zks, nextProcessor);
        }

        /** Signals waiters first, then performs the regular shutdown. */
        @Override
        public void shutdown() {
            this.shutdownCalled.countDown();
            super.shutdown();
        }

        /**
         * Blocks until {@link #shutdown()} has been invoked, for at most a third of
         * {@code ClientBase.CONNECTION_TIMEOUT} milliseconds.
         *
         * @return {@code true} if shutdown was invoked in time, {@code false} on timeout
         */
        boolean waitForShutdownToBeCalled() throws InterruptedException {
            long timeoutMs = ClientBase.CONNECTION_TIMEOUT / 3;
            return this.shutdownCalled.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    private class SimpleFinalRequestProcessor
    extends FinalRequestProcessor {
        private CountDownLatch finalReqProcessCalled;
        private CountDownLatch resumeFinalReqProcess;
        private volatile boolean interrupted;

        public SimpleFinalRequestProcessor(ZooKeeperServer zks) {
            super(zks);
            this.finalReqProcessCalled = new CountDownLatch(1);
            this.resumeFinalReqProcess = new CountDownLatch(1);
            this.interrupted = false;
        }

        @Override
        public void processRequest(Request request) {
            this.finalReqProcessCalled.countDown();
            try {
                this.resumeFinalReqProcess.await(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                LOG.error("Interrupted while waiting to process request", (Throwable)e);
                this.interrupted = true;
                this.resumeFinalReqProcess.countDown();
                return;
            }
            super.processRequest(request);
        }

        boolean waitForProcessRequestToBeCalled() throws InterruptedException {
            return this.finalReqProcessCalled.await(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
        }

        void resumeProcessRequest() throws InterruptedException {
            this.resumeFinalReqProcess.countDown();
            this.resumeFinalReqProcess.await(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
            Assert.assertFalse((String)"Interrupted while waiting to process request", (boolean)this.interrupted);
        }
    }

    /**
     * {@link NettyServerCnxn} stub used as the connection of a hand-built {@link Request}.
     * Statistics updates and response sending are overridden with empty bodies.
     */
    private class MockNettyServerCnxn
    extends NettyServerCnxn {
        /**
         * NOTE: {@code channel} and {@code zks} are accepted but ignored; the superclass is
         * always constructed with {@code null} for both, and only {@code factory} is passed on.
         */
        public MockNettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
            super(null, null, factory);
        }

        /** No-op override: this stub records no statistics. */
        @Override
        protected synchronized void updateStatsForResponse(long cxid, long zxid, String op, long start, long end) {
        }

        /** No-op override: this stub sends nothing. */
        @Override
        public synchronized void sendResponse(ReplyHeader h, Record r, String tag) {
        }
    }

    private class SimpleZooKeeperServer
    extends ZooKeeperServer {
        private SimpleSyncRequestProcessor syncProcessor;
        private SimpleFinalRequestProcessor finalProcessor;

        SimpleZooKeeperServer(FileTxnSnapLog ftsl) throws IOException {
            super(ftsl, 2000, 2000, 4000, null, new ZKDatabase(ftsl));
        }

        @Override
        protected void setupRequestProcessors() {
            this.finalProcessor = new SimpleFinalRequestProcessor(this);
            this.syncProcessor = new SimpleSyncRequestProcessor(this, this.finalProcessor);
            this.syncProcessor.start();
            this.firstProcessor = new PrepRequestProcessor(this, this.syncProcessor);
            ((PrepRequestProcessor)this.firstProcessor).start();
            this.addRequestToSyncProcessor();
        }

        private void addRequestToSyncProcessor() {
            byte[] buf;
            long zxid = ZxidUtils.makeZxid(3L, 7L);
            TxnHeader hdr = new TxnHeader(1L, 1, zxid, 1L, 5);
            SetDataTxn txn = new SetDataTxn("/foo" + zxid, new byte[0], 1);
            try {
                buf = Util.marshallTxnEntry(hdr, txn);
            }
            catch (IOException e) {
                LOG.error("IOException while adding request to SyncRequestProcessor", (Throwable)e);
                Assert.fail((String)"IOException while adding request to SyncRequestProcessor!");
                return;
            }
            NettyServerCnxnFactory factory = new NettyServerCnxnFactory();
            MockNettyServerCnxn nettyCnxn = new MockNettyServerCnxn(null, this, factory);
            Request req = new Request(nettyCnxn, 1L, 1, 5, ByteBuffer.wrap(buf), null);
            req.hdr = hdr;
            req.txn = txn;
            this.syncProcessor.processRequest(req);
        }

        void waitForFinalProcessRequest() throws InterruptedException {
            Assert.assertTrue((String)"Waiting for FinalRequestProcessor to start processing request", (boolean)this.finalProcessor.waitForProcessRequestToBeCalled());
        }

        void waitForSyncReqProcessorShutdown() throws InterruptedException {
            Assert.assertTrue((String)"Waiting for SyncRequestProcessor to shut down", (boolean)this.syncProcessor.waitForShutdownToBeCalled());
        }

        void resumeFinalProcessRequest() throws InterruptedException {
            this.finalProcessor.resumeProcessRequest();
        }
    }

    /**
     * {@link ZooKeeperServerMain} subclass used by {@code MainThread}. Its only member is a
     * {@code public} {@code shutdown()} that delegates to the superclass.
     */
    public static class TestZKSMain
    extends ZooKeeperServerMain {
        // NOTE(review): presumably exists to make shutdown() callable from the tests (the
        // superclass method may be less visible); confirm against ZooKeeperServerMain.
        @Override
        public void shutdown() {
            super.shutdown();
        }
    }

    public static class MainThread
    extends Thread {
        final File confFile;
        final TestZKSMain main;
        final File tmpDir;
        final File dataDir;
        final File logDir;

        public MainThread(int clientPort, boolean preCreateDirs) throws IOException {
            this(clientPort, preCreateDirs, ClientBase.createTmpDir());
        }

        public MainThread(int clientPort, boolean preCreateDirs, File tmpDir) throws IOException {
            super("Standalone server with clientPort:" + clientPort);
            this.tmpDir = tmpDir;
            this.confFile = new File(this.tmpDir, "zoo.cfg");
            FileWriter fwriter = new FileWriter(this.confFile);
            fwriter.write("tickTime=2000\n");
            fwriter.write("initLimit=10\n");
            fwriter.write("syncLimit=5\n");
            this.dataDir = new File(this.tmpDir, "data");
            this.logDir = new File(this.dataDir.toString() + "_txnlog");
            if (preCreateDirs) {
                if (!this.dataDir.mkdir()) {
                    throw new IOException("unable to mkdir " + this.dataDir);
                }
                if (!this.logDir.mkdir()) {
                    throw new IOException("unable to mkdir " + this.logDir);
                }
            }
            String dataDirPath = this.dataDir.toString();
            String logDirPath = this.logDir.toString();
            String osname = System.getProperty("os.name");
            if (osname.toLowerCase().contains("windows")) {
                dataDirPath = dataDirPath.replace('\\', '/');
                logDirPath = logDirPath.replace('\\', '/');
            }
            fwriter.write("dataDir=" + dataDirPath + "\n");
            fwriter.write("dataLogDir=" + logDirPath + "\n");
            fwriter.write("clientPort=" + clientPort + "\n");
            fwriter.flush();
            fwriter.close();
            this.main = new TestZKSMain();
        }

        @Override
        public void run() {
            String[] args = new String[]{this.confFile.toString()};
            try {
                this.main.initializeAndRun(args);
            }
            catch (Exception e) {
                LOG.error("unexpected exception in run", (Throwable)e);
            }
        }

        public void shutdown() throws IOException {
            this.main.shutdown();
        }

        void deleteDirs() throws IOException {
            this.delete(this.tmpDir);
        }

        void delete(File f) throws IOException {
            if (f.isDirectory()) {
                for (File c : f.listFiles()) {
                    this.delete(c);
                }
            }
            if (!f.delete() && f.exists()) {
                throw new IOException("Failed to delete file: " + f);
            }
        }

        ServerCnxnFactory getCnxnFactory() {
            return this.main.getCnxnFactory();
        }
    }
}

