/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hadoop.shaded.org.apache.zookeeper.test;

import io.hops.hadoop.shaded.org.apache.zookeeper.AsyncCallback;
import io.hops.hadoop.shaded.org.apache.zookeeper.CreateMode;
import io.hops.hadoop.shaded.org.apache.zookeeper.KeeperException;
import io.hops.hadoop.shaded.org.apache.zookeeper.TestableZooKeeper;
import io.hops.hadoop.shaded.org.apache.zookeeper.WatchedEvent;
import io.hops.hadoop.shaded.org.apache.zookeeper.Watcher;
import io.hops.hadoop.shaded.org.apache.zookeeper.ZKTestCase;
import io.hops.hadoop.shaded.org.apache.zookeeper.ZooDefs;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.ZKDatabase;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.Leader;
import io.hops.hadoop.shaded.org.apache.zookeeper.test.ClientBase;
import io.hops.hadoop.shaded.org.apache.zookeeper.test.ClientTest;
import io.hops.hadoop.shaded.org.apache.zookeeper.test.DisconnectableZooKeeper;
import io.hops.hadoop.shaded.org.apache.zookeeper.test.QuorumUtil;
import java.io.IOException;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FollowerResyncConcurrencyTest
extends ZKTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(FollowerResyncConcurrencyTest.class);
    public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
    private AtomicInteger counter = new AtomicInteger(0);
    private AtomicInteger errors = new AtomicInteger(0);
    private AtomicInteger pending = new AtomicInteger(0);

    @Before
    public void setUp() throws Exception {
        this.pending.set(0);
        this.errors.set(0);
        this.counter.set(0);
    }

    @After
    public void tearDown() throws Exception {
        LOG.info("Error count {}", (Object)this.errors.get());
    }

    @Test
    public void testLaggingFollowerResyncsUnderNewEpoch() throws Exception {
        ClientBase.CountdownWatcher watcher1 = new ClientBase.CountdownWatcher();
        ClientBase.CountdownWatcher watcher2 = new ClientBase.CountdownWatcher();
        ClientBase.CountdownWatcher watcher3 = new ClientBase.CountdownWatcher();
        QuorumUtil qu = new QuorumUtil(1);
        qu.shutdownAll();
        qu.start(1);
        qu.start(2);
        Assert.assertTrue((String)"Waiting for server up", (boolean)ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer((int)1).clientPort, ClientBase.CONNECTION_TIMEOUT));
        Assert.assertTrue((String)"Waiting for server up", (boolean)ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer((int)2).clientPort, ClientBase.CONNECTION_TIMEOUT));
        DisconnectableZooKeeper zk1 = FollowerResyncConcurrencyTest.createClient(qu.getPeer((int)1).peer.getClientPort(), watcher1);
        LOG.info("zk1 has session id 0x{}", (Object)Long.toHexString(zk1.getSessionId()));
        String resyncPath = "/resyncundernewepoch";
        zk1.create("/resyncundernewepoch", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk1.close();
        qu.shutdown(1);
        qu.shutdown(2);
        Assert.assertTrue((String)"Waiting for server down", (boolean)ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer((int)1).clientPort, ClientBase.CONNECTION_TIMEOUT));
        Assert.assertTrue((String)"Waiting for server down", (boolean)ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer((int)2).clientPort, ClientBase.CONNECTION_TIMEOUT));
        qu.start(1);
        qu.start(2);
        Assert.assertTrue((String)"Waiting for server up", (boolean)ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer((int)1).clientPort, ClientBase.CONNECTION_TIMEOUT));
        Assert.assertTrue((String)"Waiting for server up", (boolean)ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer((int)2).clientPort, ClientBase.CONNECTION_TIMEOUT));
        qu.start(3);
        Assert.assertTrue((String)"Waiting for server up", (boolean)ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer((int)3).clientPort, ClientBase.CONNECTION_TIMEOUT));
        zk1 = FollowerResyncConcurrencyTest.createClient(qu.getPeer((int)1).peer.getClientPort(), watcher1);
        LOG.info("zk1 has session id 0x{}", (Object)Long.toHexString(zk1.getSessionId()));
        Assert.assertNotNull((String)"zk1 has data", (Object)zk1.exists("/resyncundernewepoch", false));
        DisconnectableZooKeeper zk2 = FollowerResyncConcurrencyTest.createClient(qu.getPeer((int)2).peer.getClientPort(), watcher2);
        LOG.info("zk2 has session id 0x{}", (Object)Long.toHexString(zk2.getSessionId()));
        Assert.assertNotNull((String)"zk2 has data", (Object)zk2.exists("/resyncundernewepoch", false));
        DisconnectableZooKeeper zk3 = FollowerResyncConcurrencyTest.createClient(qu.getPeer((int)3).peer.getClientPort(), watcher3);
        LOG.info("zk3 has session id 0x{}", (Object)Long.toHexString(zk3.getSessionId()));
        Assert.assertNotNull((String)"zk3 has data", (Object)zk3.exists("/resyncundernewepoch", false));
        zk1.close();
        zk2.close();
        zk3.close();
        qu.shutdownAll();
    }

    @Test
    public void testResyncBySnapThenDiffAfterFollowerCrashes() throws Throwable {
        this.followerResyncCrashTest(false);
    }

    @Test
    public void testResyncByTxnlogThenDiffAfterFollowerCrashes() throws Throwable {
        this.followerResyncCrashTest(true);
    }

    public void followerResyncCrashTest(boolean useTxnLogResync) throws Throwable {
        final Semaphore sem = new Semaphore(0);
        QuorumUtil qu = new QuorumUtil(1);
        qu.startAll();
        ClientBase.CountdownWatcher watcher1 = new ClientBase.CountdownWatcher();
        ClientBase.CountdownWatcher watcher2 = new ClientBase.CountdownWatcher();
        ClientBase.CountdownWatcher watcher3 = new ClientBase.CountdownWatcher();
        int index = 1;
        while (qu.getPeer((int)index).peer.leader == null) {
            ++index;
        }
        Leader leader = qu.getPeer((int)index).peer.leader;
        Assert.assertNotNull((Object)leader);
        if (useTxnLogResync) {
            qu.getPeer((int)index).peer.getActiveServer().getZKDatabase().setSnapshotSizeFactor(1000.0);
        } else {
            qu.getPeer((int)index).peer.getActiveServer().getZKDatabase().setSnapshotSizeFactor(-1.0);
        }
        index = index == 1 ? 2 : 1;
        LOG.info("Connecting to follower: {}", (Object)index);
        qu.shutdown(index);
        final DisconnectableZooKeeper zk3 = FollowerResyncConcurrencyTest.createClient(qu.getPeer((int)3).peer.getClientPort(), watcher3);
        LOG.info("zk3 has session id 0x{}", (Object)Long.toHexString(zk3.getSessionId()));
        zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        qu.restart(index);
        DisconnectableZooKeeper zk1 = FollowerResyncConcurrencyTest.createClient(qu.getPeer((int)index).peer.getClientPort(), watcher1);
        LOG.info("zk1 has session id 0x{}", (Object)Long.toHexString(zk1.getSessionId()));
        DisconnectableZooKeeper zk2 = FollowerResyncConcurrencyTest.createClient(qu.getPeer((int)index).peer.getClientPort(), watcher2);
        LOG.info("zk2 has session id 0x{}", (Object)Long.toHexString(zk2.getSessionId()));
        zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Thread mytestfooThread = new Thread(new Runnable(){

            @Override
            public void run() {
                for (int i = 0; i < 3000; ++i) {
                    zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback(){

                        @Override
                        public void processResult(int rc, String path, Object ctx, String name) {
                            FollowerResyncConcurrencyTest.this.pending.decrementAndGet();
                            FollowerResyncConcurrencyTest.this.counter.incrementAndGet();
                            if (rc != 0) {
                                FollowerResyncConcurrencyTest.this.errors.incrementAndGet();
                            }
                            if (FollowerResyncConcurrencyTest.this.counter.get() == 16200) {
                                sem.release();
                            }
                        }
                    }, null);
                    FollowerResyncConcurrencyTest.this.pending.incrementAndGet();
                    if (i % 10 != 0) continue;
                    try {
                        Thread.sleep(100L);
                        continue;
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
            }
        });
        for (int i = 0; i < 13000; ++i) {
            zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback(){

                @Override
                public void processResult(int rc, String path, Object ctx, String name) {
                    FollowerResyncConcurrencyTest.this.pending.decrementAndGet();
                    FollowerResyncConcurrencyTest.this.counter.incrementAndGet();
                    if (rc != 0) {
                        FollowerResyncConcurrencyTest.this.errors.incrementAndGet();
                    }
                    if (FollowerResyncConcurrencyTest.this.counter.get() == 16200) {
                        sem.release();
                    }
                }
            }, null);
            this.pending.incrementAndGet();
            if (i == 5000) {
                qu.shutdown(index);
                LOG.info("Shutting down s1");
            }
            if (i == 12000) {
                mytestfooThread.start();
                LOG.info("Restarting follower: {}", (Object)index);
                qu.restart(index);
                Thread.sleep(300L);
                LOG.info("Shutdown follower: {}", (Object)index);
                qu.shutdown(index);
                Thread.sleep(300L);
                LOG.info("Restarting follower: {}", (Object)index);
                qu.restart(index);
                LOG.info("Setting up server: {}", (Object)index);
            }
            if (i % 1000 == 0) {
                Thread.sleep(1000L);
            }
            if (i % 50 != 0) continue;
            zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback(){

                @Override
                public void processResult(int rc, String path, Object ctx, String name) {
                    FollowerResyncConcurrencyTest.this.pending.decrementAndGet();
                    FollowerResyncConcurrencyTest.this.counter.incrementAndGet();
                    if (rc != 0) {
                        FollowerResyncConcurrencyTest.this.errors.incrementAndGet();
                    }
                    if (FollowerResyncConcurrencyTest.this.counter.get() == 16200) {
                        sem.release();
                    }
                }
            }, null);
            this.pending.incrementAndGet();
        }
        if (!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
            LOG.warn("Did not aquire semaphore fast enough");
        }
        mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT);
        if (mytestfooThread.isAlive()) {
            LOG.error("mytestfooThread is still alive");
        }
        Assert.assertTrue((boolean)this.waitForPendingRequests(60));
        Assert.assertTrue((boolean)this.waitForSync(qu, index, 10));
        this.verifyState(qu, index, leader);
        zk1.close();
        zk2.close();
        zk3.close();
        qu.shutdownAll();
    }

    @Test
    public void testResyncByDiffAfterFollowerCrashes() throws IOException, InterruptedException, KeeperException, Throwable {
        final Semaphore sem = new Semaphore(0);
        QuorumUtil qu = new QuorumUtil(1);
        qu.startAll();
        ClientBase.CountdownWatcher watcher1 = new ClientBase.CountdownWatcher();
        ClientBase.CountdownWatcher watcher2 = new ClientBase.CountdownWatcher();
        ClientBase.CountdownWatcher watcher3 = new ClientBase.CountdownWatcher();
        int index = 1;
        while (qu.getPeer((int)index).peer.leader == null) {
            ++index;
        }
        Leader leader = qu.getPeer((int)index).peer.leader;
        Assert.assertNotNull((Object)leader);
        index = index == 1 ? 2 : 1;
        LOG.info("Connecting to follower: {}", (Object)index);
        DisconnectableZooKeeper zk1 = FollowerResyncConcurrencyTest.createClient(qu.getPeer((int)index).peer.getClientPort(), watcher1);
        LOG.info("zk1 has session id 0x{}", (Object)Long.toHexString(zk1.getSessionId()));
        DisconnectableZooKeeper zk2 = FollowerResyncConcurrencyTest.createClient(qu.getPeer((int)index).peer.getClientPort(), watcher2);
        LOG.info("zk2 has session id 0x{}", (Object)Long.toHexString(zk2.getSessionId()));
        final DisconnectableZooKeeper zk3 = FollowerResyncConcurrencyTest.createClient(qu.getPeer((int)3).peer.getClientPort(), watcher3);
        LOG.info("zk3 has session id 0x{}", (Object)Long.toHexString(zk3.getSessionId()));
        zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        final AtomicBoolean runNow = new AtomicBoolean(false);
        Thread mytestfooThread = new Thread(new Runnable(){

            @Override
            public void run() {
                int inSyncCounter = 0;
                while (inSyncCounter < 400) {
                    if (runNow.get()) {
                        zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback(){

                            @Override
                            public void processResult(int rc, String path, Object ctx, String name) {
                                FollowerResyncConcurrencyTest.this.pending.decrementAndGet();
                                FollowerResyncConcurrencyTest.this.counter.incrementAndGet();
                                if (rc != 0) {
                                    FollowerResyncConcurrencyTest.this.errors.incrementAndGet();
                                }
                                if (FollowerResyncConcurrencyTest.this.counter.get() > 7300) {
                                    sem.release();
                                }
                            }
                        }, null);
                        FollowerResyncConcurrencyTest.this.pending.incrementAndGet();
                        try {
                            Thread.sleep(10L);
                        }
                        catch (Exception exception) {
                            // empty catch block
                        }
                        ++inSyncCounter;
                        continue;
                    }
                    Thread.yield();
                }
            }
        });
        mytestfooThread.start();
        for (int i = 0; i < 5000; ++i) {
            zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback(){

                @Override
                public void processResult(int rc, String path, Object ctx, String name) {
                    FollowerResyncConcurrencyTest.this.pending.decrementAndGet();
                    FollowerResyncConcurrencyTest.this.counter.incrementAndGet();
                    if (rc != 0) {
                        FollowerResyncConcurrencyTest.this.errors.incrementAndGet();
                    }
                    if (FollowerResyncConcurrencyTest.this.counter.get() > 7300) {
                        sem.release();
                    }
                }
            }, null);
            this.pending.incrementAndGet();
            if (i == 1000) {
                qu.shutdown(index);
                Thread.sleep(1100L);
                LOG.info("Shutting down s1");
            }
            if (i == 1100 || i == 1150 || i == 1200) {
                Thread.sleep(1000L);
            }
            if (i == 1200) {
                qu.startThenShutdown(index);
                runNow.set(true);
                qu.restart(index);
                LOG.info("Setting up server: {}", (Object)index);
            }
            if (i >= 1000 && i % 2 == 0) {
                zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback(){

                    @Override
                    public void processResult(int rc, String path, Object ctx, String name) {
                        FollowerResyncConcurrencyTest.this.pending.decrementAndGet();
                        FollowerResyncConcurrencyTest.this.counter.incrementAndGet();
                        if (rc != 0) {
                            FollowerResyncConcurrencyTest.this.errors.incrementAndGet();
                        }
                        if (FollowerResyncConcurrencyTest.this.counter.get() > 7300) {
                            sem.release();
                        }
                    }
                }, null);
                this.pending.incrementAndGet();
            }
            if (i != 1050 && i != 1100 && i != 1150) continue;
            Thread.sleep(1000L);
        }
        if (!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
            LOG.warn("Did not aquire semaphore fast enough");
        }
        mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT);
        if (mytestfooThread.isAlive()) {
            LOG.error("mytestfooThread is still alive");
        }
        Assert.assertTrue((boolean)this.waitForPendingRequests(60));
        Assert.assertTrue((boolean)this.waitForSync(qu, index, 10));
        this.verifyState(qu, index, leader);
        zk1.close();
        zk2.close();
        zk3.close();
        qu.shutdownAll();
    }

    private static DisconnectableZooKeeper createClient(int port, ClientBase.CountdownWatcher watcher) throws IOException, TimeoutException, InterruptedException {
        DisconnectableZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + port, ClientBase.CONNECTION_TIMEOUT, watcher);
        watcher.waitForConnected(CONNECTION_TIMEOUT);
        return zk;
    }

    private boolean waitForPendingRequests(int timeout) throws InterruptedException {
        LOG.info("Wait for pending requests: {}", (Object)this.pending.get());
        for (int i = 0; i < timeout; ++i) {
            Thread.sleep(1000L);
            if (this.pending.get() != 0) continue;
            return true;
        }
        LOG.info("Timeout waiting for pending requests: {}", (Object)this.pending.get());
        return false;
    }

    private boolean waitForSync(QuorumUtil qu, int index, int timeout) throws InterruptedException {
        LOG.info("Wait for server to sync");
        int leaderIndex = index == 1 ? 2 : 1;
        ZKDatabase restartedDb = qu.getPeer((int)index).peer.getActiveServer().getZKDatabase();
        ZKDatabase cleanDb = qu.getPeer((int)3).peer.getActiveServer().getZKDatabase();
        ZKDatabase leadDb = qu.getPeer((int)leaderIndex).peer.getActiveServer().getZKDatabase();
        long leadZxid = 0L;
        long cleanZxid = 0L;
        long restartedZxid = 0L;
        for (int i = 0; i < timeout; ++i) {
            leadZxid = leadDb.getDataTreeLastProcessedZxid();
            cleanZxid = cleanDb.getDataTreeLastProcessedZxid();
            restartedZxid = restartedDb.getDataTreeLastProcessedZxid();
            if (leadZxid == cleanZxid && leadZxid == restartedZxid) {
                return true;
            }
            Thread.sleep(1000L);
        }
        LOG.info("Timeout waiting for zxid to sync: leader 0x{} clean 0x{} restarted 0x{}", new Object[]{Long.toHexString(leadZxid), Long.toHexString(cleanZxid), Long.toHexString(restartedZxid)});
        return false;
    }

    private static TestableZooKeeper createTestableClient(String hp) throws IOException, TimeoutException, InterruptedException {
        ClientBase.CountdownWatcher watcher = new ClientBase.CountdownWatcher();
        return FollowerResyncConcurrencyTest.createTestableClient(watcher, hp);
    }

    private static TestableZooKeeper createTestableClient(ClientBase.CountdownWatcher watcher, String hp) throws IOException, TimeoutException, InterruptedException {
        TestableZooKeeper zk = new TestableZooKeeper(hp, ClientBase.CONNECTION_TIMEOUT, (Watcher)watcher);
        watcher.waitForConnected(CONNECTION_TIMEOUT);
        return zk;
    }

    private void verifyState(QuorumUtil qu, int index, Leader leader) {
        LOG.info("Verifying state");
        Assert.assertTrue((String)"Not following", (qu.getPeer((int)index).peer.follower != null ? 1 : 0) != 0);
        long epochF = qu.getPeer((int)index).peer.getActiveServer().getZxid() >> 32;
        long epochL = leader.getEpoch() >> 32;
        Assert.assertTrue((String)("Zxid: " + qu.getPeer((int)index).peer.getActiveServer().getZKDatabase().getDataTreeLastProcessedZxid() + "Current epoch: " + epochF), (epochF == epochL ? 1 : 0) != 0);
        int leaderIndex = index == 1 ? 2 : 1;
        Collection<Long> sessionsRestarted = qu.getPeer((int)index).peer.getActiveServer().getZKDatabase().getSessions();
        Collection<Long> sessionsNotRestarted = qu.getPeer((int)leaderIndex).peer.getActiveServer().getZKDatabase().getSessions();
        for (Long l : sessionsRestarted) {
            Assert.assertTrue((String)("Should have same set of sessions in both servers, did not expect: " + l), (boolean)sessionsNotRestarted.contains(l));
        }
        Assert.assertEquals((String)"Should have same number of sessions", (long)sessionsNotRestarted.size(), (long)sessionsRestarted.size());
        ZKDatabase restarted = qu.getPeer((int)index).peer.getActiveServer().getZKDatabase();
        ZKDatabase clean = qu.getPeer((int)3).peer.getActiveServer().getZKDatabase();
        ZKDatabase lead = qu.getPeer((int)leaderIndex).peer.getActiveServer().getZKDatabase();
        for (Long l : sessionsRestarted) {
            LOG.info("Validating ephemeral for session id 0x{}", (Object)Long.toHexString(l));
            Assert.assertTrue((String)("Should have same set of sessions in both servers, did not expect: " + l), (boolean)sessionsNotRestarted.contains(l));
            Set<String> ephemerals = restarted.getEphemerals(l);
            Set<String> cleanEphemerals = clean.getEphemerals(l);
            for (String o : cleanEphemerals) {
                if (ephemerals.contains(o)) continue;
                LOG.info("Restarted follower doesn't contain ephemeral {} zxid 0x{}", (Object)o, (Object)Long.toHexString(clean.getDataTree().getNode((String)o).stat.getMzxid()));
            }
            for (String o : ephemerals) {
                if (cleanEphemerals.contains(o)) continue;
                LOG.info("Restarted follower has extra ephemeral {} zxid 0x{}", (Object)o, (Object)Long.toHexString(restarted.getDataTree().getNode((String)o).stat.getMzxid()));
            }
            Set<String> leadEphemerals = lead.getEphemerals(l);
            for (String o : leadEphemerals) {
                if (cleanEphemerals.contains(o)) continue;
                LOG.info("Follower doesn't contain ephemeral from leader {} zxid 0x{}", (Object)o, (Object)Long.toHexString(lead.getDataTree().getNode((String)o).stat.getMzxid()));
            }
            for (String o : cleanEphemerals) {
                if (leadEphemerals.contains(o)) continue;
                LOG.info("Leader doesn't contain ephemeral from follower {} zxid 0x{}", (Object)o, (Object)Long.toHexString(clean.getDataTree().getNode((String)o).stat.getMzxid()));
            }
            Assert.assertEquals((String)"Should have same number of ephemerals in both followers", (long)ephemerals.size(), (long)cleanEphemerals.size());
            Assert.assertEquals((String)"Leader should equal follower", (long)lead.getEphemerals(l).size(), (long)cleanEphemerals.size());
        }
    }

    @Test
    public void testFollowerSendsLastZxid() throws Exception {
        QuorumUtil qu = new QuorumUtil(1);
        qu.startAll();
        int index = 1;
        while (qu.getPeer((int)index).peer.follower == null) {
            ++index;
        }
        LOG.info("Connecting to follower: {}", (Object)index);
        TestableZooKeeper zk = FollowerResyncConcurrencyTest.createTestableClient("localhost:" + qu.getPeer((int)index).peer.getClientPort());
        Assert.assertEquals((long)0L, (long)zk.testableLastZxid());
        zk.exists("/", false);
        long lzxid = zk.testableLastZxid();
        Assert.assertTrue((String)("lzxid:" + lzxid + " > 0"), (lzxid > 0L ? 1 : 0) != 0);
        zk.close();
        qu.shutdownAll();
    }

    @Test
    public void testFollowerWatcherResync() throws Exception {
        QuorumUtil qu = new QuorumUtil(1);
        qu.startAll();
        int index = 1;
        while (qu.getPeer((int)index).peer.follower == null) {
            ++index;
        }
        LOG.info("Connecting to follower: {}", (Object)index);
        TestableZooKeeper zk1 = FollowerResyncConcurrencyTest.createTestableClient("localhost:" + qu.getPeer((int)index).peer.getClientPort());
        zk1.create("/foo", "foo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher watcher = new MyWatcher();
        TestableZooKeeper zk2 = FollowerResyncConcurrencyTest.createTestableClient(watcher, "localhost:" + qu.getPeer((int)index).peer.getClientPort());
        zk2.exists("/foo", true);
        watcher.reset();
        zk2.testableConnloss();
        if (!watcher.clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
            Assert.fail((String)"Unable to connect to server");
        }
        Assert.assertArrayEquals((byte[])"foo".getBytes(), (byte[])zk2.getData("/foo", false, null));
        Assert.assertNull((Object)watcher.events.poll(5L, TimeUnit.SECONDS));
        zk1.close();
        zk2.close();
        qu.shutdownAll();
    }

    private class MyWatcher
    extends ClientBase.CountdownWatcher {
        LinkedBlockingQueue<WatchedEvent> events = new LinkedBlockingQueue();

        private MyWatcher() {
        }

        @Override
        public void process(WatchedEvent event) {
            super.process(event);
            if (event.getType() != Watcher.Event.EventType.None) {
                try {
                    this.events.put(event);
                }
                catch (InterruptedException e) {
                    LOG.warn("ignoring interrupt during event.put");
                }
            }
        }
    }
}

