/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import io.hops.leader_election.node.ActiveNode;
import io.hops.leader_election.node.ActiveNodePBImpl;
import io.hops.leader_election.node.SortedActiveNodeList;
import io.hops.leader_election.node.SortedActiveNodeListPBImpl;
import io.hops.metadata.HdfsStorageFactory;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BRLoadBalancingNonLeaderException;
import org.apache.hadoop.hdfs.server.blockmanagement.BRLoadBalancingOverloadException;
import org.apache.hadoop.hdfs.server.blockmanagement.BRTrackingService;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.BPOfferService;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.util.ExitUtil;
import org.junit.Assert;
import org.junit.Test;

public class TestBlockReportLoadBalancing1 {
    public static final Log LOG = LogFactory.getLog(TestBlockReportLoadBalancing1.class);

    @Test
    public void TestBRTrackingService_01() throws IOException, InterruptedException {
        int NN_COUNT = 5;
        long DFS_BR_LB_MAX_BR_PROCESSING_TIME = 5000L;
        long DFS_BR_LB_MAX_CONCURRENT_BLK_REPORTS_PER_NN = 5L;
        long DFS_BR_LB_DB_VAR_UPDATE_THRESHOLD = 1000L;
        Configuration conf = new Configuration();
        conf.setLong("dfs.block.report.load.balancer.max.concurrent.block.reports.per.nn", 5L);
        conf.setLong("dfs.block.report.load.balancing.max.block.report.processing.time", 5000L);
        conf.setLong("dfs.block.report.load.balancing.db.var.update.threashold", 1000L);
        HdfsStorageFactory.reset();
        HdfsStorageFactory.setConfiguration((Configuration)conf);
        HdfsStorageFactory.formatStorage();
        ArrayList<ActiveNodePBImpl> list = new ArrayList<ActiveNodePBImpl>();
        for (int i = 0; i < 5; ++i) {
            ActiveNodePBImpl anode = new ActiveNodePBImpl((long)i, "host", "localhost", i, "0.0.0." + i + ":10000", "0.0.0.0", 0);
            list.add(anode);
        }
        SortedActiveNodeListPBImpl nnList = new SortedActiveNodeListPBImpl(list);
        BRTrackingService service = new BRTrackingService(1000L, 5L, 5000L);
        String dnAddress = "";
        ActiveNode an = null;
        int i = 0;
        while ((long)i < 25L) {
            dnAddress = "0.0.0.0:" + i;
            an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 10000L);
            Assert.assertTrue((String)"Unable to assign work", (an != null ? 1 : 0) != 0);
            ++i;
        }
        an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 10000L);
        Assert.assertTrue((String)"More work should not have been assigned", (an == null ? 1 : 0) != 0);
        an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 1L);
        Assert.assertTrue((String)"More work should not have been assigned", (an == null ? 1 : 0) != 0);
        Thread.sleep(5000L);
        i = 0;
        while ((long)i < 25L) {
            dnAddress = "0.0.0.0:" + i;
            an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 10000L);
            Assert.assertTrue((String)"Unable to assign work", (an != null ? 1 : 0) != 0);
            ++i;
        }
        an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 10000L);
        Assert.assertTrue((String)"More work should not have been assigned", (an == null ? 1 : 0) != 0);
        an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 1L);
        Assert.assertTrue((String)"More work should not have been assigned", (an == null ? 1 : 0) != 0);
    }

    @Test
    public void TestBRTrackingService_02() throws IOException, InterruptedException {
        int NN_COUNT = 5;
        long DFS_BR_LB_MAX_BR_PROCESSING_TIME = 5000L;
        long DFS_BR_LB_MAX_CONCURRENT_BLK_REPORTS_PER_NN = 5L;
        long DFS_BR_LB_DB_VAR_UPDATE_THRESHOLD = 1000L;
        Configuration conf = new Configuration();
        conf.setLong("dfs.block.report.load.balancer.max.concurrent.block.reports.per.nn", 5L);
        conf.setLong("dfs.block.report.load.balancing.max.block.report.processing.time", 5000L);
        conf.setLong("dfs.block.report.load.balancing.db.var.update.threashold", 1000L);
        HdfsStorageFactory.reset();
        HdfsStorageFactory.setConfiguration((Configuration)conf);
        HdfsStorageFactory.formatStorage();
        ArrayList<ActiveNodePBImpl> list = new ArrayList<ActiveNodePBImpl>();
        for (int i = 0; i < 5; ++i) {
            ActiveNodePBImpl anode = new ActiveNodePBImpl((long)i, "host", "localhost", i, "0.0.0." + i + ":10000", "", 0);
            list.add(anode);
        }
        SortedActiveNodeListPBImpl nnList = new SortedActiveNodeListPBImpl(list);
        BRTrackingService service = new BRTrackingService(1000L, 5L, 5000L);
        ActiveNode an = null;
        String dnAddress = "";
        int i = 0;
        while ((long)i < 25L) {
            dnAddress = "0.0.0.0:" + i;
            an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 10000L);
            Assert.assertTrue((String)"Unable to assign work", (an != null ? 1 : 0) != 0);
            ++i;
        }
        an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 10000L);
        Assert.assertTrue((String)"More work should not have been assigned", (an == null ? 1 : 0) != 0);
        an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 1L);
        Assert.assertTrue((String)"More work should not have been assigned", (an == null ? 1 : 0) != 0);
        Thread.sleep(5000L);
        list.remove(0);
        nnList = new SortedActiveNodeListPBImpl(list);
        i = 0;
        while ((long)i < 5L * (long)nnList.size()) {
            dnAddress = "0.0.0.0:" + i;
            an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 10000L);
            Assert.assertTrue((String)"Unable to assign work", (an != null ? 1 : 0) != 0);
            ++i;
        }
        an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 1L);
        Assert.assertTrue((String)"More work should not have been assigned", (an == null ? 1 : 0) != 0);
        Thread.sleep(5000L);
        for (i = 5; i < 10; ++i) {
            ActiveNodePBImpl anode = new ActiveNodePBImpl((long)i, "host", "localhost", i, "0.0.0." + i + ":10000", "", 0);
            list.add(anode);
        }
        nnList = new SortedActiveNodeListPBImpl(list);
        i = 0;
        while ((long)i < 5L * (long)nnList.size()) {
            dnAddress = "0.0.0.0:" + i;
            an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 10000L);
            Assert.assertTrue((String)"Unable to assign work", (an != null ? 1 : 0) != 0);
            ++i;
        }
        an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 1L);
        Assert.assertTrue((String)"More work should not have been assigned", (an == null ? 1 : 0) != 0);
    }

    @Test
    public void TestCommandLine() throws IOException, InterruptedException {
        int NN_COUNT = 5;
        long DFS_BR_LB_MAX_BR_PROCESSING_TIME = 5000L;
        long DFS_BR_LB_MAX_CONCURRENT_BLK_REPORTS_PER_NN = 5L;
        long DFS_BR_LB_DB_VAR_UPDATE_THRESHOLD = 1000L;
        Configuration conf = new Configuration();
        conf.setLong("dfs.block.report.load.balancer.max.concurrent.block.reports.per.nn", 5L);
        conf.setLong("dfs.block.report.load.balancing.max.block.report.processing.time", 5000L);
        conf.setLong("dfs.block.report.load.balancing.db.var.update.threashold", 1000L);
        HdfsStorageFactory.reset();
        HdfsStorageFactory.setConfiguration((Configuration)conf);
        HdfsStorageFactory.formatStorage();
        ArrayList<ActiveNodePBImpl> list = new ArrayList<ActiveNodePBImpl>();
        for (int i = 0; i < 5; ++i) {
            ActiveNodePBImpl anode = new ActiveNodePBImpl((long)i, "host", "localhost", i, "0.0.0." + i + ":10000", "", 0);
            list.add(anode);
        }
        SortedActiveNodeListPBImpl nnList = new SortedActiveNodeListPBImpl(list);
        BRTrackingService service = new BRTrackingService(1000L, 5L, 5000L);
        ActiveNode an = null;
        String dnAddress = "";
        int i = 0;
        while ((long)i < 5L * (long)nnList.size()) {
            dnAddress = "0.0.0.0:" + i;
            an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 10000L);
            Assert.assertTrue((String)"Unable to assign work", (an != null ? 1 : 0) != 0);
            ++i;
        }
        an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 10000L);
        Assert.assertTrue((String)"More work should not have been assigned", (an == null ? 1 : 0) != 0);
        an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 1L);
        Assert.assertTrue((String)"More work should not have been assigned", (an == null ? 1 : 0) != 0);
        LOG.info((Object)"Chainging the number of concurrent block reports");
        String[] argv = new String[]{"-concurrentBlkReports", "10"};
        try {
            NameNode.createNameNode((String[])argv, (Configuration)conf);
        }
        catch (ExitUtil.ExitException e) {
            Assert.assertEquals((String)"concurrentBlkReports command should succeed", (long)0L, (long)e.status);
        }
        Thread.sleep(Math.max(5000L, 1000L));
        int i2 = 0;
        while ((long)i2 < 10L * (long)nnList.size()) {
            dnAddress = "0.0.0.0:" + i2;
            an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 10000L);
            Assert.assertTrue((String)"Unable to assign work", (an != null ? 1 : 0) != 0);
            ++i2;
        }
        an = TestBlockReportLoadBalancing1.assignWork((SortedActiveNodeList)nnList, service, dnAddress, 1L);
        Assert.assertTrue((String)"More work should not have been assigned", (an == null ? 1 : 0) != 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void TestClusterDataNodes() throws IOException, InterruptedException {
        int NN_COUNT = 5;
        int DN_COUNT = 10;
        long DFS_BR_LB_MAX_BR_PROCESSING_TIME = 5000L;
        long DFS_BR_LB_MAX_CONCURRENT_BLK_REPORTS_PER_NN = 1L;
        long DFS_BR_LB_DB_VAR_UPDATE_THRESHOLD = 1000L;
        Configuration conf = new Configuration();
        conf.setLong("dfs.block.report.load.balancer.max.concurrent.block.reports.per.nn", 1L);
        conf.setLong("dfs.block.report.load.balancing.max.block.report.processing.time", 5000L);
        conf.setLong("dfs.block.report.load.balancing.db.var.update.threashold", 1000L);
        MiniDFSCluster cluster = null;
        try {
            int i;
            cluster = new MiniDFSCluster.Builder(conf).nnTopology(MiniDFSNNTopology.simpleHOPSTopology(NN_COUNT)).format(true).numDataNodes(DN_COUNT).build();
            cluster.waitActive();
            Thread.sleep(10000L);
            ActiveNode an = null;
            int i22 = 0;
            while ((long)i22 < 1L * (long)NN_COUNT) {
                an = this.getLeader(cluster, NN_COUNT).getNextNamenodeToSendBlockReport(1000L, (DatanodeID)((BPOfferService)cluster.getDataNodes().get((int)i22).getAllBpOs().get((int)0)).bpRegistration);
                LOG.info((Object)("Assigned work for datanode " + ((BPOfferService)cluster.getDataNodes().get((int)i22).getAllBpOs().get((int)0)).bpRegistration.getXferAddr()));
                Assert.assertTrue((String)"Unable to assign work", (an != null ? 1 : 0) != 0);
                ++i22;
            }
            try {
                an = this.getLeader(cluster, NN_COUNT).getNextNamenodeToSendBlockReport(1000L, (DatanodeID)((BPOfferService)cluster.getDataNodes().get((int)0).getAllBpOs().get((int)0)).bpRegistration);
                Assert.fail((String)"More work should not have been assigned");
            }
            catch (BRLoadBalancingOverloadException i22) {
            }
            catch (BRLoadBalancingNonLeaderException i22) {
                // empty catch block
            }
            String[] argv = new String[]{"-concurrentBlkReports", "2"};
            try {
                NameNode.createNameNode((String[])argv, (Configuration)conf);
            }
            catch (ExitUtil.ExitException e) {
                Assert.assertEquals((String)"concurrentBlkReports command should succeed", (long)0L, (long)e.status);
            }
            Thread.sleep(Math.max(5000L, 1000L));
            for (i = 0; i < 2 * NN_COUNT; ++i) {
                an = this.getLeader(cluster, NN_COUNT).getNextNamenodeToSendBlockReport(10000L, (DatanodeID)((BPOfferService)cluster.getDataNodes().get((int)i).getAllBpOs().get((int)0)).bpRegistration);
                Assert.assertTrue((String)"Unable to assign work", (an != null ? 1 : 0) != 0);
            }
            cluster.shutdownNameNode(NN_COUNT - 1);
            cluster.shutdownNameNode(NN_COUNT - 2);
            Thread.sleep(5000L);
            for (i = 0; i < 2 * (NN_COUNT - 2); ++i) {
                an = this.getLeader(cluster, NN_COUNT).getNextNamenodeToSendBlockReport(1000L, (DatanodeID)((BPOfferService)cluster.getDataNodes().get((int)i).getAllBpOs().get((int)0)).bpRegistration);
                Assert.assertTrue((String)"Unable to assign work", (an != null ? 1 : 0) != 0);
            }
            try {
                an = this.getLeader(cluster, NN_COUNT).getNextNamenodeToSendBlockReport(1000L, (DatanodeID)((BPOfferService)cluster.getDataNodes().get((int)0).getAllBpOs().get((int)0)).bpRegistration);
                Assert.fail((String)"More work should not have been assigned");
            }
            catch (BRLoadBalancingOverloadException bRLoadBalancingOverloadException) {
            }
            catch (BRLoadBalancingNonLeaderException bRLoadBalancingNonLeaderException) {
                // empty catch block
            }
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    private NameNode getLeader(MiniDFSCluster cluster, int NN_COUNT) {
        NameNode leader = null;
        for (int i = 0; i < NN_COUNT && !(leader = cluster.getNameNode(i)).isLeader(); ++i) {
        }
        return leader;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void TestUnregisteredDataNodesReport() throws IOException, InterruptedException {
        int NN_COUNT = 2;
        int DN_COUNT = 1;
        long DFS_BR_LB_MAX_BR_PROCESSING_TIME = 5000L;
        long DFS_BR_LB_MAX_CONCURRENT_BLK_REPORTS_PER_NN = NN_COUNT;
        long DFS_BR_LB_DB_VAR_UPDATE_THRESHOLD = 1000L;
        Configuration conf = new Configuration();
        conf.setLong("dfs.block.report.load.balancer.max.concurrent.block.reports.per.nn", DFS_BR_LB_MAX_CONCURRENT_BLK_REPORTS_PER_NN);
        conf.setLong("dfs.block.report.load.balancing.max.block.report.processing.time", 5000L);
        conf.setLong("dfs.block.report.load.balancing.db.var.update.threashold", 1000L);
        MiniDFSCluster cluster = null;
        try {
            cluster = new MiniDFSCluster.Builder(conf).nnTopology(MiniDFSNNTopology.simpleHOPSTopology(NN_COUNT)).numDataNodes(DN_COUNT).format(true).build();
            cluster.waitActive();
            DatanodeRegistration dnr = new DatanodeRegistration(new DatanodeID("test:5050"), new StorageInfo(0, 0, "", 0L, HdfsServerConstants.NodeType.DATA_NODE, ""), ExportedBlockKeys.DUMMY_KEYS, "");
            try {
                cluster.getNameNode(0).getNextNamenodeToSendBlockReport(1000L, (DatanodeID)dnr);
                Assert.fail((String)"should throw a ProcessReport from dead or unregistred node exception");
            }
            catch (IOException ex) {
                Assert.assertTrue((String)("wrong error message " + ex.getMessage()), (boolean)ex.getMessage().contains("ProcessReport from dead or unregistered node"));
            }
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    private static ActiveNode assignWork(SortedActiveNodeList nnList, BRTrackingService service, String dnAddress, long blks) {
        try {
            return service.assignWork(nnList, dnAddress, blks);
        }
        catch (Exception e) {
            return null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void TestClusterMultiNNBR() throws IOException, InterruptedException {
        int NN_COUNT = 5;
        int DN_COUNT = 10;
        long DFS_BR_LB_MAX_BR_PROCESSING_TIME = 5000L;
        long DFS_BR_LB_MAX_CONCURRENT_BLK_REPORTS_PER_NN = 2L;
        long DFS_BR_LB_DB_VAR_UPDATE_THRESHOLD = 1000L;
        Configuration conf = new Configuration();
        conf.setLong("dfs.block.report.load.balancer.max.concurrent.block.reports.per.nn", 2L);
        conf.setLong("dfs.block.report.load.balancing.max.block.report.processing.time", 5000L);
        conf.setLong("dfs.block.report.load.balancing.db.var.update.threashold", 1000L);
        MiniDFSCluster cluster = null;
        try {
            int i;
            cluster = new MiniDFSCluster.Builder(conf).nnTopology(MiniDFSNNTopology.simpleHOPSTopology(NN_COUNT)).format(true).numDataNodes(DN_COUNT).build();
            cluster.waitActive();
            Thread.sleep(10000L);
            ActiveNode an = null;
            for (int i2 = 0; i2 < DN_COUNT; ++i2) {
                an = this.getLeader(cluster, NN_COUNT).getNextNamenodeToSendBlockReport(1000L, (DatanodeID)((BPOfferService)cluster.getDataNodes().get((int)i2).getAllBpOs().get((int)0)).bpRegistration);
                LOG.info((Object)("Assigned work for datanode " + ((BPOfferService)cluster.getDataNodes().get((int)i2).getAllBpOs().get((int)0)).bpRegistration.getXferAddr()));
                Assert.assertTrue((an != null ? 1 : 0) != 0);
            }
            try {
                an = this.getLeader(cluster, NN_COUNT).getNextNamenodeToSendBlockReport(1000L, (DatanodeID)((BPOfferService)cluster.getDataNodes().get((int)0).getAllBpOs().get((int)0)).bpRegistration);
                Assert.fail((String)"More work should not have been assigned");
            }
            catch (BRLoadBalancingOverloadException i2) {
                // empty catch block
            }
            for (i = 0; i < DN_COUNT; ++i) {
                String dnAddress = ((BPOfferService)cluster.getDataNodes().get((int)i).getAllBpOs().get((int)0)).bpRegistration.getXferAddr();
                for (int n = 0; n < NN_COUNT; ++n) {
                    NameNode nn = cluster.getNameNode(n);
                    nn.getBRTrackingService().blockReportCompleted(dnAddress);
                }
            }
            for (i = 0; i < DN_COUNT; ++i) {
                an = this.getLeader(cluster, NN_COUNT).getNextNamenodeToSendBlockReport(1000L, (DatanodeID)((BPOfferService)cluster.getDataNodes().get((int)i).getAllBpOs().get((int)0)).bpRegistration);
                LOG.info((Object)("Assigned work for datanode " + ((BPOfferService)cluster.getDataNodes().get((int)i).getAllBpOs().get((int)0)).bpRegistration.getXferAddr()));
                Assert.assertTrue((an != null ? 1 : 0) != 0);
            }
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }
}

