/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import io.hops.exception.StorageException;
import io.hops.metadata.HdfsStorageFactory;
import io.hops.metadata.hdfs.dal.ReplicaDataAccess;
import io.hops.metadata.hdfs.entity.INodeIdentifier;
import io.hops.transaction.EntityManager;
import io.hops.transaction.handler.HDFSOperationType;
import io.hops.transaction.handler.HopsTransactionalRequestHandler;
import io.hops.transaction.handler.LightWeightRequestHandler;
import io.hops.transaction.handler.RequestHandler;
import io.hops.transaction.lock.LockFactory;
import io.hops.transaction.lock.TransactionLockTypes;
import io.hops.transaction.lock.TransactionLocks;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.HeartbeatManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;

/**
 * Exercises the commands the NameNode hands back to DataNodes in heartbeat
 * responses, and the cleanup of replicas that belong to a stopped DataNode.
 */
public class TestHeartbeatHandling {
    // NOTE(review): the three action codes below are the raw values the
    // assertions have always compared against. They presumably correspond to
    // DatanodeProtocol.DNA_TRANSFER / DNA_INVALIDATE / DNA_RECOVERBLOCK -
    // confirm and switch to those constants if so.
    private static final int ACTION_REPLICATE = 1;
    private static final int ACTION_INVALIDATE = 2;
    private static final int ACTION_RECOVER_BLOCK = 6;

    /**
     * Queues more replication and invalidation work on a single DataNode than
     * fits in one heartbeat response and checks, heartbeat by heartbeat, how
     * many blocks each returned command carries until the queues are drained.
     *
     * @throws Exception if the mini cluster cannot be started or a heartbeat fails
     */
    @Test
    public void testHeartbeat() throws Exception {
        final HdfsConfiguration conf = new HdfsConfiguration();
        final MiniDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).build();
        try {
            cluster.waitActive();
            final FSNamesystem namesystem = cluster.getNamesystem();
            final HeartbeatManager hm = namesystem.getBlockManager().getDatanodeManager().getHeartbeatManager();
            final String poolId = namesystem.getBlockPoolId();
            final DatanodeRegistration nodeReg = DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
            final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, (DatanodeID)nodeReg);
            final String storageID = DatanodeStorage.generateUuid();
            dd.updateStorage(new DatanodeStorage(storageID));

            // Each queue is filled with two full batches plus one leftover block,
            // so draining it takes exactly three heartbeats.
            final int remainingBlocks = 1;
            final int maxReplicateLimit = conf.getInt("dfs.namenode.replication.max-streams", 2);
            final int maxInvalidateLimit = 1000;
            final int maxInvalidateBlocks = 2 * maxInvalidateLimit + remainingBlocks;
            final int maxReplicateBlocks = 2 * maxReplicateLimit + remainingBlocks;
            final DatanodeStorageInfo[] oneTarget = {dd.getStorageInfo(storageID)};

            synchronized (hm) {
                for (int i = 0; i < maxReplicateBlocks; ++i) {
                    dd.addBlockToBeReplicated(new Block((long)i, 0L, 1000L), oneTarget);
                }

                // Heartbeat 1: only replication work is queued.
                DatanodeCommand[] cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem).getCommands();
                Assert.assertEquals(1, cmds.length);
                Assert.assertEquals(ACTION_REPLICATE, cmds[0].getAction());
                Assert.assertEquals(maxReplicateLimit, ((BlockCommand)cmds[0]).getBlocks().length);

                final ArrayList<Block> blockList = new ArrayList<Block>(maxInvalidateBlocks);
                for (int i = 0; i < maxInvalidateBlocks; ++i) {
                    blockList.add(new Block((long)i, 0L, 1000L));
                }
                dd.addBlocksToBeInvalidated(blockList);

