/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum;

import io.hops.hadoop.shaded.org.apache.commons.io.FileUtils;
import io.hops.hadoop.shaded.org.apache.log4j.Appender;
import io.hops.hadoop.shaded.org.apache.log4j.Layout;
import io.hops.hadoop.shaded.org.apache.log4j.Level;
import io.hops.hadoop.shaded.org.apache.log4j.Logger;
import io.hops.hadoop.shaded.org.apache.log4j.PatternLayout;
import io.hops.hadoop.shaded.org.apache.log4j.Priority;
import io.hops.hadoop.shaded.org.apache.log4j.WriterAppender;
import io.hops.hadoop.shaded.org.apache.zookeeper.AsyncCallback;
import io.hops.hadoop.shaded.org.apache.zookeeper.CreateMode;
import io.hops.hadoop.shaded.org.apache.zookeeper.KeeperException;
import io.hops.hadoop.shaded.org.apache.zookeeper.PortAssignment;
import io.hops.hadoop.shaded.org.apache.zookeeper.ZooDefs;
import io.hops.hadoop.shaded.org.apache.zookeeper.ZooKeeper;
import io.hops.hadoop.shaded.org.apache.zookeeper.common.Time;
import io.hops.hadoop.shaded.org.apache.zookeeper.common.X509Exception;
import io.hops.hadoop.shaded.org.apache.zookeeper.data.Stat;
import io.hops.hadoop.shaded.org.apache.zookeeper.metrics.BaseTestMetricsProvider;
import io.hops.hadoop.shaded.org.apache.zookeeper.metrics.impl.NullMetricsProvider;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.Follower;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.Leader;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.LearnerHandler;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.LearnerSyncThrottler;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.QuorumPacket;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.QuorumPeer;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.QuorumPeerMain;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.SyncThrottleException;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.SyncedLearnerTracker;
import io.hops.hadoop.shaded.org.apache.zookeeper.test.ClientBase;
import io.hops.hadoop.shaded.org.mockito.ArgumentMatchers;
import io.hops.hadoop.shaded.org.mockito.Mockito;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.LineNumberReader;
import java.io.OutputStream;
import java.io.StringReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import javax.security.sasl.SaslException;
import org.junit.Assert;
import org.junit.Test;

