/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import io.hops.common.INodeUtil;
import io.hops.exception.StorageException;
import io.hops.metadata.HdfsStorageFactory;
import io.hops.metadata.common.FinderType;
import io.hops.metadata.hdfs.entity.INodeIdentifier;
import io.hops.transaction.EntityManager;
import io.hops.transaction.handler.HDFSOperationType;
import io.hops.transaction.handler.HopsTransactionalRequestHandler;
import io.hops.transaction.lock.INodeLock;
import io.hops.transaction.lock.Lock;
import io.hops.transaction.lock.LockFactory;
import io.hops.transaction.lock.TransactionLockTypes;
import io.hops.transaction.lock.TransactionLocks;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingReplicationBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.junit.Assert;
import org.junit.Test;

public class TestPendingReplication {
    // NOTE(review): presumably seconds; 3 matches the 3000 ms timeout handed to
    // PendingReplicationBlocks in testPendingReplication — confirm.
    static final int TIMEOUT = 3;
    // NOTE(review): 1 matches the value testPendingAndInvalidate sets for
    // "dfs.namenode.replication.interval"; unit presumably seconds — confirm.
    private static final int DFS_REPLICATION_INTERVAL = 1;
    // NOTE(review): 5 matches the number of datanodes started by testBlockReceived and
    // testPendingAndInvalidate — confirm this constant is what they were written against.
    private static final int DATANODE_COUNT = 5;

