/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum;

import io.hops.hadoop.shaded.org.apache.zookeeper.PortAssignment;
import io.hops.hadoop.shaded.org.apache.zookeeper.ZKTestCase;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.FastLeaderElection;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.QuorumCnxManager;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.QuorumPeer;
import io.hops.hadoop.shaded.org.apache.zookeeper.test.ClientBase;
import java.io.DataOutputStream;
import java.io.File;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CnxManagerTest
extends ZKTestCase {
    protected static final Logger LOG = LoggerFactory.getLogger(CnxManagerTest.class);
    protected static final int THRESHOLD = 4;
    int count;
    HashMap<Long, QuorumPeer.QuorumServer> peers;
    File[] peerTmpdir;
    int[] peerQuorumPort;
    int[] peerClientPort;

    @Before
    public void setUp() throws Exception {
        this.count = 3;
        this.peers = new HashMap(this.count);
        this.peerTmpdir = new File[this.count];
        this.peerQuorumPort = new int[this.count];
        this.peerClientPort = new int[this.count];
        for (int i = 0; i < this.count; ++i) {
            this.peerQuorumPort[i] = PortAssignment.unique();
            this.peerClientPort[i] = PortAssignment.unique();
            this.peers.put(Long.valueOf(i), new QuorumPeer.QuorumServer(i, "0.0.0.0", this.peerQuorumPort[i], PortAssignment.unique(), null));
            this.peerTmpdir[i] = ClientBase.createTmpDir();
        }
    }

    ByteBuffer createMsg(int state, long leader, long zxid, long epoch) {
        return FastLeaderElection.buildMsg(state, leader, zxid, 0L, epoch);
    }

    @Test
    public void testCnxManager() throws Exception {
        CnxManagerThread thread = new CnxManagerThread();
        thread.start();
        QuorumPeer peer = new QuorumPeer(this.peers, this.peerTmpdir[1], this.peerTmpdir[1], this.peerClientPort[1], 3, 1L, 1000, 2, 2);
        QuorumCnxManager cnxManager = peer.createCnxnManager();
        QuorumCnxManager.Listener listener = cnxManager.listener;
        if (listener != null) {
            listener.start();
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        cnxManager.toSend(new Long(0L), this.createMsg(QuorumPeer.ServerState.LOOKING.ordinal(), 1L, -1L, 1L));
        QuorumCnxManager.Message m = null;
        int numRetries = 1;
        while (m == null && numRetries++ <= 4) {
            m = cnxManager.pollRecvQueue(3000L, TimeUnit.MILLISECONDS);
            if (m != null) continue;
            cnxManager.connectAll();
        }
        Assert.assertTrue((String)"Exceeded number of retries", (numRetries <= 4 ? 1 : 0) != 0);
        thread.join(5000L);
        if (thread.isAlive()) {
            Assert.fail((String)"Thread didn't join");
        } else if (thread.failed) {
            Assert.fail((String)"Did not receive expected message");
        }
    }

    @Test
    public void testCnxManagerTimeout() throws Exception {
        Random rand = new Random();
        byte b = (byte)rand.nextInt();
        int finalOctet = b & 0xFF;
        int deadPort = PortAssignment.unique();
        String deadAddress = new String("192.0.2." + finalOctet);
        LOG.info("This is the dead address I'm trying: " + deadAddress);
        this.peers.put(2L, new QuorumPeer.QuorumServer(2L, deadAddress, deadPort, PortAssignment.unique(), null));
        this.peerTmpdir[2] = ClientBase.createTmpDir();
        QuorumPeer peer = new QuorumPeer(this.peers, this.peerTmpdir[1], this.peerTmpdir[1], this.peerClientPort[1], 3, 1L, 1000, 2, 2);
        QuorumCnxManager cnxManager = peer.createCnxnManager();
        QuorumCnxManager.Listener listener = cnxManager.listener;
        if (listener != null) {
            listener.start();
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        long begin = System.currentTimeMillis();
        cnxManager.toSend(new Long(2L), this.createMsg(QuorumPeer.ServerState.LOOKING.ordinal(), 1L, -1L, 1L));
        long end = System.currentTimeMillis();
        if (end - begin > 6000L) {
            Assert.fail((String)"Waited more than necessary");
        }
    }

    @Test
    public void testCnxManagerSpinLock() throws Exception {
        QuorumPeer peer = new QuorumPeer(this.peers, this.peerTmpdir[1], this.peerTmpdir[1], this.peerClientPort[1], 3, 1L, 1000, 2, 2);
        QuorumCnxManager cnxManager = peer.createCnxnManager();
        QuorumCnxManager.Listener listener = cnxManager.listener;
        if (listener != null) {
            listener.start();
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        int port = this.peers.get((Object)Long.valueOf((long)peer.getId())).electionAddr.getPort();
        LOG.info("Election port: " + port);
        InetSocketAddress addr = new InetSocketAddress(port);
        Thread.sleep(1000L);
        SocketChannel sc = SocketChannel.open();
        sc.socket().connect(this.peers.get((Object)new Long((long)1L)).electionAddr, 5000);
        byte[] msgBytes = new byte[8];
        ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
        msgBuffer.putLong(new Long(2L));
        msgBuffer.position(0);
        sc.write(msgBuffer);
        msgBuffer = ByteBuffer.wrap(new byte[4]);
        msgBuffer.putInt(-20);
        msgBuffer.position(0);
        sc.write(msgBuffer);
        Thread.sleep(1000L);
        try {
            for (int i = 0; i < 100; ++i) {
                msgBuffer.position(0);
                sc.write(msgBuffer);
            }
            Assert.fail((String)"Socket has not been closed");
        }
        catch (Exception e) {
            LOG.info("Socket has been closed as expected");
        }
        peer.shutdown();
        cnxManager.halt();
    }

    @Test
    public void testCnxFromFutureVersion() throws Exception {
        QuorumPeer peer = new QuorumPeer(this.peers, this.peerTmpdir[1], this.peerTmpdir[1], this.peerClientPort[1], 3, 1L, 1000, 2, 20);
        TestCnxManager cnxManager = new TestCnxManager(peer);
        QuorumCnxManager.Listener listener = cnxManager.listener;
        if (listener != null) {
            listener.start();
        } else {
            Assert.fail((String)"Null listener when initializing cnx manager");
        }
        int port = this.peers.get((Object)Long.valueOf((long)peer.getId())).electionAddr.getPort();
        LOG.info("Election port: " + port);
        Thread.sleep(1000L);
        SocketChannel sc = SocketChannel.open();
        sc.socket().connect(this.peers.get((Object)new Long((long)1L)).electionAddr, 5000);
        InetSocketAddress otherAddr = this.peers.get((Object)new Long((long)2L)).electionAddr;
        DataOutputStream dout = new DataOutputStream(sc.socket().getOutputStream());
        dout.writeLong(-65536L);
        dout.writeLong(new Long(2L));
        String addr = otherAddr.getHostName() + ":" + otherAddr.getPort();
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();
        Thread.sleep(1000L);
        Assert.assertEquals((String)"Server 1 got connection request from server 2", (Object)true, (Object)cnxManager.senderWorkerMapContains(new Long(2L)));
        String testStr = "this is a test message string";
        byte[] testStr_bytes = testStr.getBytes();
        dout.writeInt(testStr_bytes.length);
        dout.write(testStr_bytes);
        dout.flush();
        QuorumCnxManager.Message m = null;
        int numRetries = 1;
        while (m == null && numRetries++ <= 4) {
            m = cnxManager.pollRecvQueue(3000L, TimeUnit.MILLISECONDS);
            if (m != null) continue;
            cnxManager.connectAll();
        }
        if (numRetries > 4) {
            Assert.fail((String)"Test message hasn't been found in recvQueue");
        }
        Assert.assertEquals((String)"Message sender should be 2", (long)2L, (long)cnxManager.getSid(m));
        Assert.assertEquals((String)"Message from 2 doesn't match test sring", (Object)testStr, (Object)cnxManager.getMsgString(m));
        peer.shutdown();
        cnxManager.halt();
    }

    @Test
    public void testSocketTimeout() throws Exception {
        QuorumPeer peer = new QuorumPeer(this.peers, this.peerTmpdir[1], this.peerTmpdir[1], this.peerClientPort[1], 3, 1L, 2000, 2, 2);
        QuorumCnxManager cnxManager = peer.createCnxnManager();
        QuorumCnxManager.Listener listener = cnxManager.listener;
        if (listener != null) {
            listener.start();
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        int port = this.peers.get((Object)Long.valueOf((long)peer.getId())).electionAddr.getPort();
        LOG.info("Election port: " + port);
        InetSocketAddress addr = new InetSocketAddress(port);
        Thread.sleep(1000L);
        Socket sock = new Socket();
        sock.connect(this.peers.get((Object)new Long((long)1L)).electionAddr, 5000);
        long begin = System.currentTimeMillis();
        cnxManager.receiveConnection(sock);
        long end = System.currentTimeMillis();
        if (end - begin > (long)(peer.getSyncLimit() * peer.getTickTime() + 500)) {
            Assert.fail((String)"Waited more than necessary");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testWorkerThreads() throws Exception {
        ArrayList<QuorumPeer> peerList = new ArrayList<QuorumPeer>();
        try {
            for (int sid = 0; sid < 3; ++sid) {
                QuorumPeer peer = new QuorumPeer(this.peers, this.peerTmpdir[sid], this.peerTmpdir[sid], this.peerClientPort[sid], 3, (long)sid, 1000, 2, 2);
                LOG.info("Starting peer {}", (Object)peer.getId());
                peer.start();
                peerList.add(sid, peer);
            }
            String failure = this.verifyThreadCount(peerList, 4L);
            if (failure != null) {
                Assert.fail((String)failure);
            }
            for (int myid = 0; myid < 3; ++myid) {
                for (int i = 0; i < 5; ++i) {
                    QuorumPeer peer = peerList.get(myid);
                    LOG.info("Round {}, halting peer {}", new Object[]{i, peer.getId()});
                    peer.shutdown();
                    peerList.remove(myid);
                    failure = this.verifyThreadCount(peerList, 2L);
                    Assert.assertNull((String)failure, (Object)failure);
                    peer = new QuorumPeer(this.peers, this.peerTmpdir[myid], this.peerTmpdir[myid], this.peerClientPort[myid], 3, (long)myid, 1000, 2, 2);
                    LOG.info("Round {}, restarting peer {}" + new Object[]{i, peer.getId()});
                    peer.start();
                    peerList.add(myid, peer);
                    failure = this.verifyThreadCount(peerList, 4L);
                    Assert.assertNull((String)failure, (Object)failure);
                }
            }
        }
        catch (Throwable throwable) {
            for (QuorumPeer quorumPeer : peerList) {
                quorumPeer.shutdown();
            }
            throw throwable;
        }
        for (QuorumPeer quorumPeer : peerList) {
            quorumPeer.shutdown();
        }
    }

    public String verifyThreadCount(ArrayList<QuorumPeer> peerList, long ecnt) throws InterruptedException {
        String failure = null;
        for (int i = 0; i < 480; ++i) {
            Thread.sleep(500L);
            failure = this._verifyThreadCount(peerList, ecnt);
            if (failure != null) continue;
            return null;
        }
        return failure;
    }

    public String _verifyThreadCount(ArrayList<QuorumPeer> peerList, long ecnt) {
        for (int myid = 0; myid < peerList.size(); ++myid) {
            QuorumPeer peer = peerList.get(myid);
            QuorumCnxManager cnxManager = peer.getQuorumCnxManager();
            long cnt = cnxManager.getThreadCount();
            if (cnt == ecnt) continue;
            return new String(new Date() + " Incorrect number of Worker threads for sid=" + myid + " expected " + ecnt + " found " + cnt);
        }
        return null;
    }

    class TestCnxManager
    extends QuorumCnxManager {
        TestCnxManager(QuorumPeer self) {
            super(self.getId(), self.getView(), self.authServer, self.authLearner, self.tickTime * self.syncLimit, self.getQuorumListenOnAllIPs(), self.quorumCnxnThreadsSize, false);
        }

        boolean senderWorkerMapContains(Long l) {
            return this.senderWorkerMap.containsKey(l);
        }

        long getSid(QuorumCnxManager.Message m) {
            return m.sid;
        }

        String getMsgString(QuorumCnxManager.Message m) {
            return new String(m.buffer.array());
        }
    }

    class CnxManagerThread
    extends Thread {
        boolean failed = false;

        CnxManagerThread() {
        }

        @Override
        public void run() {
            try {
                QuorumPeer peer = new QuorumPeer(CnxManagerTest.this.peers, CnxManagerTest.this.peerTmpdir[0], CnxManagerTest.this.peerTmpdir[0], CnxManagerTest.this.peerClientPort[0], 3, 0L, 1000, 2, 2);
                QuorumCnxManager cnxManager = peer.createCnxnManager();
                QuorumCnxManager.Listener listener = cnxManager.listener;
                if (listener != null) {
                    listener.start();
                } else {
                    LOG.error("Null listener when initializing cnx manager");
                }
                long sid = 1L;
                cnxManager.toSend(sid, CnxManagerTest.this.createMsg(QuorumPeer.ServerState.LOOKING.ordinal(), 0L, -1L, 1L));
                QuorumCnxManager.Message m = null;
                int numRetries = 1;
                while (m == null && numRetries++ <= 4) {
                    m = cnxManager.pollRecvQueue(3000L, TimeUnit.MILLISECONDS);
                    if (m != null) continue;
                    cnxManager.connectAll();
                }
                if (numRetries > 4) {
                    this.failed = true;
                    return;
                }
                cnxManager.testInitiateConnection(sid);
                m = cnxManager.pollRecvQueue(3000L, TimeUnit.MILLISECONDS);
                if (m == null) {
                    this.failed = true;
                    return;
                }
            }
            catch (Exception e) {
                LOG.error("Exception while running mock thread", (Throwable)e);
                Assert.fail((String)"Unexpected exception");
            }
        }
    }
}

