/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum;

import io.hops.hadoop.shaded.org.apache.zookeeper.PortAssignment;
import io.hops.hadoop.shaded.org.apache.zookeeper.ZKTestCase;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.FastLeaderElection;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.QuorumCnxManager;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.QuorumPeer;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.Vote;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.util.ZxidUtils;
import io.hops.hadoop.shaded.org.apache.zookeeper.test.ClientBase;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises leader-election notification parsing with messages of different
 * layouts.
 *
 * <p>{@link #testBackwardCompatibility()} feeds messages to
 * {@link MockFLEMessengerBackward}, a hand-written receiver that also accepts
 * the short 28-byte message layout. {@link #testForwardCompatibility()} feeds a
 * message with extra trailing bytes to a real {@link FastLeaderElection}
 * instance and checks that the leading fields are still decoded.
 */
public class FLECompatibilityTest
extends ZKTestCase {
    protected static final Logger LOG = LoggerFactory.getLogger(FLECompatibilityTest.class);

    /** Number of quorum servers registered by {@link #populate()}. */
    int count;
    /** Quorum servers keyed by server id. */
    HashMap<Long, QuorumPeer.QuorumServer> peers;
    /** One temporary directory per server, filled in by {@link #populate()}. */
    File[] tmpdir;
    /** One port per server, filled in by {@link #populate()}. */
    int[] port;

    /** Allocates empty containers for a three-server ensemble. */
    @Before
    public void setUp() throws Exception {
        this.count = 3;
        this.peers = new HashMap<Long, QuorumPeer.QuorumServer>(this.count);
        this.tmpdir = new File[this.count];
        this.port = new int[this.count];
    }

    @After
    public void tearDown() throws Exception {
        // Nothing to do here: each test halts the messenger it created.
    }

    /**
     * Registers {@link #count} servers with ids {@code 0..count-1}, each with
     * freshly assigned ports and its own temporary directory.
     */
    void populate() throws Exception {
        for (int i = 0; i < this.count; ++i) {
            this.peers.put(Long.valueOf(i), new QuorumPeer.QuorumServer(i, "0.0.0.0", PortAssignment.unique(), PortAssignment.unique(), null));
            this.tmpdir[i] = ClientBase.createTmpDir();
            this.port[i] = PortAssignment.unique();
        }
    }

    /**
     * Sends two messages through {@link MockFLEMessengerBackward}: one while the
     * peer is LOOKING (expects a notification on {@code recvqueue}) and one
     * while the peer is FOLLOWING (expects a reply on {@code internalqueue}
     * carrying the peer's current vote).
     */
    @Test(timeout=20000L)
    public void testBackwardCompatibility() throws Exception {
        this.populate();
        QuorumPeer peer = new QuorumPeer(this.peers, this.tmpdir[0], this.tmpdir[0], this.port[0], 3, 0L, 1000, 2, 2);
        peer.setPeerState(QuorumPeer.ServerState.LOOKING);
        QuorumCnxManager mng = peer.createCnxnManager();
        MockFLEMessengerBackward fle = new MockFLEMessengerBackward(peer, mng);
        try {
            // Peer is LOOKING: the message from server 2 must surface as a notification.
            ByteBuffer buffer = FastLeaderElection.buildMsg(QuorumPeer.ServerState.LOOKING.ordinal(), 2L, 1L, 1L, 1L);
            fle.manager.recvQueue.add(new QuorumCnxManager.Message(buffer, 2L));
            FastLeaderElection.Notification n = fle.recvqueue.take();
            Assert.assertEquals("Wrong state", QuorumPeer.ServerState.LOOKING, n.state);
            Assert.assertEquals("Wrong leader", 2L, n.leader);
            Assert.assertEquals("Wrong zxid", 1L, n.zxid);
            Assert.assertEquals("Wrong epoch", 1L, n.electionEpoch);
            Assert.assertEquals("Wrong epoch", 1L, n.peerEpoch);

            // Peer is no longer LOOKING: a LOOKING sender must be answered with the current vote.
            peer.setPeerState(QuorumPeer.ServerState.FOLLOWING);
            peer.setCurrentVote(new Vote(2L, 1L, 1L, 1L, QuorumPeer.ServerState.LOOKING));
            buffer = FastLeaderElection.buildMsg(QuorumPeer.ServerState.LOOKING.ordinal(), 1L, 1L, 1L, 1L);
            fle.manager.recvQueue.add(new QuorumCnxManager.Message(buffer, 1L));
            FastLeaderElection.ToSend m = fle.internalqueue.take();
            Assert.assertEquals("Wrong state", QuorumPeer.ServerState.FOLLOWING, m.state);
            Assert.assertEquals("Wrong sid", 1L, m.sid);
            Assert.assertEquals("Wrong leader", 2L, m.leader);
            Assert.assertEquals("Wrong epoch", 1L, m.electionEpoch);
            Assert.assertEquals("Wrong epoch", 1L, m.peerEpoch);
        } finally {
            // Ask the receiver thread to stop so it does not outlive the test.
            fle.halt();
        }
    }

    /**
     * Appends eight extra bytes to a regular notification message and checks
     * that {@link FastLeaderElection} still decodes the leading fields.
     */
    @Test(timeout=20000L)
    public void testForwardCompatibility() throws Exception {
        this.populate();
        QuorumPeer peer = new QuorumPeer(this.peers, this.tmpdir[0], this.tmpdir[0], this.port[0], 3, 0L, 1000, 2, 2);
        peer.setPeerState(QuorumPeer.ServerState.LOOKING);
        QuorumCnxManager mng = peer.createCnxnManager();
        MockFLEMessengerForward fle = new MockFLEMessengerForward(peer, mng);
        try {
            ByteBuffer notBuffer = FastLeaderElection.buildMsg(QuorumPeer.ServerState.LOOKING.ordinal(), 2L, 1L, 1L, 1L);
            // Build a message that is one long (8 bytes) larger than the regular one.
            ByteBuffer buffer = ByteBuffer.allocate(notBuffer.capacity() + 8);
            notBuffer.flip();
            buffer.put(notBuffer);
            buffer.putLong(Long.MAX_VALUE);
            buffer.flip();
            fle.manager.recvQueue.add(new QuorumCnxManager.Message(buffer, 2L));
            FastLeaderElection.Notification n = (FastLeaderElection.Notification)fle.recvqueue.take();
            Assert.assertEquals("Wrong state", QuorumPeer.ServerState.LOOKING, n.state);
            Assert.assertEquals("Wrong leader", 2L, n.leader);
            Assert.assertEquals("Wrong zxid", 1L, n.zxid);
            Assert.assertEquals("Wrong epoch", 1L, n.electionEpoch);
            Assert.assertEquals("Wrong epoch", 1L, n.peerEpoch);
            Assert.assertEquals("Wrong version", 1, n.version);
        } finally {
            // Shut the election instance down so its resources do not outlive the test.
            fle.halt();
        }
    }

    /**
     * Real {@link FastLeaderElection} that exposes {@code shutdown()} under the
     * same {@code halt()} name used by {@link MockFLEMessengerBackward}.
     */
    class MockFLEMessengerForward
    extends FastLeaderElection {
        MockFLEMessengerForward(QuorumPeer self, QuorumCnxManager manager) {
            super(self, manager);
        }

        /** Delegates to {@code FastLeaderElection.shutdown()}. */
        void halt() {
            super.shutdown();
        }
    }

    /**
     * Stand-alone messenger whose receiver thread decodes messages taken from
     * the connection manager, accepting the short 28-byte layout as well as
     * longer ones.
     */
    class MockFLEMessengerBackward {
        QuorumCnxManager manager;
        QuorumPeer self;
        long logicalclock = 1L;
        LinkedBlockingQueue<FastLeaderElection.ToSend> sendqueue = new LinkedBlockingQueue<FastLeaderElection.ToSend>();
        LinkedBlockingQueue<FastLeaderElection.ToSend> internalqueue = new LinkedBlockingQueue<FastLeaderElection.ToSend>();
        LinkedBlockingQueue<FastLeaderElection.Notification> recvqueue = new LinkedBlockingQueue<FastLeaderElection.Notification>();
        WorkerReceiver wr;

        /** Creates the messenger and immediately starts its daemon receiver thread. */
        MockFLEMessengerBackward(QuorumPeer self, QuorumCnxManager manager) {
            this.manager = manager;
            this.self = self;
            this.wr = new WorkerReceiver(manager);
            Thread t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
            t.setDaemon(true);
            t.start();
        }

        /**
         * Requests the receiver thread to stop; it notices the flag after its
         * current poll (at most 3 seconds) returns.
         */
        void halt() {
            this.wr.stop = true;
        }

        /**
         * Polls the connection manager for incoming messages and turns them
         * into notifications on {@code recvqueue} and/or replies on
         * {@code internalqueue}.
         */
        class WorkerReceiver
        implements Runnable {
            volatile boolean stop = false;
            QuorumCnxManager manager;
            final long proposedLeader = 2L;
            final long proposedZxid = 1L;
            final long proposedEpoch = 1L;

            WorkerReceiver(QuorumCnxManager manager) {
                this.manager = manager;
            }

            /** Returns the fixed vote this mock proposes while LOOKING. */
            Vote getVote() {
                return new Vote(this.proposedLeader, this.proposedZxid, this.proposedEpoch);
            }

            @Override
            public void run() {
                while (!this.stop) {
                    try {
                        QuorumCnxManager.Message response = this.manager.pollRecvQueue(3000L, TimeUnit.MILLISECONDS);
                        if (response == null) {
                            // Poll timed out; re-check the stop flag.
                            continue;
                        }
                        if (!MockFLEMessengerBackward.this.self.getVotingView().containsKey(response.sid)) {
                            // Sender is not in the voting view: just answer with the current vote.
                            Vote current = MockFLEMessengerBackward.this.self.getCurrentVote();
                            FastLeaderElection.ToSend reply = new FastLeaderElection.ToSend(FastLeaderElection.ToSend.mType.notification, current.getId(), current.getZxid(), MockFLEMessengerBackward.this.logicalclock, MockFLEMessengerBackward.this.self.getPeerState(), response.sid, current.getPeerEpoch());
                            MockFLEMessengerBackward.this.internalqueue.offer(reply);
                            continue;
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Receive new notification message. My id = " + MockFLEMessengerBackward.this.self.getId());
                        }
                        // 28 bytes = 4-byte state + three 8-byte longs (leader, zxid, election epoch).
                        if (response.buffer.capacity() < 28) {
                            LOG.error("Got a short response: " + response.buffer.capacity());
                            continue;
                        }
                        // Exactly 28 bytes means the message carries no peer epoch field.
                        boolean backCompatibility = response.buffer.capacity() == 28;
                        response.buffer.clear();
                        QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                        switch (response.buffer.getInt()) {
                            case 0: {
                                ackstate = QuorumPeer.ServerState.LOOKING;
                                break;
                            }
                            case 1: {
                                ackstate = QuorumPeer.ServerState.FOLLOWING;
                                break;
                            }
                            case 2: {
                                ackstate = QuorumPeer.ServerState.LEADING;
                                break;
                            }
                            case 3: {
                                ackstate = QuorumPeer.ServerState.OBSERVING;
                                break;
                            }
                            default: {
                                // Unknown state codes keep the LOOKING default assigned above.
                                break;
                            }
                        }
                        FastLeaderElection.Notification n = new FastLeaderElection.Notification();
                        n.leader = response.buffer.getLong();
                        n.zxid = response.buffer.getLong();
                        n.electionEpoch = response.buffer.getLong();
                        n.state = ackstate;
                        n.sid = response.sid;
                        if (!backCompatibility) {
                            n.peerEpoch = response.buffer.getLong();
                        } else {
                            if (LOG.isInfoEnabled()) {
                                LOG.info("Backward compatibility mode, server id=" + n.sid);
                            }
                            // No peer epoch on the wire: derive it from the zxid instead.
                            n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
                        }
                        if (MockFLEMessengerBackward.this.self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                            MockFLEMessengerBackward.this.recvqueue.offer(n);
                            // Reply only to a LOOKING sender whose election epoch is behind our logical clock.
                            if (ackstate == QuorumPeer.ServerState.LOOKING && n.electionEpoch < MockFLEMessengerBackward.this.logicalclock) {
                                Vote v = this.getVote();
                                FastLeaderElection.ToSend notmsg = new FastLeaderElection.ToSend(FastLeaderElection.ToSend.mType.notification, v.getId(), v.getZxid(), MockFLEMessengerBackward.this.logicalclock, MockFLEMessengerBackward.this.self.getPeerState(), response.sid, v.getPeerEpoch());
                                MockFLEMessengerBackward.this.internalqueue.offer(notmsg);
                            }
                        } else if (ackstate == QuorumPeer.ServerState.LOOKING) {
                            // We are not LOOKING but the sender is: tell it about our current vote.
                            Vote current = MockFLEMessengerBackward.this.self.getCurrentVote();
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Sending new notification. My id =  " + MockFLEMessengerBackward.this.self.getId() + " recipient=" + response.sid + " zxid=0x" + Long.toHexString(current.getZxid()) + " leader=" + current.getId());
                            }
                            FastLeaderElection.ToSend notmsg = new FastLeaderElection.ToSend(FastLeaderElection.ToSend.mType.notification, current.getId(), current.getZxid(), current.getElectionEpoch(), MockFLEMessengerBackward.this.self.getPeerState(), response.sid, current.getPeerEpoch());
                            MockFLEMessengerBackward.this.internalqueue.offer(notmsg);
                        }
                    }
                    catch (InterruptedException e) {
                        LOG.warn("Interrupted Exception while waiting for new message", e);
                        // Preserve the interrupt status and stop; polling again would fail immediately.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
                LOG.info("WorkerReceiver is down");
            }
        }
    }
}

