/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum;

import io.hops.hadoop.shaded.org.apache.jute.InputArchive;
import io.hops.hadoop.shaded.org.apache.jute.OutputArchive;
import io.hops.hadoop.shaded.org.apache.zookeeper.MockPacket;
import io.hops.hadoop.shaded.org.apache.zookeeper.ZKParameterized;
import io.hops.hadoop.shaded.org.apache.zookeeper.proto.ConnectRequest;
import io.hops.hadoop.shaded.org.apache.zookeeper.proto.ReplyHeader;
import io.hops.hadoop.shaded.org.apache.zookeeper.proto.RequestHeader;
import io.hops.hadoop.shaded.org.apache.zookeeper.proto.SetWatches;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.MockNIOServerCnxn;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.MockSelectorThread;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.NIOServerCnxn;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.NIOServerCnxnFactory;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.ZKDatabase;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.Follower;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.LearnerZooKeeperServer;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.QuorumPacket;
import io.hops.hadoop.shaded.org.apache.zookeeper.server.quorum.QuorumPeer;
import io.hops.hadoop.shaded.org.mockito.ArgumentMatchers;
import io.hops.hadoop.shaded.org.mockito.Mockito;
import io.hops.hadoop.shaded.org.mockito.invocation.InvocationOnMock;
import io.hops.hadoop.shaded.org.mockito.stubbing.Answer;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
@Parameterized.UseParametersRunnerFactory(value=ZKParameterized.RunnerFactory.class)
public class WatchLeakTest {
    protected static final Logger LOG = LoggerFactory.getLogger(WatchLeakTest.class);
    final long SESSION_ID = 47806L;
    private final boolean sessionTimedout;
    private static final long superSecret = 3007405056L;

    @Before
    public void setUp() {
        System.setProperty("zookeeper.admin.enableServer", "false");
    }

    public WatchLeakTest(boolean sessionTimedout) {
        this.sessionTimedout = sessionTimedout;
    }