public class QuorumPeerMainTest
extends QuorumPeerTestBase {
    /**
     * Starts a two-member quorum on {@code addr}, checks the default session
     * timeout bounds reported by the first peer, writes and reads one znode
     * through each member, and finally verifies that both members go down.
     *
     * <p>The servers and clients are released in {@code finally} blocks so a
     * failed assertion does not leave them running.
     *
     * @param addr host part used for every server entry and for the client
     *             connection strings (for example {@code 127.0.0.1} or {@code [::1]})
     * @throws Exception if the quorum cannot be started or any check fails
     */
    public void testQuorumInternal(String addr) throws Exception {
        ClientBase.setupTestEnv();
        int clientPortQp1 = PortAssignment.unique();
        int clientPortQp2 = PortAssignment.unique();
        String server1 = String.format("server.1=%1$s:%2$s:%3$s;%4$s", addr, PortAssignment.unique(), PortAssignment.unique(), clientPortQp1);
        String server2 = String.format("server.2=%1$s:%2$s:%3$s;%4$s", addr, PortAssignment.unique(), PortAssignment.unique(), clientPortQp2);
        String quorumCfgSection = server1 + "\n" + server2;
        String hostPort1 = addr + ":" + clientPortQp1;
        String hostPort2 = addr + ":" + clientPortQp2;

        QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, clientPortQp1, quorumCfgSection);
        QuorumPeerTestBase.MainThread q2 = new QuorumPeerTestBase.MainThread(2, clientPortQp2, quorumCfgSection);
        q1.start();
        q2.start();
        try {
            Assert.assertTrue("waiting for server 1 being up", ClientBase.waitForServerUp(hostPort1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp(hostPort2, ClientBase.CONNECTION_TIMEOUT));

            // No min/max session timeout is configured, so the peer is expected
            // to report 2x and 20x the tick time.
            QuorumPeer quorumPeer = q1.main.quorumPeer;
            int tickTime = quorumPeer.getTickTime();
            Assert.assertEquals("Default value of minimumSessionTimeOut is not considered", (long) (tickTime * 2), (long) quorumPeer.getMinSessionTimeout());
            Assert.assertEquals("Default value of maximumSessionTimeOut is not considered", (long) (tickTime * 20), (long) quorumPeer.getMaxSessionTimeout());

            ZooKeeper zk = new ZooKeeper(hostPort1, ClientBase.CONNECTION_TIMEOUT, this);
            try {
                QuorumPeerMainTest.waitForOne(zk, ZooKeeper.States.CONNECTED);
                zk.create("/foo_q1", "foobar1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                Assert.assertEquals("foobar1", new String(zk.getData("/foo_q1", null, null)));
            } finally {
                zk.close();
            }

            zk = new ZooKeeper(hostPort2, ClientBase.CONNECTION_TIMEOUT, this);
            try {
                QuorumPeerMainTest.waitForOne(zk, ZooKeeper.States.CONNECTED);
                zk.create("/foo_q2", "foobar2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                Assert.assertEquals("foobar2", new String(zk.getData("/foo_q2", null, null)));
            } finally {
                zk.close();
            }
        } finally {
            // Always stop both members, even when a check above failed.
            try {
                q1.shutdown();
            } finally {
                q2.shutdown();
            }
        }
        Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown(hostPort1, ClientBase.CONNECTION_TIMEOUT));
        Assert.assertTrue("waiting for server 2 down", ClientBase.waitForServerDown(hostPort2, ClientBase.CONNECTION_TIMEOUT));
    }

    /** Runs the shared two-member quorum scenario against {@code 127.0.0.1}. */
    @Test
    public void testQuorum() throws Exception {
        final String host = "127.0.0.1";
        testQuorumInternal(host);
    }

    /** Runs the shared two-member quorum scenario against the bracketed address {@code [::1]}. */
    @Test
    public void testQuorumV6() throws Exception {
        final String host = "[::1]";
        testQuorumInternal(host);
    }

    /**
     * Starts three servers, restarts them all, then shuts down every
     * non-leader and issues a create through the remaining leader. The create
     * is expected to fail while leaving exactly one outstanding "create"
     * proposal on the leader. The leader is then stopped, the other two
     * servers are restarted and each receives its own znode, and finally the
     * old leader rejoins. At the end every server must see the znodes created
     * through the other two servers and must not see the one whose create failed.
     *
     * @throws Exception if a server or client cannot be started or a check fails
     */
    @Test
    public void testEarlyLeaderAbandonment() throws Exception {
        ClientBase.setupTestEnv();
        final int SERVER_COUNT = 3;
        final int[] clientPorts = new int[SERVER_COUNT];
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < SERVER_COUNT; ++i) {
            clientPorts[i] = PortAssignment.unique();
            sb.append("server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPorts[i] + "\n");
        }
        String quorumCfgSection = sb.toString();

        QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[SERVER_COUNT];
        ZooKeeper[] zk = new ZooKeeper[SERVER_COUNT];
        for (int i = 0; i < SERVER_COUNT; ++i) {
            mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts[i], quorumCfgSection);
            mt[i].start();
            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
        }
        QuorumPeerMainTest.waitForAll(zk, ZooKeeper.States.CONNECTED);

        // Take the whole ensemble down and bring it back with fresh clients.
        for (int i = 0; i < SERVER_COUNT; ++i) {
            mt[i].shutdown();
        }
        QuorumPeerMainTest.waitForAll(zk, ZooKeeper.States.CONNECTING);
        for (int i = 0; i < SERVER_COUNT; ++i) {
            mt[i].start();
            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
        }
        QuorumPeerMainTest.waitForAll(zk, ZooKeeper.States.CONNECTED);

        // Stop every server that is not leading and remember the leader's
        // outstanding-proposal map.
        int leader = -1;
        ConcurrentMap<Long, Leader.Proposal> outstanding = null;
        for (int i = 0; i < SERVER_COUNT; ++i) {
            if (mt[i].main.quorumPeer.leader == null) {
                mt[i].shutdown();
                continue;
            }
            leader = i;
            outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals;
        }
        // Fail with a clear message instead of indexing zk[-1] or
        // dereferencing a null map further down.
        Assert.assertTrue("There should be a leader", leader >= 0);
        Assert.assertNotNull("Leader should expose its outstanding proposals", outstanding);

        try {
            zk[leader].create("/zk" + leader, "zk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Assert.fail("create /zk" + leader + " should have failed");
        }
        catch (KeeperException expected) {
            // Expected: the create must not succeed with the other servers stopped.
        }

        // The failed create must still be sitting on the leader as its only proposal.
        Assert.assertEquals(1L, (long) outstanding.size());
        Assert.assertEquals((long) ZooDefs.OpCode.create, (long) outstanding.values().iterator().next().request.getHdr().getType());

        Thread.sleep(1000L);
        mt[leader].shutdown();
        QuorumPeerMainTest.waitForAll(zk, ZooKeeper.States.CONNECTING);

        // Bring back only the former non-leaders and write one znode through each.
        for (int i = 0; i < SERVER_COUNT; ++i) {
            if (i == leader) continue;
            mt[i].start();
        }
        for (int i = 0; i < SERVER_COUNT; ++i) {
            if (i == leader) continue;
            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
            QuorumPeerMainTest.waitForOne(zk[i], ZooKeeper.States.CONNECTED);
            zk[i].create("/zk" + i, "zk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        mt[leader].start();
        QuorumPeerMainTest.waitForAll(zk, ZooKeeper.States.CONNECTED);

        // i selects the znode, j the server that is queried for it.
        for (int i = 0; i < SERVER_COUNT; ++i) {
            for (int j = 0; j < SERVER_COUNT; ++j) {
                String who = j == leader ? "Leader (" + leader + ")" : "Follower " + j;
                if (i == leader) {
                    Assert.assertNull(who + " should not have /zk" + i, zk[j].exists("/zk" + i, false));
                    continue;
                }
                Assert.assertNotNull(who + " does not have /zk" + i, zk[j].exists("/zk" + i, false));
            }
        }

        for (int i = 0; i < SERVER_COUNT; ++i) {
            zk[i].close();
        }
        for (int i = 0; i < SERVER_COUNT; ++i) {
            mt[i].shutdown();
        }
    }

    /**
     * Launches three servers, creates two znodes with value 1 through the
     * leader, stops the non-leaders, issues an asynchronous update to value 2
     * on the leader and then stops the leader as well. After the former
     * non-leaders are restarted the leader's znode must still read 1; once the
     * old leader rejoins it must read 1 for its own znode and 2 for the znode
     * that was updated while it was down.
     *
     * @throws Exception if the servers cannot be launched or a check fails
     */
    @Test
    public void testHighestZxidJoinLate() throws Exception {
        this.numServers = 3;
        this.servers = this.LaunchServers(this.numServers);
        String path = "/hzxidtest";
        int leader = this.servers.findLeader();
        Assert.assertTrue("There should be a leader", leader >= 0);
        int nonleader = (leader + 1) % this.numServers;

        byte[] input = new byte[]{1};
        this.servers.zk[leader].create(path + leader, input, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        this.servers.zk[leader].create(path + nonleader, input, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // The read is kept as issued originally; its result is not inspected.
        this.servers.zk[leader].getData(path + nonleader, false, null);

        for (int i = 0; i < this.numServers; ++i) {
            if (i == leader) continue;
            this.servers.mt[i].shutdown();
        }

        // Asynchronous update sent to the leader while the other servers are down.
        input[0] = 2;
        this.servers.zk[leader].setData(path + leader, input, -1, null, null);
        Thread.sleep(500L);
        this.servers.mt[leader].shutdown();
        System.gc();
        QuorumPeerMainTest.waitForAll(this.servers.zk, ZooKeeper.States.CONNECTING);

        for (int i = 0; i < this.numServers; ++i) {
            if (i == leader) continue;
            this.servers.mt[i].start();
        }
        QuorumPeerMainTest.waitForOne(this.servers.zk[nonleader], ZooKeeper.States.CONNECTED);

        byte[] output = this.servers.zk[nonleader].getData(path + leader, false, null);
        Assert.assertEquals("Expecting old value 1 since 2 isn't committed yet", 1L, (long) output[0]);

        this.servers.zk[nonleader].setData(path + nonleader, input, -1);
        this.servers.mt[leader].start();
        QuorumPeerMainTest.waitForOne(this.servers.zk[leader], ZooKeeper.States.CONNECTED);

        output = this.servers.zk[leader].getData(path + leader, false, null);
        Assert.assertEquals("Validating that the deposed leader has rolled back that change it had written", 1L, (long) output[0]);
        output = this.servers.zk[leader].getData(path + nonleader, false, null);
        Assert.assertEquals("Validating that the deposed leader caught up on changes it missed", 2L, (long) output[0]);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Forces one follower of a three-server ensemble into the LEADING state
     * and then scans the captured {@link QuorumPeer} log for that server
     * reporting LEADING, then LOOKING, then FOLLOWING, in that order.
     *
     * @throws IOException if the captured log cannot be read or a socket fails to close
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void testElectionFraud() throws IOException, InterruptedException {
        ByteArrayOutputStream logCapture = new ByteArrayOutputStream();
        WriterAppender appender = this.getConsoleAppender(logCapture, Level.INFO);
        Logger peerLogger = Logger.getLogger(QuorumPeer.class);
        peerLogger.addAppender(appender);

        this.numServers = 3;
        boolean sawLeading = false;
        boolean sawLooking = false;
        boolean sawFollowing = false;
        try {
            this.servers = this.LaunchServers(this.numServers, 500);
            int trueLeader = this.servers.findLeader();
            Assert.assertTrue("There should be a leader", trueLeader >= 0);

            int falseLeader = (trueLeader + 1) % this.numServers;
            QuorumPeer falsePeer = this.servers.mt[falseLeader].main.quorumPeer;
            Assert.assertTrue("All servers should join the quorum", falsePeer.follower != null);

            // Cut the chosen follower off, then push it into LEADING by hand.
            falsePeer.electionAlg.shutdown();
            falsePeer.follower.getSocket().close();
            QuorumPeerMainTest.waitForOne(this.servers.zk[falseLeader], ZooKeeper.States.CONNECTING);
            falsePeer.setPeerState(QuorumPeer.ServerState.LEADING);
            Thread.sleep(2 * falsePeer.initLimit * falsePeer.tickTime);

            // Let it run an election again and reconnect a fresh client to it.
            falsePeer.startLeaderElection();
            this.servers.zk[falseLeader] = new ZooKeeper("127.0.0.1:" + this.servers.mt[falseLeader].getClientPort(), ClientBase.CONNECTION_TIMEOUT, this);
            QuorumPeerMainTest.waitForOne(this.servers.zk[falseLeader], ZooKeeper.States.CONNECTED);
            Assert.assertTrue(this.servers.mt[trueLeader].main.quorumPeer.leader != null);

            Pattern leading = Pattern.compile(".*myid=" + falseLeader + ".*LEADING.*");
            Pattern looking = Pattern.compile(".*myid=" + falseLeader + ".*LOOKING.*");
            Pattern following = Pattern.compile(".*myid=" + falseLeader + ".*FOLLOWING.*");
            LineNumberReader reader = new LineNumberReader(new StringReader(logCapture.toString()));
            // Each line is tested against only the next pattern still missing.
            for (String logLine = reader.readLine(); logLine != null; logLine = reader.readLine()) {
                if (!sawLeading) {
                    sawLeading = leading.matcher(logLine).matches();
                } else if (!sawLooking) {
                    sawLooking = looking.matcher(logLine).matches();
                } else if (following.matcher(logLine).matches()) {
                    sawFollowing = true;
                    break;
                }
            }
        }
        finally {
            peerLogger.removeAppender(appender);
        }
        Assert.assertTrue("falseLeader never attempts to become leader", sawLeading);
        Assert.assertTrue("falseLeader never gives up on leadership", sawLooking);
        Assert.assertTrue("falseLeader never rejoins the quorum", sawFollowing);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts server 1 of a two-server configuration whose second entry uses
     * the host name {@code fee.fii.foo.fum}, expects the server not to become
     * available within 30 seconds, and then checks that the captured quorum
     * log contains a "Cannot open channel to ... at election address" line.
     *
     * @throws Exception if the server cannot be started or a check fails
     */
    @Test
    public void testBadPeerAddressInQuorum() throws Exception {
        ClientBase.setupTestEnv();
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        WriterAppender appender = this.getConsoleAppender(os, Level.WARN);
        Logger qlogger = Logger.getLogger("io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum");
        qlogger.addAppender(appender);
        try {
            int clientPortQp1 = PortAssignment.unique();
            int clientPortQp2 = PortAssignment.unique();
            String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPortQp1 + "\nserver.2=fee.fii.foo.fum:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPortQp2;
            QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, clientPortQp1, quorumCfgSection);
            q1.start();
            try {
                boolean isup = ClientBase.waitForServerUp("127.0.0.1:" + clientPortQp1, 30000L);
                // The message describes the failure case, i.e. the server DID come up.
                Assert.assertFalse("Server unexpectedly came up despite the bad peer address", isup);
            } finally {
                // Stop the server even when the check above fails.
                q1.shutdown();
            }
            Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown("127.0.0.1:" + clientPortQp1, ClientBase.CONNECTION_TIMEOUT));
        }
        finally {
            qlogger.removeAppender(appender);
        }

        LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
        Pattern p = Pattern.compile(".*Cannot open channel to .* at election address .*");
        boolean found = false;
        String line;
        // Stop at the first matching line.
        while (!found && (line = r.readLine()) != null) {
            found = p.matcher(line).matches();
        }
        Assert.assertTrue("complains about host", found);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts three servers from a configuration whose third entry is marked
     * {@code :observer}, waits for all of them to come up and go down again,
     * and then checks that the captured quorum log contains both the
     * "Peer type from servers list ... doesn't match peerType" warning and a
     * line mentioning OBSERVING.
     *
     * @throws Exception if a server cannot be started or a check fails
     */
    @Test
    public void testInconsistentPeerType() throws Exception {
        ClientBase.setupTestEnv();
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        WriterAppender appender = this.getConsoleAppender(os, Level.INFO);
        Logger qlogger = Logger.getLogger("io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum");
        qlogger.addAppender(appender);
        try {
            int clientPortQp1 = PortAssignment.unique();
            int clientPortQp2 = PortAssignment.unique();
            int clientPortQp3 = PortAssignment.unique();
            String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPortQp1 + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPortQp2 + "\nserver.3=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ":observer;" + clientPortQp3;
            QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, clientPortQp1, quorumCfgSection);
            QuorumPeerTestBase.MainThread q2 = new QuorumPeerTestBase.MainThread(2, clientPortQp2, quorumCfgSection);
            QuorumPeerTestBase.MainThread q3 = new QuorumPeerTestBase.MainThread(3, clientPortQp3, quorumCfgSection);
            q1.start();
            q2.start();
            q3.start();
            try {
                Assert.assertTrue("waiting for server 1 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPortQp1, ClientBase.CONNECTION_TIMEOUT));
                Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPortQp2, ClientBase.CONNECTION_TIMEOUT));
                Assert.assertTrue("waiting for server 3 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPortQp3, ClientBase.CONNECTION_TIMEOUT));
            } finally {
                // Stop all three servers even when one of them failed to come up.
                q1.shutdown();
                q2.shutdown();
                q3.shutdown();
            }
            Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown("127.0.0.1:" + clientPortQp1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 down", ClientBase.waitForServerDown("127.0.0.1:" + clientPortQp2, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 3 down", ClientBase.waitForServerDown("127.0.0.1:" + clientPortQp3, ClientBase.CONNECTION_TIMEOUT));
        }
        finally {
            qlogger.removeAppender(appender);
        }

        LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
        boolean warningPresent = false;
        boolean defaultedToObserver = false;
        Pattern pWarn = Pattern.compile(".*Peer type from servers list.* doesn't match peerType.*");
        Pattern pObserve = Pattern.compile(".*OBSERVING.*");
        String line;
        while ((line = r.readLine()) != null) {
            if (pWarn.matcher(line).matches()) {
                warningPresent = true;
            }
            if (pObserve.matcher(line).matches()) {
                defaultedToObserver = true;
            }
            // Both flags only ever go from false to true, so the rest of the
            // log cannot change the outcome once both are set.
            if (warningPresent && defaultedToObserver) {
                break;
            }
        }
        Assert.assertTrue("Should warn about inconsistent peer type", warningPresent && defaultedToObserver);
    }

    /**
     * Starts a two-member quorum, writes a bare 4-byte integer
     * ({@code 0x40000000}) to each member's election port, and then verifies
     * that a client can still create and read a znode through server 1.
     *
     * <p>Sockets, the client and both servers are released in {@code finally}
     * blocks so a failing write or assertion does not leak them.
     *
     * @throws Exception if a server cannot be started or a check fails
     */
    @Test
    public void testBadPackets() throws Exception {
        ClientBase.setupTestEnv();
        int clientPortQp1 = PortAssignment.unique();
        int clientPortQp2 = PortAssignment.unique();
        int electionPort1 = PortAssignment.unique();
        int electionPort2 = PortAssignment.unique();
        String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + electionPort1 + ";" + clientPortQp1 + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + electionPort2 + ";" + clientPortQp2;
        QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, clientPortQp1, quorumCfgSection);
        QuorumPeerTestBase.MainThread q2 = new QuorumPeerTestBase.MainThread(2, clientPortQp2, quorumCfgSection);
        q1.start();
        q2.start();
        try {
            Assert.assertTrue("waiting for server 1 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPortQp1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPortQp2, ClientBase.CONNECTION_TIMEOUT));

            // The only payload sent is this single int.
            byte[] b = new byte[4];
            int length = 0x40000000;
            ByteBuffer buff = ByteBuffer.wrap(b);
            buff.putInt(length);

            int[] electionPorts = {electionPort1, electionPort2};
            for (int port : electionPorts) {
                // Rewind so the same four bytes are sent to each server.
                buff.position(0);
                SocketChannel s = SocketChannel.open(new InetSocketAddress("127.0.0.1", port));
                try {
                    s.write(buff);
                } finally {
                    s.close();
                }
            }

            ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPortQp1, ClientBase.CONNECTION_TIMEOUT, this);
            try {
                QuorumPeerMainTest.waitForOne(zk, ZooKeeper.States.CONNECTED);
                zk.create("/foo_q1", "foobar1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                Assert.assertEquals("foobar1", new String(zk.getData("/foo_q1", null, null)));
            } finally {
                zk.close();
            }
        } finally {
            try {
                q1.shutdown();
            } finally {
                q2.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Brings a two-member quorum up and down while capturing INFO-level output
     * of the shaded ZooKeeper logger, then requires at least one captured line
     * to mention {@code FastLeaderElection}.
     *
     * @throws Exception if a server cannot be started or a check fails
     */
    @Test
    public void testQuorumDefaults() throws Exception {
        ClientBase.setupTestEnv();
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        WriterAppender appender = this.getConsoleAppender(captured, Level.INFO);
        appender.setImmediateFlush(true);
        Logger zkLogger = Logger.getLogger("io.hops.hadoop.shaded.org.apache.zookeeper");
        zkLogger.addAppender(appender);
        try {
            int portOne = PortAssignment.unique();
            int portTwo = PortAssignment.unique();
            String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + portOne + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + portTwo;
            String hostPortOne = "127.0.0.1:" + portOne;
            String hostPortTwo = "127.0.0.1:" + portTwo;

            QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, portOne, quorumCfgSection);
            QuorumPeerTestBase.MainThread q2 = new QuorumPeerTestBase.MainThread(2, portTwo, quorumCfgSection);
            q1.start();
            q2.start();
            Assert.assertTrue("waiting for server 1 being up", ClientBase.waitForServerUp(hostPortOne, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp(hostPortTwo, ClientBase.CONNECTION_TIMEOUT));

            q1.shutdown();
            q2.shutdown();
            Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown(hostPortOne, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 down", ClientBase.waitForServerDown(hostPortTwo, ClientBase.CONNECTION_TIMEOUT));
        }
        finally {
            zkLogger.removeAppender(appender);
        }
        captured.close();

        LineNumberReader reader = new LineNumberReader(new StringReader(captured.toString()));
        Pattern electionPattern = Pattern.compile(".*FastLeaderElection.*");
        boolean found = false;
        String logLine;
        // Read until the first matching line or the end of the captured log.
        while (!found && (logLine = reader.readLine()) != null) {
            found = electionPattern.matcher(logLine).matches();
        }
        Assert.assertTrue("fastleaderelection used", found);
    }

    /**
     * Starts only server 1 of a two-server configuration, lets it run for
     * 30 seconds, and fails if its shutdown call takes longer than 3000 ms.
     *
     * @throws Exception if the server cannot be started or the test is interrupted
     */
    @Test
    public void testQuorumPeerExitTime() throws Exception {
        final long maxwait = 3000L;
        final int clientPort = PortAssignment.unique();
        StringBuilder cfg = new StringBuilder();
        cfg.append("server.1=127.0.0.1:").append(PortAssignment.unique()).append(":").append(PortAssignment.unique()).append(";").append(clientPort);
        cfg.append("\nserver.2=127.0.0.1:").append(PortAssignment.unique()).append(":").append(PortAssignment.unique()).append(";").append(PortAssignment.unique());

        QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, clientPort, cfg.toString());
        q1.start();
        Thread.sleep(30000L);

        // Time only the shutdown call itself.
        long start = Time.currentElapsedTime();
        q1.shutdown();
        long elapsed = Time.currentElapsedTime() - start;
        if (elapsed > maxwait) {
            Assert.fail("QuorumPeer took " + elapsed + " to shutdown, expected " + maxwait);
        }
    }

    /**
     * Starts a two-member quorum configured with explicit
     * {@code minSessionTimeout} and {@code maxSessionTimeout} values and checks
     * that the first peer reports exactly those values.
     *
     * <p>Both servers are shut down in a {@code finally} block; previously they
     * were left running when the test ended.
     *
     * @throws Exception if a server cannot be started or a check fails
     */
    @Test
    public void testMinMaxSessionTimeOut() throws Exception {
        ClientBase.setupTestEnv();
        int clientPortQp1 = PortAssignment.unique();
        int clientPortQp2 = PortAssignment.unique();
        String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique();
        final int minSessionTimeOut = 10000;
        final int maxSessionTimeOut = 15000;
        // Build the extra configuration from the constants the assertions use.
        final String configs = "maxSessionTimeout=" + maxSessionTimeOut + "\n" + "minSessionTimeout=" + minSessionTimeOut + "\n";
        QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, clientPortQp1, quorumCfgSection, configs);
        QuorumPeerTestBase.MainThread q2 = new QuorumPeerTestBase.MainThread(2, clientPortQp2, quorumCfgSection, configs);
        q1.start();
        q2.start();
        try {
            Assert.assertTrue("waiting for server 1 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPortQp1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPortQp2, ClientBase.CONNECTION_TIMEOUT));
            QuorumPeer quorumPeer = q1.main.quorumPeer;
            Assert.assertEquals("minimumSessionTimeOut is not considered", (long) minSessionTimeOut, (long) quorumPeer.getMinSessionTimeout());
            Assert.assertEquals("maximumSessionTimeOut is not considered", (long) maxSessionTimeOut, (long) quorumPeer.getMaxSessionTimeout());
        } finally {
            try {
                q1.shutdown();
            } finally {
                q2.shutdown();
            }
        }
    }

    /**
     * Starts a two-member quorum configured with only
     * {@code minSessionTimeout} and checks that the first peer reports that
     * minimum together with a maximum of 20 times its tick time.
     *
     * <p>Both servers are shut down in a {@code finally} block; previously they
     * were left running when the test ended.
     *
     * @throws Exception if a server cannot be started or a check fails
     */
    @Test
    public void testWithOnlyMinSessionTimeout() throws Exception {
        ClientBase.setupTestEnv();
        int clientPortQp1 = PortAssignment.unique();
        int clientPortQp2 = PortAssignment.unique();
        String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique();
        final int minSessionTimeOut = 15000;
        // Build the extra configuration from the constant the assertion uses.
        final String configs = "minSessionTimeout=" + minSessionTimeOut + "\n";
        QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, clientPortQp1, quorumCfgSection, configs);
        QuorumPeerTestBase.MainThread q2 = new QuorumPeerTestBase.MainThread(2, clientPortQp2, quorumCfgSection, configs);
        q1.start();
        q2.start();
        try {
            Assert.assertTrue("waiting for server 1 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPortQp1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPortQp2, ClientBase.CONNECTION_TIMEOUT));
            QuorumPeer quorumPeer = q1.main.quorumPeer;
            int maxSessionTimeOut = quorumPeer.tickTime * 20;
            Assert.assertEquals("minimumSessionTimeOut is not considered", (long) minSessionTimeOut, (long) quorumPeer.getMinSessionTimeout());
            Assert.assertEquals("maximumSessionTimeOut is wrong", (long) maxSessionTimeOut, (long) quorumPeer.getMaxSessionTimeout());
        } finally {
            try {
                q1.shutdown();
            } finally {
                q2.shutdown();
            }
        }
    }

    /**
     * Launches three servers, restarts them all, raises the leader's tick time,
     * and bounces both non-leaders. A create sent to the old leader is expected
     * to fail while leaving a "create" proposal outstanding that the old leader
     * itself has acknowledged. The test then waits for the old leader to start
     * running as a follower, checks that a different server now leads, restarts
     * the old leader, and verifies that no server has the znode from the failed
     * create.
     *
     * @throws Exception if the servers cannot be launched or a check fails
     */
    @Test
    public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
        final int LEADER_TIMEOUT_MS = 10000;
        final int SERVER_COUNT = 3;
        final int ACK_WAIT_LIMIT_MS = 2000;
        final int FOLLOWER_WAIT_LIMIT_MS = 20000;
        final int POLL_INTERVAL_MS = 100;

        ClientBase.setupTestEnv();
        this.servers = this.LaunchServers(SERVER_COUNT);
        this.waitForAll(this.servers, ZooKeeper.States.CONNECTED);
        this.servers.shutDownAllServers();
        this.waitForAll(this.servers, ZooKeeper.States.CONNECTING);
        this.servers.restartAllServersAndClients(this);
        this.waitForAll(this.servers, ZooKeeper.States.CONNECTED);

        int leader = this.servers.findLeader();
        // Fail clearly rather than indexing mt[-1] below.
        Assert.assertTrue("There should be a leader", leader >= 0);
        ConcurrentMap<Long, Leader.Proposal> outstanding = this.servers.mt[leader].main.quorumPeer.leader.outstandingProposals;

        // Raise the leader's tick time, then wait one old tick before going on.
        int previousTick = this.servers.mt[leader].main.quorumPeer.tickTime;
        this.servers.mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
        Thread.sleep(previousTick);
        LOG.warn("LEADER {}", leader);

        // Bounce every non-leader and reconnect its client.
        for (int i = 0; i < SERVER_COUNT; ++i) {
            if (i == leader) continue;
            this.servers.mt[i].shutdown();
        }
        for (int i = 0; i < SERVER_COUNT; ++i) {
            if (i == leader) continue;
            this.servers.mt[i].start();
        }
        for (int i = 0; i < SERVER_COUNT; ++i) {
            if (i == leader) continue;
            this.servers.restartClient(i, this);
            QuorumPeerMainTest.waitForOne(this.servers.zk[i], ZooKeeper.States.CONNECTED);
        }

        try {
            this.servers.zk[leader].create("/zk" + leader, "zk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Assert.fail("create /zk" + leader + " should have failed");
        }
        catch (KeeperException expected) {
            // Expected: the create through the old leader must not succeed.
        }

        Assert.assertTrue(outstanding.size() > 0);
        Leader.Proposal p = this.findProposalOfType(outstanding, ZooDefs.OpCode.create);
        LOG.info("Old leader id: {}. All proposals: {}", leader, outstanding);
        Assert.assertNotNull("Old leader doesn't have 'create' proposal", p);

        // Poll until the old leader's own id shows up in the proposal's ack set.
        int sleepTime = 0;
        Long longLeader = Long.valueOf(leader);
        while (!((SyncedLearnerTracker.QuorumVerifierAcksetPair)p.qvAcksetPairs.get(0)).getAckset().contains(longLeader)) {
            if (sleepTime > ACK_WAIT_LIMIT_MS) {
                Assert.fail("Transaction not synced to disk within 2 seconds " + ((SyncedLearnerTracker.QuorumVerifierAcksetPair)p.qvAcksetPairs.get(0)).getAckset() + " expected " + leader);
            }
            Thread.sleep(POLL_INTERVAL_MS);
            sleepTime += POLL_INTERVAL_MS;
        }

        // Poll until the old leader is running as a follower.
        LOG.info("Waiting for leader {} to timeout followers", leader);
        sleepTime = 0;
        Follower f = this.servers.mt[leader].main.quorumPeer.follower;
        while (f == null || !f.isRunning()) {
            if (sleepTime > FOLLOWER_WAIT_LIMIT_MS) {
                Assert.fail("Took too long for old leader to time out " + this.servers.mt[leader].main.quorumPeer.getPeerState());
            }
            Thread.sleep(POLL_INTERVAL_MS);
            sleepTime += POLL_INTERVAL_MS;
            f = this.servers.mt[leader].main.quorumPeer.follower;
        }

        int newLeader = this.servers.findLeader();
        Assert.assertNotEquals((long) leader, (long) newLeader);

        // Restart the old leader and confirm the failed create is visible nowhere.
        this.servers.mt[leader].shutdown();
        this.servers.mt[leader].start();
        this.servers.restartClient(leader, this);
        this.waitForAll(this.servers, ZooKeeper.States.CONNECTED);
        for (int i = 0; i < SERVER_COUNT; ++i) {
            Assert.assertNull("server " + i + " should not have /zk" + leader, this.servers.zk[i].exists("/zk" + leader, false));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts three servers where server 1 is configured with only servers 1 and 2
     * while servers 2 and 3 are configured with all three, then checks the peer
     * states and the captured quorum log.
     *
     * <p>Start order is server 1, server 3, then server 2. If server 2 ends up
     * LEADING, servers 1 and 3 must be FOLLOWING and the log must contain a
     * FOLLOWING line for {@code myid=1}. Otherwise server 1 must still be LOOKING,
     * server 3 must be LEADING, and the log must contain neither a LEADING nor a
     * FOLLOWING line for {@code myid=1}.
     */
    @Test
    public void testLeaderOutOfView() throws Exception {
        ClientBase.setupTestEnv();

        final int numServers = 3;

        // Capture DEBUG output of the quorum package so it can be scanned afterwards.
        ByteArrayOutputStream capturedLog = new ByteArrayOutputStream();
        WriterAppender appender = getConsoleAppender(capturedLog, Level.DEBUG);
        Logger quorumLogger = Logger.getLogger("io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum");
        quorumLogger.addAppender(appender);
        try {
            QuorumPeerTestBase.Servers svrs = new QuorumPeerTestBase.Servers();
            svrs.clientPorts = new int[numServers];
            for (int idx = 0; idx < numServers; idx++) {
                svrs.clientPorts[idx] = PortAssignment.unique();
            }

            // Server 1 only knows servers 1 and 2; the other two know the whole ensemble.
            String partialCfg = getUniquePortCfgForId(1) + "\n" + getUniquePortCfgForId(2);
            String fullCfg = partialCfg + "\n" + getUniquePortCfgForId(3);

            svrs.mt = new QuorumPeerTestBase.MainThread[numServers];
            svrs.mt[0] = new QuorumPeerTestBase.MainThread(1, svrs.clientPorts[0], partialCfg);
            for (int idx = 1; idx < numServers; idx++) {
                svrs.mt[idx] = new QuorumPeerTestBase.MainThread(idx + 1, svrs.clientPorts[idx], fullCfg);
            }

            // A lone server 1 has nobody to form a quorum with yet.
            svrs.mt[0].start();
            QuorumPeer firstPeer = waitForQuorumPeer(svrs.mt[0], ClientBase.CONNECTION_TIMEOUT);
            Assert.assertTrue(firstPeer.getPeerState() == QuorumPeer.ServerState.LOOKING);

            // Bring up the highest-numbered server next; it is expected to be LOOKING as well.
            final int highestServerIndex = numServers - 1;
            svrs.mt[highestServerIndex].start();
            QuorumPeer lastPeer = waitForQuorumPeer(svrs.mt[highestServerIndex], ClientBase.CONNECTION_TIMEOUT);
            Assert.assertTrue(lastPeer.getPeerState() == QuorumPeer.ServerState.LOOKING);

            // Start the remaining middle server(s).
            for (int idx = 1; idx < highestServerIndex; idx++) {
                svrs.mt[idx].start();
            }

            // Every server except the first must start serving clients.
            for (int idx = 1; idx < numServers; idx++) {
                boolean serverUp = ClientBase.waitForServerUp("127.0.0.1:" + svrs.clientPorts[idx], ClientBase.CONNECTION_TIMEOUT);
                Assert.assertTrue("waiting for server to start", serverUp);
            }

            boolean firstAndSecondNodeFormedCluster;
            if (QuorumPeer.ServerState.LEADING == svrs.mt[1].getQuorumPeer().getPeerState()) {
                Assert.assertEquals(QuorumPeer.ServerState.FOLLOWING, svrs.mt[0].getQuorumPeer().getPeerState());
                Assert.assertEquals(QuorumPeer.ServerState.FOLLOWING, svrs.mt[highestServerIndex].getQuorumPeer().getPeerState());
                firstAndSecondNodeFormedCluster = true;
            } else {
                Assert.assertEquals(QuorumPeer.ServerState.LOOKING, svrs.mt[0].getQuorumPeer().getPeerState());
                Assert.assertEquals(QuorumPeer.ServerState.LEADING, svrs.mt[highestServerIndex].getQuorumPeer().getPeerState());
                firstAndSecondNodeFormedCluster = false;
            }

            for (int idx = 1; idx < highestServerIndex; idx++) {
                Assert.assertTrue(
                        svrs.mt[idx].getQuorumPeer().getPeerState() == QuorumPeer.ServerState.FOLLOWING
                                || svrs.mt[idx].getQuorumPeer().getPeerState() == QuorumPeer.ServerState.LEADING);
            }

            // Scan the captured log for the first line showing server 1 LEADING or FOLLOWING.
            Pattern leading = Pattern.compile(".*myid=1.*QuorumPeer.*LEADING.*");
            Pattern following = Pattern.compile(".*myid=1.*QuorumPeer.*FOLLOWING.*");
            boolean sawLeading = false;
            boolean sawFollowing = false;
            LineNumberReader reader = new LineNumberReader(new StringReader(capturedLog.toString()));
            for (String line = reader.readLine();
                    line != null && !(sawLeading || sawFollowing);
                    line = reader.readLine()) {
                sawLeading = leading.matcher(line).matches();
                sawFollowing = following.matcher(line).matches();
            }

            if (firstAndSecondNodeFormedCluster) {
                Assert.assertTrue("Corrupt peer should join quorum with servers having same server configuration", sawFollowing);
            } else {
                Assert.assertFalse("Corrupt peer should never become leader", sawLeading);
                Assert.assertFalse("Corrupt peer should not attempt connection to out of view leader", sawFollowing);
            }
        } finally {
            quorumLogger.removeAppender(appender);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that running from a config with distinct data and data-log directories
     * yields a {@code FileTxnSnapLog} whose data dir is {@code <dataLogDir>/version-2}
     * and whose snap dir is {@code <dataDir>/version-2}.
     *
     * <p>Both the config and the quorum peer are Mockito mocks; only the
     * txn-factory getter/setter on the peer call through to the real methods.
     */
    @Test
    public void testDataDirAndDataLogDir() throws Exception {
        final File snapshotRoot = ClientBase.createEmptyTestDir();
        final File txnLogRoot = ClientBase.createEmptyTestDir();
        try {
            // Config mock exposing the two directories and a no-op metrics provider.
            QuorumPeerConfig config = Mockito.mock(QuorumPeerConfig.class);
            Mockito.when(config.getDataDir()).thenReturn(snapshotRoot);
            Mockito.when(config.getDataLogDir()).thenReturn(txnLogRoot);
            Mockito.when(config.getMetricsProviderClassName()).thenReturn(NullMetricsProvider.class.getName());

            // Peer mock that really stores and returns the txn factory it is given.
            QuorumPeer peer = Mockito.mock(QuorumPeer.class);
            Mockito.doCallRealMethod().when(peer).setTxnFactory(ArgumentMatchers.any(FileTxnSnapLog.class));
            Mockito.when(peer.getTxnFactory()).thenCallRealMethod();

            InjectableQuorumPeerMain peerMain = new InjectableQuorumPeerMain(peer);
            peerMain.runFromConfig(config);

            FileTxnSnapLog txnFactory = peerMain.getQuorumPeer().getTxnFactory();
            String expectedTxnLogDir = Paths.get(txnLogRoot.getAbsolutePath(), "version-2").toString();
            String expectedSnapDir = Paths.get(snapshotRoot.getAbsolutePath(), "version-2").toString();
            Assert.assertEquals(expectedTxnLogDir, txnFactory.getDataDir().getAbsolutePath());
            Assert.assertEquals(expectedSnapDir, txnFactory.getSnapDir().getAbsolutePath());
        } finally {
            FileUtils.deleteDirectory(snapshotRoot);
            FileUtils.deleteDirectory(txnLogRoot);
        }
    }

    /**
     * Builds an appender that writes to {@code os} using the same conversion
     * pattern as the root logger's appender named {@code "CONSOLE"}.
     *
     * @param os    stream that receives the formatted log output
     * @param level threshold set on the returned appender
     * @return a new, not yet attached, {@code WriterAppender}
     */
    private WriterAppender getConsoleAppender(ByteArrayOutputStream os, Level level) {
        Appender console = Logger.getRootLogger().getAppender("CONSOLE");
        PatternLayout consoleLayout = (PatternLayout) console.getLayout();
        PatternLayout sameFormat = new PatternLayout(consoleLayout.getConversionPattern());

        WriterAppender capturing = new WriterAppender(sameFormat, os);
        capturing.setThreshold(level);
        return capturing;
    }

    /**
     * Builds a {@code server.<id>=127.0.0.1:<port>:<port>} configuration line using
     * two freshly assigned ports.
     *
     * <p>The line is assembled by plain concatenation rather than
     * {@code String.format("%d")}: {@code %d} is formatted with the default locale
     * and may produce non-ASCII digits, which would not be a valid config line.
     *
     * @param id server id placed after {@code "server."}
     * @return the configuration line, without a trailing newline
     */
    private String getUniquePortCfgForId(int id) {
        // Ports are requested in the same order as they appear in the line.
        int firstPort = PortAssignment.unique();
        int secondPort = PortAssignment.unique();
        return "server." + id + "=127.0.0.1:" + firstPort + ":" + secondPort;
    }

    /**
     * Polls every 250 ms until {@code mainThread} is alive and exposes a
     * {@code QuorumPeer}.
     *
     * @param mainThread server thread to poll
     * @param timeout    maximum wait, compared against {@code Time.currentElapsedTime()}
     *                   (presumably milliseconds)
     * @return the first non-null quorum peer observed
     * @throws TimeoutException if no quorum peer shows up within {@code timeout}, or
     *                          if the waiting thread is interrupted (the interrupt
     *                          flag is restored before throwing)
     */
    private QuorumPeer waitForQuorumPeer(QuorumPeerTestBase.MainThread mainThread, int timeout) throws TimeoutException {
        long start = Time.currentElapsedTime();
        while (true) {
            QuorumPeer quorumPeer = mainThread.isAlive() ? mainThread.getQuorumPeer() : null;
            if (quorumPeer != null) {
                return quorumPeer;
            }
            if (Time.currentElapsedTime() > start + (long)timeout) {
                LOG.error("Timed out while waiting for QuorumPeer");
                throw new TimeoutException("Timed out while waiting for QuorumPeer (timeout=" + timeout + ")");
            }
            try {
                Thread.sleep(250L);
            }
            catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag and stop waiting, otherwise
                // the caller can never cancel this loop before the timeout elapses.
                Thread.currentThread().interrupt();
                TimeoutException interrupted = new TimeoutException("Interrupted while waiting for QuorumPeer");
                interrupted.initCause(e);
                throw interrupted;
            }
        }
    }

    /**
     * Returns the first proposal, in the map's iteration order, whose request
     * header type equals {@code type}.
     *
     * @param proposals proposals to search
     * @param type      request header type to look for
     * @return the matching proposal, or {@code null} when none matches
     */
    private Leader.Proposal findProposalOfType(Map<Long, Leader.Proposal> proposals, int type) {
        for (Map.Entry<Long, Leader.Proposal> entry : proposals.entrySet()) {
            Leader.Proposal candidate = entry.getValue();
            if (candidate.request.getHdr().getType() == type) {
                return candidate;
            }
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that a follower which is shut down from its new-leader-ack callback
     * during a forced snapshot sync ends up with the same node data as the leader
     * once it is restarted.
     *
     * <p>Steps, as performed below:
     * <ol>
     *   <li>start a three-server ensemble built from {@code CustomizedQPMain};</li>
     *   <li>shut down one follower and set {@code zookeeper.forceSnapshotSync};</li>
     *   <li>create the node with value "1"; register leader listeners that set it
     *       to "2" (async, on start-forwarding) and "3" (on begin-snapshot);</li>
     *   <li>start the follower with a callback that shuts it down again;</li>
     *   <li>turn the forced snapshot sync and the exit hook off, restart the
     *       follower and compare its node data with the leader's.</li>
     * </ol>
     */
    @Test
    public void testInconsistentDueToNewLeaderOrder() throws Exception {
        final int ensembleServers = 3;
        final int[] clientPorts = new int[ensembleServers];
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < ensembleServers; ++i) {
            clientPorts[i] = PortAssignment.unique();
            String server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ":participant;127.0.0.1:" + clientPorts[i];
            sb.append(server).append("\n");
        }
        String currentQuorumCfgSection = sb.toString();

        QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[ensembleServers];
        ZooKeeper[] zk = new ZooKeeper[ensembleServers];
        Context[] contexts = new Context[ensembleServers];
        for (int i = 0; i < ensembleServers; ++i) {
            final Context context = new Context();
            contexts[i] = context;
            mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts[i], currentQuorumCfgSection, false){

                @Override
                public QuorumPeerTestBase.TestQPMain getTestQPMain() {
                    return new CustomizedQPMain(context);
                }
            };
            mt[i].start();
            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
        }
        QuorumPeerMainTest.waitForAll(zk, ZooKeeper.States.CONNECTED);
        LOG.info("all servers started");

        final String nodePath = "/testInconsistentDueToNewLeader";

        // Pick the leader and the first non-leader server as "follower A".
        int leaderId = -1;
        int followerA = -1;
        for (int i = 0; i < ensembleServers; ++i) {
            if (mt[i].main.quorumPeer.leader != null) {
                leaderId = i;
                continue;
            }
            if (followerA != -1) continue;
            followerA = i;
        }
        // Fail with a readable message instead of an array index of -1.
        Assert.assertTrue("no follower found in the ensemble", followerA != -1);

        LOG.info("shutdown follower {}", (Object)followerA);
        mt[followerA].shutdown();
        QuorumPeerMainTest.waitForOne(zk[followerA], ZooKeeper.States.CONNECTING);
        try {
            Assert.assertTrue("no leader found in the ensemble", leaderId != -1);

            LOG.info("force snapshot sync");
            System.setProperty("zookeeper.forceSnapshotSync", "true");

            String initialValue = "1";
            final ZooKeeper leaderZk = zk[leaderId];
            leaderZk.create(nodePath, initialValue.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            LOG.info("created node {} with value {}", (Object)nodePath, (Object)initialValue);

            CustomQuorumPeer leaderQuorumPeer = (CustomQuorumPeer)mt[leaderId].main.quorumPeer;

            // While forced snapshot sync is on, fire an async update to "2" when the
            // leader starts forwarding, then pause for a second.
            leaderQuorumPeer.setStartForwardingListener(new StartForwardingListener(){

                @Override
                public void start() {
                    if (!Boolean.getBoolean("zookeeper.forceSnapshotSync")) {
                        return;
                    }
                    final String value = "2";
                    QuorumPeerTestBase.LOG.info("start forwarding, set {} to {}", (Object)nodePath, (Object)value);
                    try {
                        leaderZk.setData(nodePath, value.getBytes(), -1, new AsyncCallback.StatCallback(){

                            @Override
                            public void processResult(int rc, String path, Object ctx, Stat stat) {
                                // Result intentionally ignored; only the write itself matters.
                            }
                        }, null);
                        Thread.sleep(1000L);
                    }
                    catch (Exception e) {
                        QuorumPeerTestBase.LOG.error("error when set {} to {}", new Object[]{nodePath, value, e});
                    }
                }
            });

            // Synchronously update the node to "3" when the leader begins a snapshot.
            leaderQuorumPeer.setBeginSnapshotListener(new BeginSnapshotListener(){

                @Override
                public void start() {
                    String value = "3";
                    QuorumPeerTestBase.LOG.info("before sending snapshot, set {} to {}", (Object)nodePath, (Object)value);
                    try {
                        leaderZk.setData(nodePath, value.getBytes(), -1);
                        QuorumPeerTestBase.LOG.info("successfully set {} to {}", (Object)nodePath, (Object)value);
                    }
                    catch (Exception e) {
                        QuorumPeerTestBase.LOG.error("error when set {} to {}, {}", new Object[]{nodePath, value, e});
                    }
                }
            });

            // Arrange for follower A to be shut down from its new-leader-ack callback.
            LOG.info("set exit when ack new leader packet on {}", (Object)followerA);
            contexts[followerA].exitWhenAckNewLeader = true;
            final CountDownLatch latch = new CountDownLatch(1);
            final QuorumPeerTestBase.MainThread followerAMT = mt[followerA];
            contexts[followerA].newLeaderAckCallback = new NewLeaderAckCallback(){

                @Override
                public void start() {
                    try {
                        latch.countDown();
                        followerAMT.shutdown();
                    }
                    catch (Exception e) {
                        // Best effort: the test proceeds once the latch is released, but
                        // record the failure instead of hiding it.
                        QuorumPeerTestBase.LOG.warn("error while shutting down follower from callback", e);
                    }
                }
            };

            LOG.info("starting follower {}", (Object)followerA);
            mt[followerA].start();
            Assert.assertTrue(latch.await(30L, TimeUnit.SECONDS));

            // Turn the hooks off before the final restart. The flag is cleared to match
            // the "disable" step logged here; it was previously left set to true.
            LOG.info("disable exit when ack new leader packet on {}", (Object)followerA);
            System.setProperty("zookeeper.forceSnapshotSync", "false");
            contexts[followerA].exitWhenAckNewLeader = false;
            contexts[followerA].newLeaderAckCallback = null;

            LOG.info("restarting follower {}", (Object)followerA);
            mt[followerA].start();
            zk[followerA].close();
            zk[followerA] = new ZooKeeper("127.0.0.1:" + clientPorts[followerA], ClientBase.CONNECTION_TIMEOUT, this);
            QuorumPeerMainTest.waitForOne(zk[followerA], ZooKeeper.States.CONNECTED);

            // Follower and leader must agree on the node's value.
            Assert.assertEquals((Object)new String(zk[followerA].getData(nodePath, null, null)), (Object)new String(zk[leaderId].getData(nodePath, null, null)));
        }
        finally {
            System.clearProperty("zookeeper.forceSnapshotSync");
            for (int i = 0; i < ensembleServers; ++i) {
                mt[i].shutdown();
                zk[i].close();
            }
        }
    }

    /**
     * Runs {@code testLeaderElection} with a five-server configuration of which
     * only three servers are started.
     */
    @Test
    public void testLeaderElectionWithDisloyalVoter() throws IOException {
        final int totalServers = 5;
        final int serversToStart = 3;
        final int maxTimeToWaitForEpoch = 1000;
        final int maxTimeWaitForServerUp = 10000;
        testLeaderElection(totalServers, serversToStart, maxTimeToWaitForEpoch, maxTimeWaitForServerUp);
    }

    /**
     * Runs {@code testLeaderElection} with a five-server configuration in which
     * all five servers are started.
     */
    @Test
    public void testLeaderElectionWithDisloyalVoter_stillHasMajority() throws IOException {
        final int totalServers = 5;
        final int serversToStart = 5;
        final int maxTimeToWaitForEpoch = 3000;
        final int maxTimeWaitForServerUp = 20000;
        testLeaderElection(totalServers, serversToStart, maxTimeToWaitForEpoch, maxTimeWaitForServerUp);
    }

    /**
     * Configures an ensemble of {@code totalServers} participants, starts the first
     * {@code serversToStart} of them, and asserts that each started server comes up.
     * Server 0 is created with {@code quitFollowing} set on its context.
     *
     * @param totalServers           number of servers listed in the quorum configuration
     * @param serversToStart         number of servers actually started (indices 0..n-1)
     * @param maxTimeToWaitForEpoch  value passed to {@code Leader.setMaxTimeToWaitForEpoch}
     * @param maxTimeWaitForServerUp per-server timeout passed to {@code ClientBase.waitForServerUp}
     */
    void testLeaderElection(int totalServers, int serversToStart, int maxTimeToWaitForEpoch, int maxTimeWaitForServerUp) throws IOException {
        Leader.setMaxTimeToWaitForEpoch(maxTimeToWaitForEpoch);
        this.servers = new QuorumPeerTestBase.Servers();

        // One config line per ensemble member; each line consumes three unique ports.
        final int[] clientPorts = new int[totalServers];
        StringBuilder cfg = new StringBuilder();
        for (int id = 0; id < totalServers; id++) {
            clientPorts[id] = PortAssignment.unique();
            String serverLine = "server." + id + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ":participant;127.0.0.1:" + clientPorts[id];
            cfg.append(serverLine).append("\n");
        }
        final String quorumCfgSection = cfg.toString();

        QuorumPeerTestBase.MainThread[] threads = new QuorumPeerTestBase.MainThread[serversToStart];
        Context[] contexts = new Context[serversToStart];
        this.servers.mt = threads;
        this.numServers = serversToStart;

        for (int id = 0; id < serversToStart; id++) {
            final Context context = new Context();
            context.quitFollowing = context.quitFollowing || id == 0;
            contexts[id] = context;
            threads[id] = new QuorumPeerTestBase.MainThread(id, clientPorts[id], quorumCfgSection, false) {
                @Override
                public QuorumPeerTestBase.TestQPMain getTestQPMain() {
                    return new CustomizedQPMain(context);
                }
            };
            threads[id].start();
        }

        for (int id = 0; id < serversToStart; id++) {
            boolean serverUp = ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[id], maxTimeWaitForServerUp);
            Assert.assertTrue("Server " + id + " should have joined quorum by now", serverUp);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs a two-server quorum in which server 1 is configured with
     * {@code MetricsProviderCapturingLifecycle}, stops both servers, and asserts
     * that the provider recorded configure, start, getRootContext and stop calls.
     */
    @Test
    public void testMetricsProviderLifecycle() throws Exception {
        ClientBase.setupTestEnv();
        BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.reset();

        ByteArrayOutputStream capturedLog = new ByteArrayOutputStream();
        WriterAppender appender = getConsoleAppender(capturedLog, Level.WARN);
        Logger quorumLogger = Logger.getLogger("io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum");
        quorumLogger.addAppender(appender);
        try {
            final int clientPort1 = PortAssignment.unique();
            final int clientPort2 = PortAssignment.unique();
            final String hostPort1 = "127.0.0.1:" + clientPort1;
            final String hostPort2 = "127.0.0.1:" + clientPort2;

            // Shared server section; server 1 additionally gets the metrics provider line.
            String server1Line = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort1;
            String server2Line = "server.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort2;
            String plainCfg = server1Line + "\n" + server2Line + "\n";
            String providerCfg = plainCfg + "metricsProvider.className=" + BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.class.getName() + "\n";

            QuorumPeerTestBase.MainThread server1 = new QuorumPeerTestBase.MainThread(1, clientPort1, providerCfg);
            QuorumPeerTestBase.MainThread server2 = new QuorumPeerTestBase.MainThread(2, clientPort2, plainCfg);
            server1.start();
            server2.start();

            boolean server1Up = ClientBase.waitForServerUp(hostPort1, 30000L);
            boolean server2Up = ClientBase.waitForServerUp(hostPort2, 30000L);
            Assert.assertTrue("Server 1 never came up", server1Up);
            Assert.assertTrue("Server 2 never came up", server2Up);

            server1.shutdown();
            server2.shutdown();
            Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown(hostPort1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 down", ClientBase.waitForServerDown(hostPort2, ClientBase.CONNECTION_TIMEOUT));
        } finally {
            quorumLogger.removeAppender(appender);
        }

        final String failure = "metrics provider lifecycle error";
        Assert.assertTrue(failure, BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.configureCalled.get());
        Assert.assertTrue(failure, BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.startCalled.get());
        Assert.assertTrue(failure, BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.getRootContextCalled.get());
        Assert.assertTrue(failure, BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.stopCalled.get());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs a two-server quorum in which server 1 is configured with
     * {@code MetricsProviderWithConfiguration} and {@code metricsProvider.httpPort=1234},
     * stops both servers, and asserts that the provider's {@code httpPort} holds 1234.
     */
    @Test
    public void testMetricsProviderConfiguration() throws Exception {
        ClientBase.setupTestEnv();
        // Start from a known value so the final assertion proves the config was applied.
        BaseTestMetricsProvider.MetricsProviderWithConfiguration.httpPort.set(0);

        ByteArrayOutputStream capturedLog = new ByteArrayOutputStream();
        WriterAppender appender = getConsoleAppender(capturedLog, Level.WARN);
        Logger quorumLogger = Logger.getLogger("io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum");
        quorumLogger.addAppender(appender);
        try {
            final int clientPort1 = PortAssignment.unique();
            final int clientPort2 = PortAssignment.unique();
            final String hostPort1 = "127.0.0.1:" + clientPort1;
            final String hostPort2 = "127.0.0.1:" + clientPort2;

            String server1Line = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort1;
            String server2Line = "server.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort2;
            String plainCfg = server1Line + "\n" + server2Line + "\n";
            String providerCfg = plainCfg
                    + "metricsProvider.className=" + BaseTestMetricsProvider.MetricsProviderWithConfiguration.class.getName()
                    + "\nmetricsProvider.httpPort=1234";

            QuorumPeerTestBase.MainThread server1 = new QuorumPeerTestBase.MainThread(1, clientPort1, providerCfg);
            QuorumPeerTestBase.MainThread server2 = new QuorumPeerTestBase.MainThread(2, clientPort2, plainCfg);
            server1.start();
            server2.start();

            boolean server1Up = ClientBase.waitForServerUp(hostPort1, 30000L);
            boolean server2Up = ClientBase.waitForServerUp(hostPort2, 30000L);
            Assert.assertTrue("Server 1 never came up", server1Up);
            Assert.assertTrue("Server 2 never came up", server2Up);

            server1.shutdown();
            server2.shutdown();
            Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown(hostPort1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 down", ClientBase.waitForServerDown(hostPort2, ClientBase.CONNECTION_TIMEOUT));
        } finally {
            quorumLogger.removeAppender(appender);
        }

        Assert.assertEquals(1234L, (long)BaseTestMetricsProvider.MetricsProviderWithConfiguration.httpPort.get());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs a two-server quorum in which server 1 is configured with
     * {@code MetricsProviderWithErrorInStop}, stops both servers, and asserts that
     * the provider's stop was called and that the captured WARN-level quorum log
     * contains a line matching "Error while stopping metrics".
     */
    @Test
    public void testFaultyMetricsProviderOnStop() throws Exception {
        ClientBase.setupTestEnv();
        BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.reset();

        ByteArrayOutputStream capturedLog = new ByteArrayOutputStream();
        WriterAppender appender = getConsoleAppender(capturedLog, Level.WARN);
        Logger quorumLogger = Logger.getLogger("io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum");
        quorumLogger.addAppender(appender);
        try {
            final int clientPort1 = PortAssignment.unique();
            final int clientPort2 = PortAssignment.unique();
            final String hostPort1 = "127.0.0.1:" + clientPort1;
            final String hostPort2 = "127.0.0.1:" + clientPort2;

            String server1Line = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort1;
            String server2Line = "server.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort2;
            String plainCfg = server1Line + "\n" + server2Line + "\n";
            String providerCfg = plainCfg + "metricsProvider.className=" + BaseTestMetricsProvider.MetricsProviderWithErrorInStop.class.getName() + "\n";

            QuorumPeerTestBase.MainThread server1 = new QuorumPeerTestBase.MainThread(1, clientPort1, providerCfg);
            QuorumPeerTestBase.MainThread server2 = new QuorumPeerTestBase.MainThread(2, clientPort2, plainCfg);
            server1.start();
            server2.start();

            boolean server1Up = ClientBase.waitForServerUp(hostPort1, 30000L);
            boolean server2Up = ClientBase.waitForServerUp(hostPort2, 30000L);
            Assert.assertTrue("Server 1 never came up", server1Up);
            Assert.assertTrue("Server 2 never came up", server2Up);

            server1.shutdown();
            server2.shutdown();
            Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown(hostPort1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 down", ClientBase.waitForServerDown(hostPort2, ClientBase.CONNECTION_TIMEOUT));
        } finally {
            quorumLogger.removeAppender(appender);
        }

        Assert.assertTrue("metrics provider lifecycle error", BaseTestMetricsProvider.MetricsProviderWithErrorInStop.stopCalled.get());

        // Stop at the first captured line that reports the failed stop.
        Pattern stopError = Pattern.compile(".*Error while stopping metrics.*");
        LineNumberReader reader = new LineNumberReader(new StringReader(capturedLog.toString()));
        boolean found = false;
        String line;
        while (!found && (line = reader.readLine()) != null) {
            found = stopError.matcher(line).matches();
        }
        Assert.assertTrue("complains about metrics provider", found);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts a server whose configuration names the metrics provider class
     * {@code BadClass}, asserts that the server does not come up within five
     * seconds, and asserts that the captured WARN-level quorum log contains a line
     * mentioning {@code BadClass}.
     */
    @Test
    public void testInvalidMetricsProvider() throws Exception {
        ClientBase.setupTestEnv();
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        WriterAppender appender = this.getConsoleAppender(os, Level.WARN);
        Logger qlogger = Logger.getLogger("io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum");
        qlogger.addAppender(appender);
        try {
            int CLIENT_PORT_QP1 = PortAssignment.unique();
            // NOTE(review): server.2 reuses server.1's client port; server 2 is never
            // started in this test, so this looks harmless — confirm before changing.
            String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1 + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1 + "\nmetricsProvider.className=BadClass\n";
            QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
            q1.start();
            boolean isup = ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, 5000L);
            // The message is shown when the server DID come up, so it must say so.
            Assert.assertFalse("Server should not come up with an invalid metrics provider", isup);
            q1.shutdown();
            Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
        }
        finally {
            qlogger.removeAppender(appender);
        }

        // Look for the first captured line that names the bad provider class.
        LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
        Pattern p = Pattern.compile(".*BadClass.*");
        boolean found = false;
        String line;
        while (!found && (line = r.readLine()) != null) {
            found = p.matcher(line).matches();
        }
        Assert.assertTrue("complains about metrics provider", found);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts a server configured with {@code MetricsProviderWithErrorInStart},
     * asserts that the server does not come up within five seconds, and asserts
     * that the captured WARN-level quorum log contains a line mentioning
     * {@code MetricsProviderLifeCycleException}.
     */
    @Test
    public void testFaultyMetricsProviderOnStart() throws Exception {
        ClientBase.setupTestEnv();
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        WriterAppender appender = this.getConsoleAppender(os, Level.WARN);
        Logger qlogger = Logger.getLogger("io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum");
        qlogger.addAppender(appender);
        try {
            int CLIENT_PORT_QP1 = PortAssignment.unique();
            // NOTE(review): server.2 reuses server.1's client port; server 2 is never
            // started in this test, so this looks harmless — confirm before changing.
            String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1 + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1 + "\nmetricsProvider.className=" + BaseTestMetricsProvider.MetricsProviderWithErrorInStart.class.getName() + "\n";
            QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
            q1.start();
            boolean isup = ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, 5000L);
            // The message is shown when the server DID come up, so it must say so.
            Assert.assertFalse("Server should not come up with a faulty metrics provider", isup);
            q1.shutdown();
            Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
        }
        finally {
            qlogger.removeAppender(appender);
        }

        // Look for the first captured line that reports the lifecycle exception.
        LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
        Pattern p = Pattern.compile(".*MetricsProviderLifeCycleException.*");
        boolean found = false;
        String line;
        while (!found && (line = r.readLine()) != null) {
            found = p.matcher(line).matches();
        }
        Assert.assertTrue("complains about metrics provider MetricsProviderLifeCycleException", found);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts a server configured with {@code MetricsProviderWithErrorInConfigure},
     * asserts that the server does not come up within five seconds, and asserts
     * that the captured WARN-level quorum log contains a line mentioning
     * {@code MetricsProviderLifeCycleException}.
     */
    @Test
    public void testFaultyMetricsProviderOnConfigure() throws Exception {
        ClientBase.setupTestEnv();
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        WriterAppender appender = this.getConsoleAppender(os, Level.WARN);
        Logger qlogger = Logger.getLogger("io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum");
        qlogger.addAppender(appender);
        try {
            int CLIENT_PORT_QP1 = PortAssignment.unique();
            // NOTE(review): server.2 reuses server.1's client port; server 2 is never
            // started in this test, so this looks harmless — confirm before changing.
            String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1 + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1 + "\nmetricsProvider.className=" + BaseTestMetricsProvider.MetricsProviderWithErrorInConfigure.class.getName() + "\n";
            QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
            q1.start();
            boolean isup = ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, 5000L);
            // The message is shown when the server DID come up, so it must say so.
            Assert.assertFalse("Server should not come up with a faulty metrics provider", isup);
            q1.shutdown();
            Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
        }
        finally {
            qlogger.removeAppender(appender);
        }

        // Look for the first captured line that reports the lifecycle exception.
        LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
        Pattern p = Pattern.compile(".*MetricsProviderLifeCycleException.*");
        boolean found = false;
        String line;
        while (!found && (line = r.readLine()) != null) {
            found = p.matcher(line).matches();
        }
        Assert.assertTrue("complains about metrics provider MetricsProviderLifeCycleException", found);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts a three-server ensemble, stops one follower, writes nodes through the leader and
     * then restarts the follower while {@code zookeeper.forceSnapshotSync} is {@code "true"}.
     *
     * <p>A {@link NewLeaderReceivedCallback} installed on the follower's {@link Context} fails the
     * first sync attempt with an {@link IOException} and flips the property back to
     * {@code "false"}. The test then checks that every created node is visible on the follower,
     * both right after it reconnects and again after one more write plus a full restart.
     *
     * @throws Exception if the ensemble cannot be started or any ZooKeeper operation fails
     */
    @Test
    public void testDiffSyncAfterSnap() throws Exception {
        final int ensembleSize = 3;
        final String nodePath = "/testDiffSyncAfterSnap";
        QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[ensembleSize];
        ZooKeeper[] zk = new ZooKeeper[ensembleSize];
        try {
            // 1. Build the quorum config section: one "server.N=..." line per peer.
            int[] clientPorts = new int[ensembleSize];
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < ensembleSize; ++i) {
                clientPorts[i] = PortAssignment.unique();
                String server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ":participant;127.0.0.1:" + clientPorts[i];
                sb.append(server + "\n");
            }
            String currentQuorumCfgSection = sb.toString();

            // 2. Start every peer with its own Context so faults can be injected per server.
            Context[] contexts = new Context[ensembleSize];
            for (int i = 0; i < ensembleSize; ++i) {
                final Context context = new Context();
                contexts[i] = context;
                mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts[i], currentQuorumCfgSection, false){

                    @Override
                    public QuorumPeerTestBase.TestQPMain getTestQPMain() {
                        return new CustomizedQPMain(context);
                    }
                };
                mt[i].start();
                zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
            }
            QuorumPeerMainTest.waitForAll(zk, ZooKeeper.States.CONNECTED);
            LOG.info("all servers started");

            // 3. Locate the leader and pick the highest-numbered non-leader as follower A.
            int leaderId = -1;
            int followerA = -1;
            for (int i = ensembleSize - 1; i >= 0; --i) {
                if (mt[i].main.quorumPeer.leader != null) {
                    leaderId = i;
                } else if (followerA == -1) {
                    followerA = i;
                }
            }
            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException on -1.
            Assert.assertTrue("no leader found in the ensemble", leaderId != -1);
            Assert.assertTrue("no follower found in the ensemble", followerA != -1);

            // 4. Take follower A offline.
            LOG.info("shutdown follower {}", (Object)followerA);
            mt[followerA].shutdown();
            QuorumPeerMainTest.waitForOne(zk[followerA], ZooKeeper.States.CONNECTING);

            // 5. Create nodes through the leader while follower A is down.
            int index = 0;
            int numOfRequests = 10;
            for (int i = 0; i < numOfRequests; ++i) {
                zk[leaderId].create(nodePath + index++, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }

            // 6. Inject a one-shot fault: the first callback invocation disables forced snapshot
            //    sync and throws; later invocations are no-ops.
            contexts[followerA].newLeaderReceivedCallback = new NewLeaderReceivedCallback(){
                boolean processed = false;

                @Override
                public void process() throws IOException {
                    if (this.processed) {
                        return;
                    }
                    this.processed = true;
                    System.setProperty("zookeeper.forceSnapshotSync", "false");
                    throw new IOException("read timedout");
                }
            };

            // 7. Force a snapshot sync for the first attempt, then bring follower A back.
            LOG.info("force snapshot sync");
            System.setProperty("zookeeper.forceSnapshotSync", "true");
            mt[followerA].start();
            QuorumPeerMainTest.waitForOne(zk[followerA], ZooKeeper.States.CONNECTED);
            LOG.info("verify the nodes are exist in memory");
            for (int i = 0; i < index; ++i) {
                Assert.assertNotNull(zk[followerA].exists(nodePath + i, false));
            }

            // 8. Issue one more write, then pause briefly before restarting the follower.
            zk[leaderId].create(nodePath + index++, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Thread.sleep(500L);

            // 9. Restart follower A and confirm every node is still there.
            LOG.info("restarting follower {}", (Object)followerA);
            mt[followerA].shutdown();
            QuorumPeerMainTest.waitForOne(zk[followerA], ZooKeeper.States.CONNECTING);
            mt[followerA].start();
            QuorumPeerMainTest.waitForOne(zk[followerA], ZooKeeper.States.CONNECTED);
            for (int i = 0; i < index; ++i) {
                Assert.assertNotNull("node " + i + " should exist", zk[followerA].exists(nodePath + i, false));
            }
        }
        finally {
            System.clearProperty("zookeeper.forceSnapshotSync");
            for (int i = 0; i < ensembleSize; ++i) {
                // Setup may have failed before this slot was populated; skipping nulls keeps the
                // original failure from being masked by a NullPointerException during cleanup.
                if (mt[i] != null) {
                    mt[i].shutdown();
                }
                if (zk[i] != null) {
                    zk[i].close();
                }
            }
        }
    }

    static class CustomQuorumPeer
    extends QuorumPeer {
        private Context context;
        private LearnerSyncThrottler throttler = null;
        private StartForwardingListener startForwardingListener;
        private BeginSnapshotListener beginSnapshotListener;

        public CustomQuorumPeer(Context context) throws SaslException {
            this.context = context;
        }

        public void setStartForwardingListener(StartForwardingListener startForwardingListener) {
            this.startForwardingListener = startForwardingListener;
        }

        public void setBeginSnapshotListener(BeginSnapshotListener beginSnapshotListener) {
            this.beginSnapshotListener = beginSnapshotListener;
        }

        @Override
        protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
            return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())){

                @Override
                void followLeader() throws InterruptedException {
                    if (((CustomQuorumPeer)this).context.quitFollowing) {
                        ((CustomQuorumPeer)this).context.quitFollowing = false;
                        LOG.info("Quit following");
                        return;
                    }
                    super.followLeader();
                }

                @Override
                void writePacket(QuorumPacket pp, boolean flush) throws IOException {
                    if (pp != null && pp.getType() == 3 && ((CustomQuorumPeer)this).context.exitWhenAckNewLeader && ((CustomQuorumPeer)this).context.newLeaderAckCallback != null) {
                        ((CustomQuorumPeer)this).context.newLeaderAckCallback.start();
                    }
                    super.writePacket(pp, flush);
                }

                @Override
                void readPacket(QuorumPacket qp) throws IOException {
                    super.readPacket(qp);
                    if (qp.getType() == 10 && ((CustomQuorumPeer)this).context.newLeaderReceivedCallback != null) {
                        ((CustomQuorumPeer)this).context.newLeaderReceivedCallback.process();
                    }
                }
            };
        }

        @Override
        protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
            return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.getZkDb())){

                @Override
                public long startForwarding(LearnerHandler handler, long lastSeenZxid) {
                    if (startForwardingListener != null) {
                        startForwardingListener.start();
                    }
                    return super.startForwarding(handler, lastSeenZxid);
                }

                @Override
                public LearnerSyncThrottler getLearnerSnapSyncThrottler() {
                    if (throttler == null) {
                        throttler = new LearnerSyncThrottler(this.getMaxConcurrentSnapSyncs(), LearnerSyncThrottler.SyncType.SNAP){

                            @Override
                            public void beginSync(boolean essential) throws SyncThrottleException, InterruptedException {
                                if (beginSnapshotListener != null) {
                                    beginSnapshotListener.start();
                                }
                                super.beginSync(essential);
                            }
                        };
                    }
                    return throttler;
                }
            };
        }
    }

    /**
     * Test entry point that supplies a {@link CustomQuorumPeer} bound to a given
     * {@link Context} whenever a quorum peer is requested.
     */
    static class CustomizedQPMain extends QuorumPeerTestBase.TestQPMain {
        /** Context handed to every peer this main creates. */
        private Context peerContext;

        public CustomizedQPMain(Context context) {
            peerContext = context;
        }

        /** Returns a new {@link CustomQuorumPeer} wired to this main's context. */
        @Override
        protected QuorumPeer getQuorumPeer() throws SaslException {
            CustomQuorumPeer peer = new CustomQuorumPeer(peerContext);
            return peer;
        }
    }

    /**
     * Listener that {@link CustomQuorumPeer}'s snapshot-sync throttler calls, when one is set,
     * immediately before it delegates to the real {@code beginSync}.
     */
    interface BeginSnapshotListener {
        /** Called once per {@code beginSync} invocation, before the sync is admitted. */
        void start();
    }

    /**
     * Listener that {@link CustomQuorumPeer}'s leader calls, when one is set, immediately before
     * it delegates to the real {@code startForwarding}.
     */
    interface StartForwardingListener {
        /** Called once per {@code startForwarding} invocation, before forwarding begins. */
        void start();
    }

    /**
     * Callback that {@link CustomQuorumPeer}'s follower runs after it has read a packet of type
     * 10, provided one is registered on the {@link Context}.
     */
    interface NewLeaderReceivedCallback {
        /**
         * Runs the injected behaviour.
         *
         * @throws IOException to make the follower's {@code readPacket} fail
         */
        void process() throws IOException;
    }

    /**
     * Callback that {@link CustomQuorumPeer}'s follower runs before it writes a packet of type 3,
     * provided {@code exitWhenAckNewLeader} is set and a callback is registered on the
     * {@link Context}.
     */
    interface NewLeaderAckCallback {
        /** Called before the packet is handed to the real {@code writePacket}. */
        void start();
    }

    /**
     * Mutable bag of flags and callbacks shared between a test and the
     * {@link CustomQuorumPeer} it drives. Every field starts at its Java default
     * ({@code false} / {@code null}).
     */
    static class Context {
        /** When true, the follower returns from its next {@code followLeader} call and clears this flag. */
        boolean quitFollowing;
        /** Gates {@link #newLeaderAckCallback}; the callback only fires while this is true. */
        boolean exitWhenAckNewLeader;
        /** Optional hook run before the follower writes a packet of type 3. */
        NewLeaderAckCallback newLeaderAckCallback;
        /** Optional hook run after the follower reads a packet of type 10. */
        NewLeaderReceivedCallback newLeaderReceivedCallback;
    }

    /**
     * A {@link QuorumPeerMain} that hands back a caller-supplied {@link QuorumPeer} instead of
     * creating its own.
     */
    private class InjectableQuorumPeerMain extends QuorumPeerMain {
        /** The peer returned by {@link #getQuorumPeer()}. */
        QuorumPeer qp;

        InjectableQuorumPeerMain(QuorumPeer injected) {
            qp = injected;
        }

        /** Returns the injected peer as-is. */
        @Override
        protected QuorumPeer getQuorumPeer() {
            return qp;
        }
    }
}