                // Heartbeat 2: a full replication batch and a full invalidation batch.
                cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem).getCommands();
                Assert.assertEquals(2, cmds.length);
                Assert.assertEquals(ACTION_REPLICATE, cmds[0].getAction());
                Assert.assertEquals(maxReplicateLimit, ((BlockCommand)cmds[0]).getBlocks().length);
                Assert.assertEquals(ACTION_INVALIDATE, cmds[1].getAction());
                Assert.assertEquals(maxInvalidateLimit, ((BlockCommand)cmds[1]).getBlocks().length);

                // Heartbeat 3: the leftover replication block and a second full invalidation batch.
                cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem).getCommands();
                Assert.assertEquals(2, cmds.length);
                Assert.assertEquals(ACTION_REPLICATE, cmds[0].getAction());
                Assert.assertEquals(remainingBlocks, ((BlockCommand)cmds[0]).getBlocks().length);
                Assert.assertEquals(ACTION_INVALIDATE, cmds[1].getAction());
                Assert.assertEquals(maxInvalidateLimit, ((BlockCommand)cmds[1]).getBlocks().length);

                // Heartbeat 4: only the leftover invalidation block.
                cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem).getCommands();
                Assert.assertEquals(1, cmds.length);
                Assert.assertEquals(ACTION_INVALIDATE, cmds[0].getAction());
                Assert.assertEquals(remainingBlocks, ((BlockCommand)cmds[0]).getBlocks().length);

                // Heartbeat 5: both queues are empty.
                cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem).getCommands();
                Assert.assertEquals(0, cmds.length);
            }
        }
        finally {
            cluster.shutdown();
        }
    }

    /**
     * Schedules recovery of an under-recovery block located on three DataNodes
     * and checks which of them are listed as recovery locations, for three
     * different combinations of last-heartbeat timestamps.
     *
     * @throws Exception if the mini cluster cannot be started or a heartbeat fails
     */
    @Test
    public void testHeartbeatBlockRecovery() throws Exception {
        final HdfsConfiguration conf = new HdfsConfiguration();
        final MiniDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(3).build();
        try {
            cluster.waitActive();
            final FSNamesystem namesystem = cluster.getNamesystem();
            final HeartbeatManager hm = namesystem.getBlockManager().getDatanodeManager().getHeartbeatManager();
            final String poolId = namesystem.getBlockPoolId();
            final DatanodeRegistration nodeReg1 = DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
            final DatanodeDescriptor dd1 = NameNodeAdapter.getDatanode(namesystem, (DatanodeID)nodeReg1);
            final DatanodeRegistration nodeReg2 = DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(1), poolId);
            final DatanodeDescriptor dd2 = NameNodeAdapter.getDatanode(namesystem, (DatanodeID)nodeReg2);
            final DatanodeRegistration nodeReg3 = DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(2), poolId);
            final DatanodeDescriptor dd3 = NameNodeAdapter.getDatanode(namesystem, (DatanodeID)nodeReg3);

            synchronized (hm) {
                NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem);
                NameNodeAdapter.sendHeartBeat(nodeReg2, dd2, namesystem);
                NameNodeAdapter.sendHeartBeat(nodeReg3, dd3, namesystem);

                // Scenario 1: every node has just reported, so all three are expected.
                dd1.setLastUpdate(System.currentTimeMillis());
                dd2.setLastUpdate(System.currentTimeMillis());
                dd3.setLastUpdate(System.currentTimeMillis());
                final DatanodeStorageInfo[] storages = {dd1.getStorageInfos()[0], dd2.getStorageInfos()[0], dd3.getStorageInfos()[0]};
                dd1.addBlockToBeRecovered(this.createBlockInfoUnderConstruction(storages));
                DatanodeCommand[] cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
                assertRecoveryLocations(cmds, dd1, dd2, dd3);

                // Scenario 2: dd2 last reported 40 s ago and is expected to be left out.
                // NOTE(review): presumably 40 s exceeds the stale-DataNode interval - confirm.
                dd1.setLastUpdate(System.currentTimeMillis());
                dd2.setLastUpdate(System.currentTimeMillis() - 40000L);
                dd3.setLastUpdate(System.currentTimeMillis());
                dd1.addBlockToBeRecovered(this.createBlockInfoUnderConstruction(storages));
                cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
                assertRecoveryLocations(cmds, dd1, dd3);

                // Scenario 3: all three nodes have old timestamps; all three are expected again.
                dd1.setLastUpdate(System.currentTimeMillis() - 60000L);
                dd2.setLastUpdate(System.currentTimeMillis() - 40000L);
                dd3.setLastUpdate(System.currentTimeMillis() - 80000L);
                dd1.addBlockToBeRecovered(this.createBlockInfoUnderConstruction(storages));
                cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
                assertRecoveryLocations(cmds, dd1, dd2, dd3);
            }
        }
        finally {
            cluster.shutdown();
        }
    }

    /**
     * Asserts that {@code cmds} holds exactly one block-recovery command for
     * exactly one block, and that the block's recovery locations match
     * {@code expectedNodes}: same count, and every expected node is listed.
     *
     * @param cmds          commands returned by a heartbeat
     * @param expectedNodes descriptors expected among the recovery locations
     */
    private static void assertRecoveryLocations(DatanodeCommand[] cmds, DatanodeDescriptor... expectedNodes) {
        Assert.assertEquals(1, cmds.length);
        Assert.assertEquals(ACTION_RECOVER_BLOCK, cmds[0].getAction());
        final BlockRecoveryCommand recoveryCommand = (BlockRecoveryCommand)cmds[0];
        Assert.assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
        final DatanodeInfo[] recoveringNodes = recoveryCommand.getRecoveringBlocks().toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
        Assert.assertEquals(expectedNodes.length, recoveringNodes.length);

        // Cross off every expected node that shows up as a location; none may be left over.
        final ArrayList<DatanodeDescriptor> unmatched = new ArrayList<DatanodeDescriptor>();
        for (DatanodeDescriptor expected : expectedNodes) {
            unmatched.add(expected);
        }
        for (DatanodeInfo location : recoveringNodes) {
            unmatched.remove(location);
        }
        Assert.assertEquals(0, unmatched.size());
    }

    /**
     * Runs a transaction that stores a {@link BlockInfo} for block id 10 under
     * inode id 3 and returns a matching {@link BlockInfoUnderConstruction} in
     * the {@code UNDER_RECOVERY} state with the given expected locations.
     *
     * @param storages storages passed to the under-construction block
     * @return the newly created under-construction block
     * @throws IOException if the transaction fails
     */
    private BlockInfoUnderConstruction createBlockInfoUnderConstruction(final DatanodeStorageInfo[] storages) throws IOException {
        return (BlockInfoUnderConstruction)new HopsTransactionalRequestHandler(HDFSOperationType.COMMIT_BLOCK_SYNCHRONIZATION){
            private final INodeIdentifier inodeIdentifier = new INodeIdentifier(Long.valueOf(3L));

            @Override
            public void setUp() throws StorageException {
                // Nothing to prepare before the locks are taken.
            }

            @Override
            public void acquireLock(TransactionLocks locks) throws IOException {
                final LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier, true))
                    .add(lf.getLeaseLock(TransactionLockTypes.LockType.WRITE))
                    .add(lf.getLeasePathLock(TransactionLockTypes.LockType.READ_COMMITTED))
                    .add(lf.getBlockLock(10L, this.inodeIdentifier))
                    .add(lf.getBlockRelated(new LockFactory.BLK[]{LockFactory.BLK.RE, LockFactory.BLK.CR, LockFactory.BLK.ER, LockFactory.BLK.UC, LockFactory.BLK.UR}));
            }

            @Override
            public Object performTask() throws IOException {
                final Block block = new Block(10L, 0L, 1000L);
                // inodeIdentifier is final and always set, so no null fallback is needed.
                final long inodeId = this.inodeIdentifier.getInodeId();
                EntityManager.add((Object)new BlockInfo(block, inodeId));
                return new BlockInfoUnderConstruction(block, 3L, HdfsServerConstants.BlockUCState.UNDER_RECOVERY, storages);
            }
        }.handle();
    }

    /**
     * Stops a DataNode while the NameNode is down, restarts the NameNode, and
     * waits (up to three heartbeat-expiry intervals) for the replicas recorded
     * on that DataNode's first two storages to disappear.
     *
     * <p>JUnit assertions are used rather than the {@code assert} keyword so
     * the checks still run when JVM assertions are disabled.
     *
     * @throws IOException          if cluster or database access fails
     * @throws InterruptedException if the polling sleep is interrupted
     * @throws TimeoutException     if waiting for replication times out
     */
    @Test(timeout=120000L)
    public void testDeadReplicasCleanup() throws IOException, InterruptedException, TimeoutException {
        final HdfsConfiguration conf = new HdfsConfiguration();
        conf.setLong("dfs.heartbeat.interval", 1L);
        conf.setInt("dfs.namenode.heartbeat.recheck-interval", 10000);
        final MiniDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(3).build();
        try {
            cluster.waitActive();
            final DistributedFileSystem fs = cluster.getFileSystem();
            final Path file1 = new Path("/test1");
            DFSTestUtil.createFile((FileSystem)fs, file1, 6144L, (short)3, 1L);
            DFSTestUtil.waitReplication((FileSystem)fs, file1, (short)3);

            final DataNode dn = cluster.getDataNodes().get(0);
            final DatanodeDescriptor dnDescriptor = cluster.getNameNode().getNamesystem().getBlockManager().getDatanodeManager().getDatanode(dn.getDatanodeId());
            cluster.shutdownNameNodes();
            cluster.stopDataNode(dn.getDisplayName());
            Assert.assertTrue("replicas of the stopped DataNode should still be stored while the NameNode is down",
                this.countReplicasOnFirstTwoStorages(dnDescriptor) > 0);

            cluster.restartNameNode(true, false);
            Assert.assertTrue("replicas of the stopped DataNode should still be stored right after the NameNode restart",
                this.countReplicasOnFirstTwoStorages(dnDescriptor) > 0);

            // Poll once per second until the replicas are gone or three expiry intervals have elapsed.
            final long start = Time.now();
            while (start > Time.now() - 3L * cluster.getNameNode().getNamesystem().getBlockManager().getDatanodeManager().getHeartbeatExpireInterval()
                    && this.countReplicasOnFirstTwoStorages(dnDescriptor) > 0) {
                Thread.sleep(1000L);
            }
            Assert.assertEquals("replicas of the stopped DataNode should have been removed",
                0, this.countReplicasOnFirstTwoStorages(dnDescriptor));
        }
        finally {
            cluster.shutdown();
        }
    }

    /**
     * Sums the stored replica counts of the first two storages of a DataNode.
     *
     * @param dnDescriptor descriptor whose storages at index 0 and 1 are inspected
     * @return total number of replicas recorded for those two storage ids
     * @throws IOException if the replica count cannot be read
     */
    private int countReplicasOnFirstTwoStorages(DatanodeDescriptor dnDescriptor) throws IOException {
        return this.getReplicasOnStorage(dnDescriptor.getStorageInfos()[0].getSid())
            + this.getReplicasOnStorage(dnDescriptor.getStorageInfos()[1].getSid());
    }

    /**
     * Asks the replica data-access layer how many replicas are recorded for a storage id.
     *
     * @param sid storage id to count replicas for
     * @return the count reported by {@code countAllReplicasForStorageId}
     * @throws IOException if the request handler fails
     */
    private int getReplicasOnStorage(final int sid) throws IOException {
        final Object replicaCount = new LightWeightRequestHandler((RequestHandler.OperationType)HDFSOperationType.TEST){

            @Override
            public Object performTask() throws IOException {
                final ReplicaDataAccess da = (ReplicaDataAccess)HdfsStorageFactory.getDataAccess(ReplicaDataAccess.class);
                return da.countAllReplicasForStorageId(sid);
            }
        }.handle();
        return (Integer)replicaCount;
    }
}

