/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;

public class TestDatanodeBlockScanner {
    /** Logger for this test class. */
    private static final Log LOG = LogFactory.getLog(TestDatanodeBlockScanner.class);
    /** Timeout value of 20000; presumably milliseconds, matching the "msec" timeouts used by the wait helpers -- TODO confirm. */
    private static final long TIMEOUT = 20000L;
    /**
     * Matches a report fragment containing a block name ("blk_", optional
     * minus signs, digits; group 1) followed by "scan time : &lt;digits&gt;"
     * (group 2).
     */
    private static final Pattern pattern = Pattern.compile(".*?(blk_[-]*\\d+).*?scan time\\s*:\\s*(\\d+)");
    /**
     * Matches the literal text "SCAN_PERIOD" (group 1) followed by a colon
     * and a value that starts with digits (group 2).
     */
    private static final Pattern pattern_blockVerify = Pattern.compile(".*?(SCAN_PERIOD)\\s*:\\s*(\\d+.*?)");
    /** Absolute form of "/data/current/finalized"; assigned in the static initializer. */
    private static final String BASE_PATH;

    /**
     * Polls the datanode's block scanner report until the scan time reported
     * for the first block of {@code file} reaches {@code newTime}.
     *
     * <p>The report is fetched from
     * {@code http://localhost:<infoPort>/blockScannerReport?listblocks} and
     * re-read every 500 ms; a progress line is logged at most every 5 seconds.
     *
     * @param infoPort        HTTP info port of the datanode to query
     * @param fs              file system used to look up the file's first block
     * @param file            file whose first block is being waited on
     * @param blocksValidated when non-negative, the report is additionally
     *                        matched against {@code pattern_blockVerify}; if
     *                        group 1 of a match equals the block name, this
     *                        value is asserted to be 1. NOTE(review): group 1
     *                        of that pattern is the literal "SCAN_PERIOD", so
     *                        the assertion looks unreachable for real block
     *                        names -- confirm intent.
     * @param newTime         minimum scan time to wait for; values &lt;= 0 are
     *                        treated as 1
     * @param timeout         maximum wait in milliseconds; values &lt;= 0 mean
     *                        no deadline
     * @return the scan time parsed from the report (&gt;= the effective
     *         {@code newTime})
     * @throws IOException      if the report cannot be fetched, or (as an
     *                          {@link java.io.InterruptedIOException}) if the
     *                          waiting thread is interrupted
     * @throws TimeoutException if the deadline passes before the scan time is
     *                          reached
     */
    private static long waitForVerification(int infoPort, FileSystem fs, Path file, int blocksValidated, long newTime, long timeout) throws IOException, TimeoutException {
        URL url = new URL("http://localhost:" + infoPort + "/blockScannerReport?listblocks");
        long lastWarnTime = Time.monotonicNow();
        if (newTime <= 0L) {
            newTime = 1L;
        }
        long verificationTime = 0L;
        String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
        // A non-positive timeout means "wait forever"; avoid adding to now so the deadline cannot overflow.
        long failtime = timeout <= 0L ? Long.MAX_VALUE : Time.monotonicNow() + timeout;
        while (verificationTime < newTime) {
            if (failtime < Time.monotonicNow()) {
                throw new TimeoutException("failed to achieve block verification after " + timeout + " msec. Current verification timestamp = " + verificationTime + ", requested verification time > " + newTime);
            }
            String response = DFSTestUtil.urlGet(url);
            if (blocksValidated >= 0) {
                Matcher periodMatcher = pattern_blockVerify.matcher(response);
                while (periodMatcher.find()) {
                    if (block.equals(periodMatcher.group(1))) {
                        Assert.assertEquals((long)1L, (long)blocksValidated);
                        break;
                    }
                }
            }
            Matcher scanTimeMatcher = pattern.matcher(response);
            while (scanTimeMatcher.find()) {
                if (block.equals(scanTimeMatcher.group(1))) {
                    verificationTime = Long.parseLong(scanTimeMatcher.group(2));
                    break;
                }
            }
            if (verificationTime >= newTime) {
                break;
            }
            long now = Time.monotonicNow();
            if (now - lastWarnTime >= 5000L) {
                LOG.info((Object)("Waiting for verification of " + block));
                lastWarnTime = now;
            }
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag and abort the
                // wait so an interrupted test stops promptly.
                Thread.currentThread().interrupt();
                java.io.InterruptedIOException interrupted = new java.io.InterruptedIOException("interrupted while waiting for verification of " + block);
                interrupted.initCause(e);
                throw interrupted;
            }
        }
        return verificationTime;
    }

    /**
     * Creates a file on a fresh cluster, restarts the cluster without
     * formatting, and waits for the block scanner report of the single live
     * datanode to show a scan time &gt;= the test start time for that file's
     * first block. Then creates and reads a second file and waits for the
     * same condition on it.
     *
     * <p>Both clusters and the {@link DFSClient} are released in
     * {@code finally} blocks so a failing assertion does not leak them.
     */
    @Test
    public void testDatanodeBlockScanner() throws IOException, TimeoutException {
        long startTime = Time.monotonicNow();
        HdfsConfiguration conf = new HdfsConfiguration();
        Path file1 = new Path("/tmp/testBlockVerification/file1");
        Path file2 = new Path("/tmp/testBlockVerification/file2");

        // Phase 1: write file1, then shut the cluster down.
        MiniDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).build();
        try {
            cluster.waitActive();
            DistributedFileSystem fs = cluster.getFileSystem();
            DFSTestUtil.createFile((FileSystem)fs, file1, 10L, (short)1, 0L);
        }
        finally {
            cluster.shutdown();
        }

        // Phase 2: restart on the existing storage and wait for verification.
        cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(1).format(false).build();
        DFSClient dfsClient = null;
        try {
            cluster.waitActive();
            dfsClient = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), (Configuration)conf);
            DistributedFileSystem fs = cluster.getFileSystem();
            DatanodeInfo dn = dfsClient.datanodeReport(HdfsConstants.DatanodeReportType.LIVE)[0];
            Assert.assertTrue(TestDatanodeBlockScanner.waitForVerification(dn.getInfoPort(), (FileSystem)fs, file1, 1, startTime, TIMEOUT) >= startTime);
            DFSTestUtil.createFile((FileSystem)fs, file2, 10L, (short)1, 0L);
            // Read file2 end to end, discarding the bytes.
            IOUtils.copyBytes((InputStream)fs.open(file2), (OutputStream)new IOUtils.NullOutputStream(), (Configuration)conf, (boolean)true);
            Assert.assertTrue(TestDatanodeBlockScanner.waitForVerification(dn.getInfoPort(), (FileSystem)fs, file2, 2, startTime, TIMEOUT) >= startTime);
        }
        finally {
            try {
                if (dfsClient != null) {
                    dfsClient.close();
                }
            }
            finally {
                cluster.shutdown();
            }
        }
    }

    /**
     * Forwards to {@code MiniDFSCluster.corruptReplica(replica, blk)} and
     * returns its result. Note the argument order is swapped relative to this
     * method's signature.
     *
     * @param blk     block to pass through
     * @param replica index to pass through (presumably the datanode index
     *                holding the replica -- confirm against MiniDFSCluster)
     * @return whatever {@code MiniDFSCluster.corruptReplica} returns
     * @throws IOException propagated from {@code MiniDFSCluster.corruptReplica}
     */
    public static boolean corruptReplica(ExtendedBlock blk, int replica) throws IOException {
        boolean corrupted = MiniDFSCluster.corruptReplica(replica, blk);
        return corrupted;
    }

    /**
     * On a three-datanode cluster with a three-replica file: corrupts the
     * replica on one randomly chosen datanode, restarts that datanode, and
     * checks that not all replicas are reported corrupt; then corrupts the
     * replica on all three datanodes, runs the block scanner for the block on
     * every datanode, and checks that all replicas are reported corrupt.
     *
     * <p>The cluster is shut down in a {@code finally} block so a failing
     * assertion does not leak it.
     */
    @Test
    public void testBlockCorruptionPolicy() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setLong("dfs.blockreport.intervalMsec", 1000L);
        Random random = new Random();
        int rand = random.nextInt(3);
        MiniDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(3).build();
        try {
            cluster.waitActive();
            DistributedFileSystem fs = cluster.getFileSystem();
            Path file1 = new Path("/tmp/testBlockVerification/file1");
            DFSTestUtil.createFile((FileSystem)fs, file1, 1024L, (short)3, 0L);
            ExtendedBlock block = DFSTestUtil.getFirstBlock((FileSystem)fs, file1);
            DFSTestUtil.waitReplication((FileSystem)fs, file1, (short)3);
            Assert.assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));

            // Corrupt a single randomly chosen replica and restart its datanode.
            Assert.assertTrue(MiniDFSCluster.corruptReplica(rand, block));
            cluster.restartDataNode(rand);
            DFSTestUtil.waitReplication((FileSystem)fs, file1, (short)2);
            Assert.assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));

            // Corrupt every replica and have each datanode scan the block.
            Assert.assertTrue(MiniDFSCluster.corruptReplica(0, block));
            Assert.assertTrue(MiniDFSCluster.corruptReplica(1, block));
            Assert.assertTrue(MiniDFSCluster.corruptReplica(2, block));
            for (DataNode dn : cluster.getDataNodes()) {
                DataNodeTestUtils.runBlockScannerForBlock(dn, block);
            }
            DFSTestUtil.waitReplication((FileSystem)fs, file1, (short)3);
            Assert.assertTrue(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
        }
        finally {
            cluster.shutdown();
        }
    }

    /**
     * Runs the corrupt-replica recovery scenario with 4 datanodes,
     * replication 3 and 1 corrupt replica.
     */
    @Test
    public void testBlockCorruptionRecoveryPolicy1() throws Exception {
        final int dataNodeCount = 4;
        final short replicaCount = 3;
        final int corruptCount = 1;
        LOG.info("Testing corrupt replica recovery for one corrupt replica");
        blockCorruptionRecoveryPolicy(dataNodeCount, replicaCount, corruptCount);
    }

    /**
     * Runs the corrupt-replica recovery scenario with 5 datanodes,
     * replication 3 and 2 corrupt replicas.
     */
    @Test
    public void testBlockCorruptionRecoveryPolicy2() throws Exception {
        final int dataNodeCount = 5;
        final short replicaCount = 3;
        final int corruptCount = 2;
        LOG.info("Testing corrupt replica recovery for two corrupt replicas");
        blockCorruptionRecoveryPolicy(dataNodeCount, replicaCount, corruptCount);
    }

    /**
     * Shared scenario for the corrupt-replica recovery tests.
     *
     * <p>Creates a file with {@code numReplicas} replicas, corrupts up to
     * {@code numCorruptReplicas} of them, restarts the datanodes on which
     * corruption succeeded, and waits for the namesystem to report that many
     * corrupt replicas (retrying the corrupt/restart step on timeout). It
     * then waits for the file to return to {@code numReplicas} replicas with
     * zero corrupt replicas.
     *
     * @param numDataNodes       number of datanodes to start
     * @param numReplicas        replication factor of the test file
     * @param numCorruptReplicas number of replicas to corrupt
     * @throws Exception on any failure, including a {@link TimeoutException}
     *                   once the retry budget is exhausted
     */
    private void blockCorruptionRecoveryPolicy(int numDataNodes, short numReplicas, int numCorruptReplicas) throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setLong("dfs.blockreport.intervalMsec", 30L);
        conf.setLong("dfs.namenode.replication.interval", 3L);
        conf.setLong("dfs.heartbeat.interval", 3L);
        conf.setBoolean("dfs.namenode.replication.considerLoad", false);
        conf.setLong("dfs.namenode.replication.pending.timeout-sec", 5L);
        MiniDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(numDataNodes).build();
        try {
            cluster.waitActive();
            DistributedFileSystem fs = cluster.getFileSystem();
            Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
            DFSTestUtil.createFile((FileSystem)fs, file1, 1024L, numReplicas, 0L);
            ExtendedBlock block = DFSTestUtil.getFirstBlock((FileSystem)fs, file1);
            // Retry bound for the corrupt/restart/wait cycle below.
            final int ITERATIONS = 10;
            DFSTestUtil.waitReplication((FileSystem)fs, file1, numReplicas);
            for (int k = 0; ; ++k) {
                // Corrupt replicas on the lowest-indexed datanodes that hold one.
                int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
                int corruptedCount = 0;
                for (int i = 0; corruptedCount != numCorruptReplicas && i < numDataNodes; ++i) {
                    if (!TestDatanodeBlockScanner.corruptReplica(block, i)) {
                        continue;
                    }
                    corruptReplicasDNIDs[corruptedCount++] = i;
                    LOG.info((Object)("successfully corrupted block " + block + " on node " + i + " " + cluster.getDataNodes().get(i).getDisplayName()));
                }
                // Restart only the datanodes actually corrupted above (unfilled
                // slots would otherwise default to node 0), walking from the
                // highest index down.
                // NOTE(review): descending order presumably matters because
                // restarting a node changes the datanode list -- confirm.
                for (int i = corruptedCount - 1; i >= 0; --i) {
                    LOG.info((Object)("restarting node with corrupt replica: position " + i + " node " + corruptReplicasDNIDs[i] + " " + cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getDisplayName()));
                    cluster.restartDataNode(corruptReplicasDNIDs[i]);
                }
                try {
                    DFSTestUtil.waitCorruptReplicas((FileSystem)fs, cluster.getNamesystem(), file1, block, numCorruptReplicas);
                }
                catch (TimeoutException e) {
                    if (k > ITERATIONS) {
                        throw e;
                    }
                    LOG.info((Object)("Timed out waiting for corrupt replicas, trying again, iteration " + k));
                    continue;
                }
                break;
            }
            DFSTestUtil.waitReplication((FileSystem)fs, file1, numReplicas);
            Assert.assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
            DFSTestUtil.waitCorruptReplicas((FileSystem)fs, cluster.getNamesystem(), file1, block, 0);
        }
        finally {
            cluster.shutdown();
        }
    }

    /**
     * Exercises handling of a replica whose on-disk length has been reduced.
     *
     * <ol>
     *   <li>Start a two-datanode cluster, create a 1-byte file with two
     *       replicas, record its first block, and shut down.</li>
     *   <li>Restart without formatting, wait for the block scanner on
     *       datanode 0 to report the block, shrink the replica on datanode 0
     *       by one byte, and shut down.</li>
     *   <li>Restart without formatting, start one additional datanode, check
     *       that the namenode has left safe mode, then wait for the file to
     *       reach two replicas and for the block file on datanode index 0 to
     *       disappear.</li>
     * </ol>
     *
     * <p>All start-up steps run inside the {@code try} blocks so a failure
     * during start-up still shuts the cluster down.
     */
    @Test
    public void testTruncatedBlockReport() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        final short REPLICATION_FACTOR = 2;
        Path fileName = new Path("/file1");
        conf.setLong("dfs.blockreport.intervalMsec", 3L);
        conf.setLong("dfs.namenode.replication.interval", 3L);
        conf.setLong("dfs.heartbeat.interval", 3L);
        conf.setBoolean("dfs.namenode.replication.considerLoad", false);
        long startTime = Time.monotonicNow();

        // Step 1: create the file and remember its first block.
        ExtendedBlock block;
        MiniDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(REPLICATION_FACTOR).build();
        try {
            cluster.waitActive();
            DistributedFileSystem fs = cluster.getFileSystem();
            DFSTestUtil.createFile((FileSystem)fs, fileName, 1L, REPLICATION_FACTOR, 0L);
            DFSTestUtil.waitReplication((FileSystem)fs, fileName, REPLICATION_FACTOR);
            block = DFSTestUtil.getFirstBlock((FileSystem)fs, fileName);
        }
        finally {
            cluster.shutdown();
        }

        // Step 2: wait for verification, then shrink the replica on datanode 0.
        cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(REPLICATION_FACTOR).format(false).build();
        try {
            cluster.waitActive();
            DistributedFileSystem fs = cluster.getFileSystem();
            int infoPort = cluster.getDataNodes().get(0).getInfoPort();
            Assert.assertTrue(TestDatanodeBlockScanner.waitForVerification(infoPort, (FileSystem)fs, fileName, 1, startTime, TIMEOUT) >= startTime);
            if (!TestDatanodeBlockScanner.changeReplicaLength(block, 0, -1)) {
                throw new IOException("failed to find or change length of replica on node 0 " + cluster.getDataNodes().get(0).getDisplayName());
            }
        }
        finally {
            cluster.shutdown();
        }

        // Step 3: restart with one extra datanode and wait for the shortened
        // replica's block file to be removed.
        cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(REPLICATION_FACTOR).format(false).build();
        try {
            cluster.startDataNodes((Configuration)conf, 1, true, null, null);
            cluster.waitActive();
            cluster.waitClusterUp();
            Assert.assertFalse("failed to leave safe mode", cluster.getNameNode().isInSafeMode());
            DFSTestUtil.waitReplication((FileSystem)cluster.getFileSystem(), fileName, REPLICATION_FACTOR);
            TestDatanodeBlockScanner.waitForBlockDeleted(block, 0, TIMEOUT);
        }
        finally {
            cluster.shutdown();
        }
    }

    /**
     * Changes the on-disk length of a block file by {@code lenDelta} bytes.
     *
     * <p>The file is located via {@code MiniDFSCluster.getBlockFile(dnIndex, blk)}.
     *
     * @param blk      block whose file should be resized
     * @param dnIndex  datanode index passed to {@code MiniDFSCluster.getBlockFile}
     * @param lenDelta number of bytes to add to the current length (negative
     *                 to truncate)
     * @return {@code true} if the block file was found and resized;
     *         {@code false} if no block file exists
     * @throws IOException if the file cannot be opened or resized
     */
    static boolean changeReplicaLength(ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException {
        File blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
        if (blockFile != null && blockFile.exists()) {
            RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
            try {
                raFile.setLength(raFile.length() + (long)lenDelta);
            }
            finally {
                // Close even if length()/setLength() throws, so the file
                // handle is not leaked.
                raFile.close();
            }
            return true;
        }
        LOG.info((Object)("failed to change length of block " + blk));
        return false;
    }

    /**
     * Waits until {@code MiniDFSCluster.getBlockFile(dnIndex, blk)} no longer
     * yields an existing file, polling every 100 ms.
     *
     * @param blk     block whose file is expected to disappear
     * @param dnIndex datanode index passed to {@code MiniDFSCluster.getBlockFile}
     * @param timeout maximum wait in milliseconds; values &lt;= 0 mean no
     *                deadline
     * @throws TimeoutException     if the block file still resolves after the
     *                              deadline
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void waitForBlockDeleted(ExtendedBlock blk, int dnIndex, long timeout) throws TimeoutException, InterruptedException {
        File blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
        // Pick the "no deadline" sentinel directly instead of adding
        // Long.MAX_VALUE to the current time, which overflows to a negative
        // deadline and would time out on the first iteration.
        long failtime = timeout <= 0L ? Long.MAX_VALUE : Time.monotonicNow() + timeout;
        while (blockFile != null && blockFile.exists()) {
            if (failtime < Time.monotonicNow()) {
                throw new TimeoutException("waited too long for blocks to be deleted: " + blockFile.getPath() + (blockFile.exists() ? " still exists; " : " is absent; "));
            }
            Thread.sleep(100L);
            blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
        }
    }

    /**
     * Creates files /test1 .. /test9 one at a time on a single-datanode
     * cluster, waiting for each one's first block to appear in the scanner
     * report. After each new file, checks that the latest scan time of the
     * previous file's block is non-zero and unchanged from the value recorded
     * right after it was verified. Finally restarts the datanode, sleeps 10
     * seconds, and checks that the scan time of /test9's block is still
     * unchanged.
     */
    @Test
    public void testDuplicateScans() throws Exception {
        long startTime = Time.monotonicNow();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).build();
        DistributedFileSystem fs = null;
        try {
            fs = cluster.getFileSystem();
            DataNode dataNode = cluster.getDataNodes().get(0);
            int infoPort = dataNode.getInfoPort();
            long recordedScanTime = 0L;
            long observedScanTime = 0L;
            int fileIndex = 1;
            while (fileIndex < 10) {
                Path currentFile = new Path("/test" + fileIndex);
                DFSTestUtil.createFile((FileSystem)fs, currentFile, 1024L, (short)1, 1000L);
                TestDatanodeBlockScanner.waitForVerification(infoPort, (FileSystem)fs, currentFile, fileIndex, startTime, 20000L);
                if (fileIndex > 1) {
                    // Re-read the previous file's scan time; it must match
                    // what was recorded on the previous iteration.
                    Path previousFile = new Path("/test" + (fileIndex - 1));
                    ExtendedBlock previousBlock = DFSTestUtil.getFirstBlock((FileSystem)fs, previousFile);
                    observedScanTime = DataNodeTestUtils.getLatestScanTime(dataNode, previousBlock);
                    Assert.assertFalse("scan time shoud not be 0", observedScanTime == 0L);
                    Assert.assertEquals("There should not be duplicate scan", recordedScanTime, observedScanTime);
                }
                ExtendedBlock currentBlock = DFSTestUtil.getFirstBlock((FileSystem)fs, new Path("/test" + fileIndex));
                recordedScanTime = DataNodeTestUtils.getLatestScanTime(dataNode, currentBlock);
                ++fileIndex;
            }
            cluster.restartDataNode(0);
            Thread.sleep(10000L);
            // The restart produces a new DataNode object; fetch it again.
            dataNode = cluster.getDataNodes().get(0);
            ExtendedBlock lastBlock = DFSTestUtil.getFirstBlock((FileSystem)fs, new Path("/test9"));
            observedScanTime = DataNodeTestUtils.getLatestScanTime(dataNode, lastBlock);
            Assert.assertEquals("There should not be duplicate scan", recordedScanTime, observedScanTime);
        }
        finally {
            IOUtils.closeStream((Closeable)fs);
            cluster.shutdown();
        }
    }

    /**
     * Checks {@code ReplicaInfo.parseSubDirs} against the base path itself
     * and against several nested "subdirN" paths beneath it, each paired with
     * the sub-directory indices expected to be parsed out.
     */
    @Test
    public void testReplicaInfoParsing() throws Exception {
        // The bare base path has no sub-directory components.
        testReplicaInfoParsingSingle(BASE_PATH, new int[0]);

        String[] subDirSuffixes = {
            "/subdir1",
            "/subdir43",
            "/subdir1/subdir2/subdir3",
            "/subdir1/subdir2/subdir43",
            "/subdir1/subdir23/subdir3",
            "/subdir13/subdir2/subdir3",
        };
        int[][] expectedIndices = {
            {1},
            {43},
            {1, 2, 3},
            {1, 2, 43},
            {1, 23, 3},
            {13, 2, 3},
        };
        for (int caseIndex = 0; caseIndex < subDirSuffixes.length; ++caseIndex) {
            testReplicaInfoParsingSingle(BASE_PATH + subDirSuffixes[caseIndex], expectedIndices[caseIndex]);
        }
    }

    /**
     * Parses {@code subDirPath} with {@code ReplicaInfo.parseSubDirs} and
     * asserts that the parsed {@code subDirs} equal {@code expectedSubDirs}
     * and that the parsed {@code baseDirPath} equals {@code BASE_PATH}.
     *
     * @param subDirPath      path to parse
     * @param expectedSubDirs sub-directory indices expected from the parse
     */
    private static void testReplicaInfoParsingSingle(String subDirPath, int[] expectedSubDirs) {
        File replicaDir = new File(subDirPath);
        int[] parsedSubDirs = ReplicaInfo.parseSubDirs(replicaDir).subDirs;
        Assert.assertArrayEquals(expectedSubDirs, parsedSubDirs);
        Object parsedBaseDir = ReplicaInfo.parseSubDirs(replicaDir).baseDirPath;
        Assert.assertEquals((Object)BASE_PATH, parsedBaseDir);
    }

    static {
        ((Log4JLogger)FSNamesystem.auditLog).getLogger().setLevel(Level.WARN);
        BASE_PATH = new File("/data/current/finalized").getAbsolutePath();
    }
}