    @Parameterized.Parameters
    public static Collection<Object[]> configs() {
        return Arrays.asList({false}, {true});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testWatchesLeak() throws Exception {
        NIOServerCnxnFactory serverCnxnFactory = Mockito.mock(NIOServerCnxnFactory.class);
        FakeSK sk = new FakeSK();
        MockSelectorThread selectorThread = Mockito.mock(MockSelectorThread.class);
        Mockito.when(selectorThread.addInterestOpsUpdateRequest((SelectionKey)ArgumentMatchers.any(SelectionKey.class))).thenAnswer(new Answer<Boolean>(){

            @Override
            public Boolean answer(InvocationOnMock invocation) throws Throwable {
                SelectionKey sk = (SelectionKey)invocation.getArguments()[0];
                NIOServerCnxn nioSrvCnx = (NIOServerCnxn)sk.attachment();
                sk.interestOps(nioSrvCnx.getInterestOps());
                return true;
            }
        });
        ZKDatabase database = new ZKDatabase(null);
        database.setlastProcessedZxid(2L);
        QuorumPeer quorumPeer = Mockito.mock(QuorumPeer.class);
        FileTxnSnapLog logfactory = Mockito.mock(FileTxnSnapLog.class);
        Mockito.when(logfactory.getDataDir()).thenReturn(new File(""));
        Mockito.when(logfactory.getSnapDir()).thenReturn(new File(""));
        LearnerZooKeeperServer fzks = null;
        try {
            fzks = new FollowerZooKeeperServer(logfactory, quorumPeer, database);
            fzks.startup();
            fzks.setServerCnxnFactory(serverCnxnFactory);
            quorumPeer.follower = new MyFollower(quorumPeer, (FollowerZooKeeperServer)fzks);
            LOG.info("Follower created");
            SocketChannel socketChannel = this.createClientSocketChannel();
            MockNIOServerCnxn nioCnxn = new MockNIOServerCnxn(fzks, socketChannel, sk, serverCnxnFactory, selectorThread);
            sk.attach(nioCnxn);
            nioCnxn.doIO(sk);
            LOG.info("Client connection sent");
            QuorumPacket qp = this.createValidateSessionPacketResponse(!this.sessionTimedout);
            quorumPeer.follower.processPacket(qp);
            LOG.info("Session validation sent");
            nioCnxn.doIO(sk);
            Thread.sleep(1000L);
            LOG.info("Watches processed");
            int watchCount = database.getDataTree().getWatchCount();
            if (this.sessionTimedout) {
                LOG.info("session is not valid, watches = {}", (Object)watchCount);
                Assert.assertEquals((String)"Session is not valid so there should be no watches", (long)0L, (long)watchCount);
            } else {
                LOG.info("session is valid, watches = {}", (Object)watchCount);
                Assert.assertEquals((String)"Session is valid so the watch should be there", (long)1L, (long)watchCount);
            }
        }
        finally {
            if (fzks != null) {
                fzks.shutdown();
            }
        }
    }

    private ByteBuffer createWatchesMessage() {
        ArrayList<String> dataWatches = new ArrayList<String>(1);
        dataWatches.add("/");
        List<String> existWatches = Collections.emptyList();
        List<String> childWatches = Collections.emptyList();
        SetWatches sw = new SetWatches(1L, dataWatches, existWatches, childWatches);
        RequestHeader h = new RequestHeader();
        h.setType(101);
        h.setXid(-8);
        MockPacket p = new MockPacket(h, new ReplyHeader(), sw, null, null);
        return p.createAndReturnBB();
    }

    private ByteBuffer createConnRequest() {
        Random r = new Random(3007440574L);
        byte[] p = new byte[16];
        r.nextBytes(p);
        ConnectRequest conReq = new ConnectRequest(0, 1L, 30000, 47806L, p);
        MockPacket packet = new MockPacket(null, null, conReq, null, null, false);
        return packet.createAndReturnBB();
    }

    private SocketChannel createClientSocketChannel() throws IOException {
        SocketChannel socketChannel = Mockito.mock(SocketChannel.class);
        Socket socket = Mockito.mock(Socket.class);
        InetSocketAddress socketAddress = new InetSocketAddress(1234);
        Mockito.when(socket.getRemoteSocketAddress()).thenReturn(socketAddress);
        Mockito.when(socketChannel.socket()).thenReturn(socket);
        ByteBuffer connRequest = this.createConnRequest();
        ByteBuffer watchesMessage = this.createWatchesMessage();
        final ByteBuffer request = ByteBuffer.allocate(connRequest.limit() + watchesMessage.limit());
        request.put(connRequest);
        request.put(watchesMessage);
        Answer<Integer> answer = new Answer<Integer>(){
            int i = 0;

            @Override
            public Integer answer(InvocationOnMock invocation) throws Throwable {
                Object[] args = invocation.getArguments();
                ByteBuffer bb = (ByteBuffer)args[0];
                for (int k = 0; k < bb.limit(); ++k) {
                    bb.put(request.get(this.i));
                    ++this.i;
                }
                return bb.limit();
            }
        };
        Mockito.when(socketChannel.read((ByteBuffer)ArgumentMatchers.any(ByteBuffer.class))).thenAnswer(answer);
        return socketChannel;
    }

    private QuorumPacket createValidateSessionPacketResponse(boolean valid) throws Exception {
        QuorumPacket qp = this.createValidateSessionPacket();
        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
        DataInputStream dis = new DataInputStream(bis);
        long id = dis.readLong();
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        dos.writeLong(id);
        dos.writeBoolean(valid);
        qp.setData(bos.toByteArray());
        return qp;
    }

    private QuorumPacket createValidateSessionPacket() throws Exception {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        dos.writeLong(47806L);
        dos.writeInt(3000);
        dos.close();
        QuorumPacket qp = new QuorumPacket(6, -1L, baos.toByteArray(), null);
        return qp;
    }

    private static class FakeSK
    extends SelectionKey {
        private int ops = 5;

        private FakeSK() {
        }

        @Override
        public SelectableChannel channel() {
            return null;
        }

        @Override
        public Selector selector() {
            return Mockito.mock(Selector.class);
        }

        @Override
        public boolean isValid() {
            return true;
        }

        @Override
        public void cancel() {
        }

        @Override
        public int interestOps() {
            return this.ops;
        }

        @Override
        public SelectionKey interestOps(int ops) {
            this.ops = ops;
            return this;
        }

        @Override
        public int readyOps() {
            boolean writing;
            boolean reading = (this.ops & 1) != 0;
            boolean bl = writing = (this.ops & 4) != 0;
            if (reading && writing) {
                LOG.info("Channel is ready for reading and writing");
            } else if (reading) {
                LOG.info("Channel is ready for reading only");
            } else if (writing) {
                LOG.info("Channel is ready for writing only");
            }
            return this.ops;
        }
    }

    public static class MyFollower
    extends Follower {
        MyFollower(QuorumPeer self, FollowerZooKeeperServer zk) {
            super(self, zk);
            this.leaderOs = Mockito.mock(OutputArchive.class);
            this.leaderIs = Mockito.mock(InputArchive.class);
            this.bufferedOutput = Mockito.mock(BufferedOutputStream.class);
        }
    }
}

