/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import com.google.common.base.Supplier;
import com.google.common.io.ByteStreams;
import java.io.Closeable;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.PeerCache;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests around keeping data-transfer connections open between an HDFS client
 * and a DataNode: each test starts a single-DataNode {@link MiniDFSCluster},
 * reads {@link #TEST_FILE} and then checks the client-side {@link PeerCache}
 * size and/or the DataNode's xceiver count.
 */
public class TestDataTransferKeepalive {
    Configuration conf = new HdfsConfiguration();
    private MiniDFSCluster cluster;
    private DataNode dn;
    private static final Path TEST_FILE = new Path("/test");
    /** Value (ms) written to "dfs.datanode.socket.reuse.keepalive" in {@link #setup()}. */
    private static final int KEEPALIVE_TIMEOUT = 1000;
    /** Value (ms) written to "dfs.datanode.socket.write.timeout" in {@link #testSlowReader()}. */
    private static final int WRITE_TIMEOUT = 3000;

    /**
     * Configures the keepalive and block-acquire settings, then brings up a
     * mini cluster with exactly one DataNode and remembers that DataNode.
     *
     * @throws Exception if the cluster cannot be started
     */
    @Before
    public void setup() throws Exception {
        this.conf.setInt("dfs.datanode.socket.reuse.keepalive", KEEPALIVE_TIMEOUT);
        this.conf.setInt("dfs.client.max.block.acquire.failures", 0);
        this.cluster = new MiniDFSCluster.Builder(this.conf).numDataNodes(1).build();
        this.dn = this.cluster.getDataNodes().get(0);
    }

    /**
     * Shuts the mini cluster down. The null guard keeps a failure inside
     * {@link #setup()} (before the cluster was assigned) from being masked by
     * a NullPointerException thrown here.
     */
    @After
    public void teardown() {
        if (this.cluster != null) {
            this.cluster.shutdown();
        }
    }

    /**
     * Reads the test file with a long client-side cache expiry, waits, and
     * then checks that the DataNode has no xceiver left while the client
     * cache still holds one peer whose input stream is at end-of-stream.
     *
     * @throws Exception on any cluster, I/O or interruption failure
     */
    @Test(timeout=30000L)
    public void testDatanodeRespectsKeepAliveTimeout() throws Exception {
        final long CLIENT_EXPIRY_MS = 60000L;
        Configuration clientConf = new Configuration(this.conf);
        clientConf.setLong("dfs.client.socketcache.expiryMsec", CLIENT_EXPIRY_MS);
        clientConf.set("dfs.client.context", "testDatanodeRespectsKeepAliveTimeout");
        DistributedFileSystem fs =
            (DistributedFileSystem) FileSystem.get(this.cluster.getURI(), clientConf);
        PeerCache peerCache = ClientContext.getFromConf(clientConf).getPeerCache();

        DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short) 1, 0L);

        // Writing the file must not have left anything cached or running.
        Assert.assertEquals(0L, peerCache.size());
        this.assertXceiverCount(0);

        // A read leaves one cached peer and one xceiver behind.
        DFSTestUtil.readFile(fs, TEST_FILE);
        Assert.assertEquals(1L, peerCache.size());
        this.assertXceiverCount(1);

        // Sleep well past the KEEPALIVE_TIMEOUT configured in setup(); the
        // xceiver is expected to be gone afterwards.
        Thread.sleep(4050L);
        this.assertXceiverCount(0);

        // The client cache still holds its entry at this point.
        Assert.assertEquals(1L, peerCache.size());

        // Taking the peer out and reading from it must yield end-of-stream.
        Peer peer = peerCache.get(this.dn.getDatanodeId(), false);
        Assert.assertNotNull(peer);
        Assert.assertEquals(-1L, peer.getInputStream().read());
    }

    /**
     * Reads the test file with a very short client-side cache expiry, waits
     * longer than that expiry, and checks that the cache no longer hands out
     * a peer and reports a size of zero.
     *
     * @throws Exception on any cluster, I/O or interruption failure
     */
    @Test(timeout=30000L)
    public void testClientResponsesKeepAliveTimeout() throws Exception {
        final long CLIENT_EXPIRY_MS = 10L;
        Configuration clientConf = new Configuration(this.conf);
        clientConf.setLong("dfs.client.socketcache.expiryMsec", CLIENT_EXPIRY_MS);
        clientConf.set("dfs.client.context", "testClientResponsesKeepAliveTimeout");
        DistributedFileSystem fs =
            (DistributedFileSystem) FileSystem.get(this.cluster.getURI(), clientConf);
        PeerCache peerCache = ClientContext.getFromConf(clientConf).getPeerCache();

        DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short) 1, 0L);

        // Writing the file must not have left anything cached or running.
        Assert.assertEquals(0L, peerCache.size());
        this.assertXceiverCount(0);

        // A read leaves one cached peer and one xceiver behind.
        DFSTestUtil.readFile(fs, TEST_FILE);
        Assert.assertEquals(1L, peerCache.size());
        this.assertXceiverCount(1);

        // Wait longer than CLIENT_EXPIRY_MS before looking in the cache again.
        Thread.sleep(60L);

        Peer peer = peerCache.get(this.dn.getDatanodeId(), false);
        Assert.assertNull(peer);
        Assert.assertEquals(0L, peerCache.size());
    }

    /**
     * Restarts the DataNode with a {@link #WRITE_TIMEOUT} write timeout and a
     * long keepalive, opens an 8 MiB file, reads a single byte and then stops
     * reading; the test passes once the DataNode's xceiver count drops to
     * zero within the polling window.
     *
     * @throws Exception on any cluster or I/O failure, or if the wait times out
     */
    @Test(timeout=300000L)
    public void testSlowReader() throws Exception {
        final long CLIENT_EXPIRY_MS = 600000L;
        Configuration clientConf = new Configuration(this.conf);
        clientConf.setLong("dfs.client.socketcache.expiryMsec", CLIENT_EXPIRY_MS);
        clientConf.set("dfs.client.context", "testSlowReader");
        DistributedFileSystem fs =
            (DistributedFileSystem) FileSystem.get(this.cluster.getURI(), clientConf);

        // Restart the only DataNode with the timeouts this test needs.
        MiniDFSCluster.DataNodeProperties props = this.cluster.stopDataNode(0);
        props.conf.setInt("dfs.datanode.socket.write.timeout", WRITE_TIMEOUT);
        props.conf.setInt("dfs.datanode.socket.reuse.keepalive", 120 * 1000);
        Assert.assertTrue(this.cluster.restartDataNode(props, true));
        // The restart produced a new DataNode object; refresh the reference
        // used by the xceiver-count helpers.
        this.dn = this.cluster.getDataNodes().get(0);
        this.cluster.triggerHeartbeats();

        DFSTestUtil.createFile(fs, TEST_FILE, 8L * 1024 * 1024, (short) 1, 0L);
        FSDataInputStream stm = fs.open(TEST_FILE);
        try {
            // Read one byte only, then leave the stream idle.
            stm.read();
            this.assertXceiverCount(1);

            // Poll every 500 ms, for at most 50 s, until no xceiver is left.
            GenericTestUtils.waitFor(new Supplier<Boolean>() {
                @Override
                public Boolean get() {
                    return TestDataTransferKeepalive.this.getXceiverCountWithoutServer() == 0;
                }
            }, 500, 50000);
        } finally {
            // Close the stream even when an assertion or the wait fails.
            IOUtils.closeStream(stm);
        }
    }

    /**
     * Opens five streams on the test file, drains each of them, closes them,
     * and checks that five peers are cached. After sleeping until the
     * DataNode has no xceivers left, the cache must still report five entries
     * and a fresh read of the file must succeed.
     *
     * @throws Exception on any cluster, I/O or interruption failure
     */
    @Test(timeout=30000L)
    public void testManyClosedSocketsInCache() throws Exception {
        Configuration clientConf = new Configuration(this.conf);
        clientConf.set("dfs.client.context", "testManyClosedSocketsInCache");
        DistributedFileSystem fs =
            (DistributedFileSystem) FileSystem.get(this.cluster.getURI(), clientConf);
        PeerCache peerCache = ClientContext.getFromConf(clientConf).getPeerCache();
        DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short) 1, 0L);

        // Open every stream first, then drain them, so that several
        // connections are in use at the same time.
        InputStream[] stms = new InputStream[5];
        try {
            for (int i = 0; i < stms.length; ++i) {
                stms[i] = fs.open(TEST_FILE);
            }
            for (InputStream stm : stms) {
                IOUtils.copyBytes(stm, ByteStreams.nullOutputStream(), 1024);
            }
        } finally {
            IOUtils.cleanup(null, (Closeable[]) stms);
        }

        Assert.assertEquals(5L, peerCache.size());

        // Sleep past KEEPALIVE_TIMEOUT so that the xceivers are gone.
        Thread.sleep(1500L);
        this.assertXceiverCount(0);

        // The client has not touched the cache since, so it still has 5 entries.
        Assert.assertEquals(5L, peerCache.size());

        // A new read must still work with those entries in the cache.
        DFSTestUtil.readFile(fs, TEST_FILE);
    }

    /**
     * Fails the current test, after dumping thread info to stderr, when the
     * DataNode's xceiver count (as reported by
     * {@link #getXceiverCountWithoutServer()}) differs from {@code expected}.
     *
     * @param expected the xceiver count the caller expects
     * @throws UnsupportedEncodingException if the "UTF-8" charset name is rejected
     */
    private void assertXceiverCount(int expected) throws UnsupportedEncodingException {
        int count = this.getXceiverCountWithoutServer();
        if (count != expected) {
            // Deliberately NOT in try-with-resources: closing this wrapper
            // would close System.err for the rest of the JVM. Flush instead.
            PrintStream err = new PrintStream(System.err, false, "UTF-8");
            ReflectionUtils.printThreadInfo(err, "Thread dumps");
            err.flush();
            Assert.fail("Expected " + expected + " xceivers, found " + count);
        }
    }

    /**
     * Returns the DataNode's reported xceiver count minus one.
     * NOTE(review): the subtracted one presumably accounts for the DataNode's
     * own server thread, as the method name suggests - confirm against DataNode.
     */
    private int getXceiverCountWithoutServer() {
        return this.dn.getXceiverCount() - 1;
    }
}

