/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import com.google.common.base.Supplier;
import io.hops.common.INodeUtil;
import io.hops.exception.StorageException;
import io.hops.metadata.hdfs.entity.INodeIdentifier;
import io.hops.transaction.handler.HDFSOperationType;
import io.hops.transaction.handler.HopsTransactionalRequestHandler;
import io.hops.transaction.lock.LockFactory;
import io.hops.transaction.lock.TransactionLockTypes;
import io.hops.transaction.lock.TransactionLocks;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.HAStressTestHarness;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Tests that keep a file open for write while namenode 0 of a
 * {@link MiniDFSCluster} is restarted, then verify that the data written
 * before and after the restart can be read back.
 *
 * <p>Every cluster is built with {@code MiniDFSNNTopology.simpleHOPSTopology(2)}.
 */
public class TestPipelinesFailover {
    // Raise log verbosity for the namenode-side classes involved in these
    // tests. The block is placed first so that it runs before the field
    // initialisers below (static initialisers execute in textual order).
    static {
        ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)LogFactory.getLog(BlockManager.class)).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.io.retry.RetryInvocationHandler")).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
    }

    protected static final Log LOG = LogFactory.getLog(TestPipelinesFailover.class);

    /** File written by the single-writer tests. */
    private static final Path TEST_PATH = new Path("/test-file");

    /** Value used for {@code dfs.blocksize}. */
    private static final int BLOCK_SIZE = 4096;

    /** Size of each write: one and a half blocks (6144 bytes). */
    private static final int BLOCK_AND_A_HALF = BLOCK_SIZE * 3 / 2;

    /** Number of writer threads started by the stress test. */
    private static final int STRESS_NUM_THREADS = 25;

    /** How long, in milliseconds, the stress test waits on its writer threads. */
    private static final int STRESS_RUNTIME = 40000;

    /**
     * Restarts namenode 0 mid-write and then writes more data before closing.
     */
    @Test(timeout=30000L)
    public void testAllocateBlockAfterCrashFailover() throws Exception {
        this.doWriteOverFailoverTest(TestScenario.ORIGINAL_ACTIVE_CRASHED, MethodToTestIdempotence.ALLOCATE_BLOCK);
    }

    /**
     * Restarts namenode 0 mid-write and then closes the file without writing
     * any further data.
     */
    @Test(timeout=30000L)
    public void testCompleteFileAfterCrashFailover() throws Exception {
        this.doWriteOverFailoverTest(TestScenario.ORIGINAL_ACTIVE_CRASHED, MethodToTestIdempotence.COMPLETE_FILE);
    }

    /**
     * Writes a block and a half to {@link #TEST_PATH}, runs the failover
     * scenario, optionally writes another block and a half, closes the file
     * and checks its contents.
     *
     * @param scenario     how the failover is triggered
     * @param methodToTest {@code ALLOCATE_BLOCK} to write more data after the
     *                     failover; {@code COMPLETE_FILE} to close straight away
     * @throws Exception if any cluster or file system operation fails
     */
    private void doWriteOverFailoverTest(TestScenario scenario, MethodToTestIdempotence methodToTest) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("dfs.blocksize", BLOCK_SIZE);
        conf.setInt("dfs.namenode.replication.interval", 1000);
        FSDataOutputStream stm = null;
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(MiniDFSNNTopology.simpleHOPSTopology(2)).numDataNodes(3).build();
        try {
            int sizeWritten = 0;
            cluster.waitActive();
            Thread.sleep(500L);
            LOG.info("Starting with NN 0 active");
            DistributedFileSystem fs = cluster.getFileSystem(0);
            stm = fs.create(TEST_PATH);

            // Write a block and a half and make it visible before failing over.
            AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
            sizeWritten += BLOCK_AND_A_HALF;
            stm.hflush();

            LOG.info("Failing over to NN 1");
            scenario.run(cluster);

            // After the failover NN 1 must report no pending, corrupt or
            // missing blocks.
            FSNamesystem ns1 = cluster.getNameNode(1).getNamesystem();
            BlockManagerTestUtil.updateState(ns1.getBlockManager());
            Assert.assertEquals(0L, (long)ns1.getPendingReplicationBlocks());
            Assert.assertEquals(0L, (long)ns1.getCorruptReplicaBlocks());
            Assert.assertEquals(0L, (long)ns1.getMissingBlocksCount());

            if (methodToTest == MethodToTestIdempotence.ALLOCATE_BLOCK) {
                AppendTestUtil.write(stm, sizeWritten, BLOCK_AND_A_HALF);
                sizeWritten += BLOCK_AND_A_HALF;
            }
            stm.close();
            // Cleared so the finally block does not close the stream twice.
            stm = null;
            AppendTestUtil.check(fs, TEST_PATH, sizeWritten);
        } finally {
            IOUtils.closeStream(stm);
            cluster.shutdown();
        }
    }

    /**
     * Restarts namenode 0 mid-write and then stops two datanodes while the
     * writer keeps going.
     */
    @Test(timeout=30000L)
    public void testWriteOverCrashFailoverWithDnFail() throws Exception {
        this.doTestWriteOverFailoverWithDnFail(TestScenario.ORIGINAL_ACTIVE_CRASHED);
    }

    /**
     * Writes a block and a half, runs the failover scenario, then stops
     * datanode 0 and datanode 1 in turn with a further block-and-a-half write
     * after each, and finally checks that all three writes can be read back.
     *
     * @param scenario how the failover is triggered
     * @throws Exception if any cluster or file system operation fails
     */
    private void doTestWriteOverFailoverWithDnFail(TestScenario scenario) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("dfs.blocksize", BLOCK_SIZE);
        FSDataOutputStream stm = null;
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(MiniDFSNNTopology.simpleHOPSTopology(2)).numDataNodes(5).build();
        try {
            cluster.waitActive();
            Thread.sleep(500L);
            LOG.info("Starting with NN 0 active");
            DistributedFileSystem fs = cluster.getFileSystem(0);
            stm = fs.create(TEST_PATH);

            AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
            stm.hflush();

            LOG.info("Failing over to NN 1");
            scenario.run(cluster);
            Assert.assertTrue(fs.exists(TEST_PATH));

            cluster.stopDataNode(0);
            AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF);
            stm.hflush();

            cluster.stopDataNode(1);
            AppendTestUtil.write(stm, BLOCK_AND_A_HALF * 2, BLOCK_AND_A_HALF);
            stm.hflush();

            stm.close();
            // Cleared so the finally block does not close the stream twice.
            stm = null;
            AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF * 3);
        } finally {
            IOUtils.closeStream(stm);
            cluster.shutdown();
        }
    }

    /**
     * Leaves a file open for write, restarts namenode 0, and then has a
     * second user recover the lease before the contents are checked.
     *
     * <p>The writer's stream is deliberately not closed before lease recovery;
     * it is only closed (quietly) during cleanup.
     */
    @Test(timeout=150000L)
    public void testLeaseRecoveryAfterFailover() throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("dfs.permissions.enabled", false);
        conf.setInt("dfs.blocksize", BLOCK_SIZE);
        FSDataOutputStream stm = null;
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(MiniDFSNNTopology.simpleHOPSTopology(2)).numDataNodes(3).build();
        try {
            cluster.waitActive();
            Thread.sleep(500L);
            LOG.info("Starting with NN 0 active");
            DistributedFileSystem fs = cluster.getFileSystem(0);
            stm = fs.create(TEST_PATH);

            AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
            stm.hflush();

            LOG.info("Failing over to NN 1");
            cluster.restartNameNode(0);
            cluster.waitActive();
            Assert.assertTrue(fs.exists(TEST_PATH));

            DistributedFileSystem fsOtherUser = this.createFsAsOtherUser(cluster, conf);
            TestPipelinesFailover.loopRecoverLease(fsOtherUser, TEST_PATH);
            AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF);
        } finally {
            IOUtils.closeStream(stm);
            cluster.shutdown();
        }
    }

    /**
     * Stress test (currently {@code @Ignore}d): {@link #STRESS_NUM_THREADS}
     * writer threads repeatedly create a file, write, and recover its lease
     * while the harness runs replication-trigger and failover threads.
     */
    @Test(timeout=120000L)
    @Ignore
    public void testPipelineRecoveryStress() throws Exception {
        HAStressTestHarness harness = new HAStressTestHarness();
        harness.conf.setBoolean("dfs.permissions.enabled", false);
        harness.conf.setInt("dfs.client.failover.sleep.max.millis", 1000);
        MiniDFSCluster cluster = harness.startCluster();
        try {
            cluster.waitActive();
            FileSystem fs = harness.getFailoverFs();
            DistributedFileSystem fsAsOtherUser = this.createFsAsOtherUser(cluster, harness.conf);

            MultithreadedTestUtil.TestContext testers = new MultithreadedTestUtil.TestContext();
            for (int i = 0; i < STRESS_NUM_THREADS; ++i) {
                Path p = new Path("/test-" + i);
                testers.addThread(new PipelineTestThread(testers, fs, fsAsOtherUser, p));
            }

            harness.addReplicationTriggerThread(500);
            harness.addFailoverThread(5000);
            harness.startThreads();
            testers.startThreads();

            testers.waitFor(STRESS_RUNTIME);
            testers.stop();
            harness.stopThreads();
        } finally {
            System.err.println("===========================\n\n\n\n");
            harness.shutdown();
        }
    }

    /**
     * Obtains the file system of namenode 0 while running as the test user
     * {@code otheruser} (group {@code othergroup}).
     *
     * @param cluster the cluster whose file system is requested
     * @param conf    currently unused by this method
     * @return the file system returned by {@code cluster.getFileSystem(0)}
     */
    private DistributedFileSystem createFsAsOtherUser(final MiniDFSCluster cluster, Configuration conf) throws IOException, InterruptedException {
        return (DistributedFileSystem)UserGroupInformation.createUserForTesting("otheruser", new String[]{"othergroup"}).doAs(new PrivilegedExceptionAction<FileSystem>(){

            @Override
            public FileSystem run() throws Exception {
                return cluster.getFileSystem(0);
            }
        });
    }

    /**
     * Reads the expected storage locations of an under-construction block
     * through a {@link HopsTransactionalRequestHandler}.
     *
     * <p>Not referenced by any test in this class.
     *
     * @param b               the block to query
     * @param datanodeManager passed through to
     *                        {@code b.getExpectedStorageLocations(...)}
     * @return the result of {@code b.getExpectedStorageLocations(datanodeManager)}
     */
    private DatanodeStorageInfo[] getStorageInfosTx(final BlockInfoUnderConstruction b, final DatanodeManager datanodeManager) throws IOException {
        return (DatanodeStorageInfo[])new HopsTransactionalRequestHandler(HDFSOperationType.GET_EXPECTED_BLK_LOCATIONS){
            INodeIdentifier inodeIdentifier;

            public void setUp() throws StorageException {
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock((Block)b);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.READ, this.inodeIdentifier)).add(lf.getIndividualBlockLock(b.getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(new LockFactory.BLK[]{LockFactory.BLK.RE, LockFactory.BLK.UC}));
            }

            public Object performTask() throws StorageException, IOException {
                return b.getExpectedStorageLocations(datanodeManager);
            }
        }.handle();
    }

    /**
     * Calls {@code recoverLease} on {@code testPath} repeatedly, via
     * {@code GenericTestUtils.waitFor(..., 5000, 600000)}, until it reports
     * success.
     *
     * @param fsOtherUser file system to recover the lease through; must be a
     *                    {@link DistributedFileSystem}
     * @param testPath    file whose lease is recovered
     * @throws TimeoutException if {@code waitFor} times out; the original
     *                          exception is attached as the cause
     */
    private static void loopRecoverLease(final FileSystem fsOtherUser, final Path testPath) throws TimeoutException, InterruptedException, IOException {
        try {
            GenericTestUtils.waitFor(new Supplier<Boolean>(){

                @Override
                public Boolean get() {
                    boolean success;
                    try {
                        success = ((DistributedFileSystem)fsOtherUser).recoverLease(testPath);
                    }
                    catch (IOException e) {
                        // Supplier.get() cannot throw checked exceptions.
                        throw new RuntimeException(e);
                    }
                    if (!success) {
                        LOG.info("Waiting to recover lease successfully");
                    }
                    return success;
                }
            }, 5000, 600000);
        }
        catch (TimeoutException e) {
            // TimeoutException has no (message, cause) constructor, so attach
            // the original via initCause to keep its stack trace.
            TimeoutException timeout = new TimeoutException("Timed out recovering lease for " + testPath);
            timeout.initCause(e);
            throw timeout;
        }
    }

    /**
     * Stress-test worker: on every iteration it (re)creates its file, writes
     * 100 bytes, recovers the lease through the other user's file system and
     * checks the file contents.
     */
    private static class PipelineTestThread
    extends MultithreadedTestUtil.RepeatingTestThread {
        private final FileSystem fs;
        private final FileSystem fsOtherUser;
        private final Path path;

        public PipelineTestThread(MultithreadedTestUtil.TestContext ctx, FileSystem fs, FileSystem fsOtherUser, Path p) {
            super(ctx);
            this.fs = fs;
            this.fsOtherUser = fsOtherUser;
            this.path = p;
        }

        @Override
        public void doAnAction() throws Exception {
            FSDataOutputStream stm = this.fs.create(this.path, true);
            try {
                AppendTestUtil.write(stm, 0, 100);
                stm.hflush();
                TestPipelinesFailover.loopRecoverLease(this.fsOtherUser, this.path);
                AppendTestUtil.check(this.fs, this.path, 100L);
            }
            finally {
                try {
                    stm.close();
                }
                catch (IOException ignored) {
                    // Best-effort close. NOTE(review): presumably the close is
                    // expected to fail because the lease was recovered by the
                    // other user above - confirm.
                }
            }
        }

        @Override
        public String toString() {
            return "Pipeline test thread for " + this.path;
        }
    }

    /** Selects what the writer does after the failover. */
    enum MethodToTestIdempotence {
        /** Write more data after the failover before closing the file. */
        ALLOCATE_BLOCK,
        /** Close the file immediately after the failover. */
        COMPLETE_FILE;
    }

    /** Ways of triggering the failover in the single-writer tests. */
    enum TestScenario {
        /** Restarts namenode 0. */
        ORIGINAL_ACTIVE_CRASHED{

            @Override
            void run(MiniDFSCluster cluster) throws IOException {
                cluster.restartNameNode(0);
            }
        };

        /**
         * Applies this scenario to the given cluster.
         *
         * @param cluster the running mini cluster
         */
        abstract void run(MiniDFSCluster cluster) throws IOException;
    }
}