    /**
     * Exercises {@link PendingReplicationBlocks} bookkeeping directly, without a cluster.
     *
     * <p>The test formats the storage layer, registers ten blocks where block {@code i} is
     * pending on {@code i} targets, removes and re-adds the pending entry of block 8, checks the
     * per-block replica counts, adds five more blocks, waits for every entry to time out and
     * finally checks that {@code getTimedOutBlocks()} keeps returning all fifteen block ids on
     * repeated calls.
     *
     * @throws IOException if one of the storage transactions fails
     * @throws StorageException propagated from the storage layer
     */
    @Test
    public void testPendingReplication() throws IOException, StorageException {
        // Upper bound for the drain loop so a stuck timeout fails the test instead of hanging it.
        final int maxWaitSeconds = 60;
        final int initialBlocks = 10;
        final int totalBlocks = 15;

        HdfsStorageFactory.setConfiguration(new HdfsConfiguration());
        HdfsStorageFactory.formatStorage();
        PendingReplicationBlocks pendingReplications = new PendingReplicationBlocks(TIMEOUT * 1000L);
        pendingReplications.start();
        try {
            // Block i is registered as pending on the first i storages (block 0 gets no targets).
            DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(initialBlocks);
            for (int i = 0; i < initialBlocks; ++i) {
                BlockInfo block = newBlockInfo(new Block((long) i, (long) i, 0L), i);
                DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i];
                System.arraycopy(storages, 0, targets, 0, i);
                increment(pendingReplications, block, DatanodeStorageInfo.toDatanodeDescriptors(targets));
            }
            Assert.assertEquals("Size of pendingReplications ", initialBlocks, pendingReplications.size());

            // Take one pending replica of block 8 away, then the remaining seven; the test expects
            // the entry to disappear once no replica is pending any more.
            BlockInfo blk = newBlockInfo(new Block(8L, 8L, 0L), 8);
            decrement(pendingReplications, blk, storages[7].getDatanodeDescriptor());
            Assert.assertEquals("pendingReplications.getNumReplicas ", 7, getNumReplicas(pendingReplications, blk));
            for (int i = 0; i < 7; ++i) {
                decrement(pendingReplications, blk, storages[i].getDatanodeDescriptor());
            }
            Assert.assertEquals(initialBlocks - 1, pendingReplications.size());

            // Re-insert block 8 with eight fresh targets.
            increment(pendingReplications, blk,
                    DatanodeStorageInfo.toDatanodeDescriptors(DFSTestUtil.createDatanodeStorageInfos(8)));
            Assert.assertEquals(initialBlocks, pendingReplications.size());

            // Every block i must report exactly i pending replicas.
            for (int i = 0; i < initialBlocks; ++i) {
                BlockInfo block = newBlockInfo(new Block((long) i, (long) i, 0L), i);
                Assert.assertEquals(i, getNumReplicas(pendingReplications, block));
            }

            // Nothing may have timed out yet.
            Assert.assertNull(pendingReplications.getTimedOutBlocks());

            // Wait a second so the next batch gets a later timestamp, then add five more blocks.
            sleepOrFail(1000L);
            for (int i = initialBlocks; i < totalBlocks; ++i) {
                BlockInfo block = newBlockInfo(new Block((long) i, (long) i, 0L), i);
                increment(pendingReplications, block,
                        DatanodeStorageInfo.toDatanodeDescriptors(DFSTestUtil.createDatanodeStorageInfos(i)));
            }
            Assert.assertEquals(totalBlocks, pendingReplications.size());

            // Wait (bounded) for every entry to time out.
            int waitedSeconds = 0;
            while (pendingReplications.size() > 0 && waitedSeconds < maxWaitSeconds) {
                sleepOrFail(1000L);
                ++waitedSeconds;
            }
            System.out.println("Had to wait for " + waitedSeconds + " seconds for the lot to timeout");
            Assert.assertEquals("Size of pendingReplications ", 0, pendingReplications.size());

            // The timed-out list is read three times: the test expects the same fifteen ids on
            // every call, i.e. reading the list must not consume it.
            for (int attempt = 0; attempt < 3; ++attempt) {
                long[] timedOut = pendingReplications.getTimedOutBlocks();
                Assert.assertNotNull(timedOut);
                Assert.assertEquals(totalBlocks, timedOut.length);
                for (long blockId : timedOut) {
                    Assert.assertTrue(blockId < totalBlocks);
                }
            }
        } finally {
            // Always stop what start() began, even when an assertion above fails.
            pendingReplications.stop();
        }
    }

    /**
     * Sleeps for {@code millis} milliseconds. An interrupt restores the thread's interrupt flag
     * and fails the calling test instead of being silently ignored.
     */
    private static void sleepOrFail(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            Assert.fail("Interrupted while waiting for pending replications to time out");
        }
    }

    /**
     * Checks that block-received reports from datanodes lower the pending replica count of a
     * block, and that a repeated report from the same datanodes does not lower it again.
     *
     * <p>Scenario: a one-replica file is created on a {@code DATANODE_COUNT}-node cluster,
     * heartbeats are disabled, and the replication factor is raised to {@code DATANODE_COUNT},
     * which the test expects to leave {@code DATANODE_COUNT - 1} replicas pending. Two datanodes
     * that do not hold the block then report it as received, twice. Finally heartbeats are
     * re-enabled and the pending queue is expected to drain.
     *
     * @throws Exception if the cluster cannot be started or any step of the scenario fails
     */
    @Test
    public void testBlockReceived() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setLong("dfs.blocksize", 1024L);
        conf.setInt("dfs.namenode.replication.interval", 100);
        MiniDFSCluster cluster = null;
        try {
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_COUNT).build();
            cluster.waitActive();
            DistributedFileSystem hdfs = cluster.getFileSystem();
            FSNamesystem fsn = cluster.getNamesystem();
            BlockManager blkManager = fsn.getBlockManager();

            final String file = "/tmp.txt";
            final Path filePath = new Path(file);
            short replFactor = 1;
            DFSTestUtil.createFile(hdfs, filePath, 1024L, replFactor, 0L);

            // Silence heartbeats before raising the replication factor.
            ArrayList<DataNode> datanodes = cluster.getDataNodes();
            for (DataNode dn : datanodes) {
                DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
            }

            hdfs.setReplication(filePath, (short) DATANODE_COUNT);
            BlockManagerTestUtil.computeAllPendingWork(blkManager);
            Assert.assertEquals(1L, blkManager.pendingReplications.size());
            Block[] blocks = getBlocks(file, cluster, fsn);
            Assert.assertEquals(DATANODE_COUNT - 1, getNumReplicas(blkManager.pendingReplications, blocks[0]));

            LocatedBlock locatedBlock = hdfs.getClient().getLocatedBlocks(file, 0L).get(0);
            DatanodeInfo existingDn = locatedBlock.getLocations()[0];
            String poolId = cluster.getNamesystem().getBlockPoolId();

            // Two datanodes other than the current holder report the block as received.
            reportBlockReceived(cluster, datanodes, existingDn, poolId, blocks[0], 2);
            Assert.assertEquals(DATANODE_COUNT - 3, getNumReplicas(blkManager.pendingReplications, blocks[0]));

            // The same two datanodes report again. The helper restarts its own counter; the
            // original second loop reused an exhausted counter and therefore never ran.
            reportBlockReceived(cluster, datanodes, existingDn, poolId, blocks[0], 2);
            Assert.assertEquals(DATANODE_COUNT - 3, getNumReplicas(blkManager.pendingReplications, blocks[0]));

            // Re-enable heartbeats and trigger one on every datanode.
            for (DataNode dn : datanodes) {
                DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
                DataNodeTestUtils.triggerHeartbeat(dn);
            }
            Thread.sleep(5000L);
            Assert.assertEquals(0L, blkManager.pendingReplications.size());
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /**
     * Sends a RECEIVED_BLOCK report for {@code block} to the namenode on behalf of the first
     * {@code maxReports} datanodes in {@code datanodes} whose id differs from {@code existingDn}.
     * Iteration order is the list order, so repeated calls pick the same datanodes.
     */
    private static void reportBlockReceived(MiniDFSCluster cluster, ArrayList<DataNode> datanodes, DatanodeInfo existingDn, String poolId, Block block, int maxReports) throws IOException {
        int reported = 0;
        for (DataNode dn : datanodes) {
            if (reported >= maxReports) {
                break;
            }
            if (dn.getDatanodeId().equals(existingDn)) {
                continue;
            }
            DatanodeRegistration registration = dn.getDNRegistrationForBP(poolId);
            ReceivedDeletedBlockInfo[] received = new ReceivedDeletedBlockInfo[]{
                new ReceivedDeletedBlockInfo(block, ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, "")};
            StorageReceivedDeletedBlocks[] report = new StorageReceivedDeletedBlocks[]{
                new StorageReceivedDeletedBlocks("Fake-storage-ID-Ignored", received)};
            cluster.getNameNodeRpc().blockReceivedAndDeleted(registration, poolId, report);
            ++reported;
        }
    }

    /**
     * Looks up the blocks of {@code file} inside a Hops transaction.
     *
     * <p>The transaction takes a READ inode lock resolved by path plus the block lock, then reads
     * the block array of the file's inode through {@code fsn}'s directory.
     *
     * @param file path of the file whose blocks are wanted
     * @param cluster cluster supplying the namenode id and the active namenodes for the lock
     * @param fsn namesystem whose directory is used to resolve {@code file}
     * @return the file's blocks as returned by the inode
     * @throws IOException if the transaction fails
     */
    private Block[] getBlocks(final String file, final MiniDFSCluster cluster, final FSNamesystem fsn) throws IOException {
        Object result = new HopsTransactionalRequestHandler(HDFSOperationType.VERIFY_FILE_BLOCKS, file){

            @Override
            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory factory = LockFactory.getInstance();
                INodeLock pathLock = factory
                        .getINodeLock(TransactionLockTypes.INodeLockType.READ, TransactionLockTypes.INodeResolveType.PATH, new String[]{file})
                        .setNameNodeID(cluster.getNameNode().getId())
                        .setActiveNameNodes((Collection)cluster.getNameNode().getActiveNameNodes().getActiveNodes());
                locks.add((Lock)pathLock).add(factory.getBlockLock());
            }

            @Override
            public Object performTask() throws StorageException, IOException {
                // Resolve the path to its file inode and hand back its block array.
                return fsn.getFSDirectory().getINode(file).asFile().getBlocks();
            }
        }.handle();
        return (Block[])result;
    }

    /**
     * Checks that deleting a file clears the pending-replication entries of its blocks.
     *
     * <p>Scenario: a three-replica file is created, heartbeats are disabled, two of its replicas
     * are marked corrupt so that the test expects two replicas of the block to be pending, and
     * the file is then deleted. The pending replication count is polled (up to ten one-second
     * retries) until it reaches zero.
     *
     * @throws Exception if the cluster cannot be started or any step of the scenario fails
     */
    @Test
    public void testPendingAndInvalidate() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setLong("dfs.blocksize", 1024L);
        conf.setLong("dfs.heartbeat.interval", 1L);
        conf.setInt("dfs.namenode.replication.interval", DFS_REPLICATION_INTERVAL);
        MiniDFSCluster cluster = null;
        try {
            // Everything after build() runs inside the try so a failure while waiting for the
            // cluster or fetching its components still shuts the cluster down.
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_COUNT).build();
            cluster.waitActive();
            FSNamesystem namesystem = cluster.getNamesystem();
            BlockManager bm = namesystem.getBlockManager();
            DistributedFileSystem fs = cluster.getFileSystem();

            Path filePath = new Path("/tmp.txt");
            DFSTestUtil.createFile(fs, filePath, 1024L, (short)3, 0L);
            for (DataNode dn : cluster.getDataNodes()) {
                DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
            }

            // Corrupt two of the three replicas.
            LocatedBlock block = NameNodeAdapter.getBlockLocations(cluster.getNameNode(), filePath.toString(), 0L, 1L).get(0);
            bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0], "STORAGE_ID", "TEST");
            bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[1], "STORAGE_ID", "TEST");
            BlockManagerTestUtil.computeAllPendingWork(bm);
            BlockManagerTestUtil.updateState(bm);
            Assert.assertEquals(1L, bm.getPendingReplicationBlocksCount());
            // getNumReplicas only needs a Block, so the local block is passed without a downcast.
            Assert.assertEquals(2, getNumReplicas(bm.pendingReplications, block.getBlock().getLocalBlock()));

            // Delete the file and poll until the pending entry is gone or the retries run out.
            fs.delete(filePath, true);
            int retries = 10;
            long pendingNum = bm.getPendingReplicationBlocksCount();
            while (pendingNum != 0L && retries-- > 0) {
                Thread.sleep(1000L);
                BlockManagerTestUtil.updateState(bm);
                pendingNum = bm.getPendingReplicationBlocksCount();
            }
            Assert.assertEquals(0L, pendingNum);
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /**
     * Registers {@code block} as pending on the given target datanodes by delegating to
     * {@code incrementOrDecrementPendingReplications} in increment mode.
     *
     * @param pendingReplications the pending-replication tracker to update
     * @param block the block being replicated
     * @param dn the target datanodes
     * @throws IOException if the underlying transaction fails
     */
    private void increment(PendingReplicationBlocks pendingReplications, Block block, DatanodeDescriptor[] dn) throws IOException {
        final boolean incrementMode = true;
        // The single-datanode argument is only read in decrement mode.
        final DatanodeDescriptor noSingleTarget = null;
        incrementOrDecrementPendingReplications(pendingReplications, block, incrementMode, dn, noSingleTarget);
    }

    /**
     * Removes one pending replica of {@code block} for datanode {@code dn} by delegating to
     * {@code incrementOrDecrementPendingReplications} in decrement mode.
     *
     * @param pendingReplications the pending-replication tracker to update
     * @param block the block whose pending count is lowered
     * @param dn the datanode the replica was pending on
     * @throws IOException if the underlying transaction fails
     */
    private void decrement(PendingReplicationBlocks pendingReplications, Block block, DatanodeDescriptor dn) throws IOException {
        final boolean incrementMode = false;
        // The datanode-array argument is only read in increment mode.
        final DatanodeDescriptor[] noTargets = null;
        incrementOrDecrementPendingReplications(pendingReplications, block, incrementMode, noTargets, dn);
    }

    /**
     * Runs a single increment or decrement on {@code pendingReplications} inside a Hops
     * transaction.
     *
     * <p>The transaction resolves the inode of {@code block} during set-up, locks that individual
     * block together with the {@code PE} block-related data, looks the stored block up by id and
     * then applies the requested operation.
     *
     * @param pendingReplications the tracker to update
     * @param block the block to operate on
     * @param inc {@code true} to increment with {@code dns}, {@code false} to decrement with {@code dn}
     * @param dns target datanodes, read only when {@code inc} is {@code true}
     * @param dn datanode to remove, read only when {@code inc} is {@code false}
     * @throws IOException if the transaction fails
     */
    private void incrementOrDecrementPendingReplications(final PendingReplicationBlocks pendingReplications, final Block block, final boolean inc, final DatanodeDescriptor[] dns, final DatanodeDescriptor dn) throws IOException {
        HopsTransactionalRequestHandler handler = new HopsTransactionalRequestHandler(HDFSOperationType.TEST_PENDING_REPLICATION){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException, IOException {
                inodeIdentifier = INodeUtil.resolveINodeFromBlock(block);
            }

            @Override
            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory factory = LockFactory.getInstance();
                locks.add(factory.getIndividualBlockLock(block.getBlockId(), inodeIdentifier))
                        .add(factory.getBlockRelated(new LockFactory.BLK[]{LockFactory.BLK.PE}));
            }

            @Override
            public Object performTask() throws StorageException, IOException {
                BlockInfo stored = (BlockInfo)EntityManager.find((FinderType)BlockInfo.Finder.ByBlockIdAndINodeId, new Object[]{block.getBlockId()});
                if (!inc) {
                    pendingReplications.decrement(stored, dn);
                    return null;
                }
                pendingReplications.increment(stored, dns);
                return null;
            }
        };
        handler.handle();
    }

    /**
     * Reads the pending replica count of {@code block} from {@code pendingReplications} inside a
     * Hops transaction.
     *
     * <p>The transaction resolves the inode of {@code block} during set-up, locks that individual
     * block together with the {@code PE} block-related data, looks the stored block up by id and
     * asks the tracker for its replica count.
     *
     * @param pendingReplications the tracker to query
     * @param block the block to look up
     * @return the pending replica count reported by the tracker
     * @throws IOException if the transaction fails
     */
    private int getNumReplicas(final PendingReplicationBlocks pendingReplications, final Block block) throws IOException {
        HopsTransactionalRequestHandler handler = new HopsTransactionalRequestHandler(HDFSOperationType.TEST_PENDING_REPLICATION){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException, IOException {
                inodeIdentifier = INodeUtil.resolveINodeFromBlock(block);
            }

            @Override
            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory factory = LockFactory.getInstance();
                locks.add(factory.getIndividualBlockLock(block.getBlockId(), inodeIdentifier))
                        .add(factory.getBlockRelated(new LockFactory.BLK[]{LockFactory.BLK.PE}));
            }

            @Override
            public Object performTask() throws StorageException, IOException {
                BlockInfo stored = (BlockInfo)EntityManager.find((FinderType)BlockInfo.Finder.ByBlockIdAndINodeId, new Object[]{block.getBlockId()});
                return pendingReplications.getNumReplicas(stored);
            }
        };
        Object count = handler.handle();
        return (Integer)count;
    }

    /**
     * Checks that timed-out pending replications are handed over by
     * {@code BlockManager.processPendingReplications()}.
     *
     * <p>Scenario: ten one-block files are created on a single-datanode cluster and their
     * replication factor is raised to 2; each block is registered as pending on one fabricated
     * storage. The test waits for all entries to time out (the pending timeout is configured to
     * 10 seconds), expects ten timed-out blocks, runs {@code processPendingReplications()} and
     * then expects the timed-out list to be empty and ten under-replicated blocks to be recorded.
     *
     * @throws Exception if the cluster cannot be started or any step of the scenario fails
     */
    @Test
    public void testProcessPendingReplications() throws Exception {
        // Upper bound for the drain loop so a stuck timeout fails the test instead of hanging it.
        final int maxWaitSeconds = 120;
        final int blockCount = 10;

        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setInt("dfs.namenode.replication.interval", 100);
        conf.setInt("dfs.namenode.replication.pending.timeout-sec", 10);
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
        try {
            FSNamesystem namesystem = cluster.getNamesystem();
            BlockManager bm = namesystem.getBlockManager();
            DistributedFileSystem fs = cluster.getFileSystem();
            PendingReplicationBlocks pendingReplications = bm.pendingReplications;

            DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(blockCount);
            for (int i = 0; i < blockCount; ++i) {
                Path filePath = new Path("/testfile_" + i);
                DFSTestUtil.createFile(fs, filePath, 1L, (short)1, 1L);
                DFSTestUtil.waitReplication(fs, filePath, (short)1);
                fs.setReplication(filePath, (short)2);
                ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
                // Each block is pending on exactly one fabricated storage.
                DatanodeStorageInfo[] targets = new DatanodeStorageInfo[]{storages[i]};
                increment(pendingReplications, block.getLocalBlock(), DatanodeStorageInfo.toDatanodeDescriptors(targets));
            }
            // Read the size once so the failure message shows the value that was asserted.
            int pendingCount = pendingReplications.size();
            Assert.assertEquals("Size of pendingReplications " + pendingCount, blockCount, pendingCount);

            // Wait (bounded) for every entry to time out; an interrupt propagates and fails the test.
            int waitedSeconds = 0;
            while (pendingReplications.size() > 0 && waitedSeconds < maxWaitSeconds) {
                Thread.sleep(1000L);
                ++waitedSeconds;
            }
            System.out.println("Had to wait for " + waitedSeconds + " seconds for the lot to timeout");
            Assert.assertEquals("Size of pendingReplications ", 0, pendingReplications.size());

            long[] timedOut = pendingReplications.getTimedOutBlocks();
            Assert.assertNotNull(timedOut);
            Assert.assertEquals(blockCount, timedOut.length);

            bm.processPendingReplications();
            timedOut = pendingReplications.getTimedOutBlocks();
            Assert.assertNull("blocks removed from pending", timedOut);

            UnderReplicatedBlocks queues = new UnderReplicatedBlocks();
            Assert.assertEquals("Size of underReplications ", blockCount, queues.size());
        }
        finally {
            cluster.shutdown();
        }
    }

    /**
     * Wraps {@code block} in a {@link BlockInfo} tied to {@code inodeId} and adds it through the
     * {@code EntityManager} inside a Hops transaction that acquires no locks.
     *
     * @param block the block to wrap
     * @param inodeId inode id to associate with the new block info
     * @return the newly created block info
     * @throws IOException if the transaction fails
     */
    private BlockInfo newBlockInfo(Block block, int inodeId) throws IOException {
        final BlockInfo created = new BlockInfo(block, (long)inodeId);
        HopsTransactionalRequestHandler handler = new HopsTransactionalRequestHandler(HDFSOperationType.TEST){

            @Override
            public void acquireLock(TransactionLocks locks) throws IOException {
                // Intentionally empty: no locks are taken for this insert.
            }

            @Override
            public Object performTask() throws StorageException, IOException {
                EntityManager.add((Object)created);
                return null;
            }
        };
        handler.handle();
        return created;
    }
}

