/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import io.hops.leaderElection.HdfsLeDescriptorFactory;
import io.hops.leaderElection.LeaderElection;
import io.hops.metadata.HdfsStorageFactory;
import io.hops.metadata.HdfsVariables;
import io.hops.metadata.election.entity.LeDescriptorFactory;
import io.hops.metadata.hdfs.dal.RetryCacheEntryDataAccess;
import io.hops.metadata.hdfs.entity.RetryCacheEntry;
import io.hops.transaction.handler.HDFSOperationType;
import io.hops.transaction.handler.LightWeightRequestHandler;
import io.hops.transaction.handler.RequestHandler;
import io.hops.util.Slicer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.util.Daemon;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;

public class TestRetryCacheCleaner {
    static final Log LOG = LogFactory.getLog(TestRetryCacheCleaner.class);
    Random rand = new Random(System.currentTimeMillis());
    Configuration conf;
    LeaderElection leaderElection;
    final int DELETE_BATCH_SIZE = 200;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetryCleaner() throws Exception {
        try {
            int TOTAL_EPOCHS = 10;
            int ROWS_PER_EPOCH = 600;
            Logger.getRootLogger().setLevel(Level.INFO);
            Logger.getLogger(FSNamesystem.RetryCacheCleaner.class).setLevel(Level.ALL);
            this.setup();
            long entryExpiryMillis = this.conf.getLong("dfs.namenode.retrycache.expirytime.millis", 600000L);
            long epoch = (System.currentTimeMillis() - entryExpiryMillis) / 1000L;
            HdfsVariables.setRetryCacheCleanerEpoch((long)(epoch - 1L));
            for (int i = 0; i < 10; ++i) {
                this.insertTestData(epoch + (long)i, 600);
            }
            Thread.sleep(10000L);
            Daemon retryCacheCleanerThread = new Daemon((Runnable)new FSNamesystem.RetryCacheCleaner(this.conf, this.leaderElection));
            retryCacheCleanerThread.setName("Retry Cache Cleaner");
            retryCacheCleanerThread.start();
            Thread.sleep(20000L);
            ((FSNamesystem.RetryCacheCleaner)retryCacheCleanerThread.getRunnable()).stopMonitor();
            retryCacheCleanerThread.interrupt();
            Assert.assertTrue((String)"Did not clean up all rows", (this.countRows() == 0 ? 1 : 0) != 0);
        }
        finally {
            this.cleanup();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetryCleanerFastCleanup() throws Exception {
        try {
            int TOTAL_EPOCHS_SEC = 100;
            int ROWS_PER_EPOCH = 600;
            Logger.getRootLogger().setLevel(Level.INFO);
            Logger.getLogger(FSNamesystem.RetryCacheCleaner.class).setLevel(Level.ALL);
            this.setup();
            long entryExpiryMillis = this.conf.getLong("dfs.namenode.retrycache.expirytime.millis", 600000L);
            long epoch = (System.currentTimeMillis() - entryExpiryMillis - 100000L) / 1000L;
            HdfsVariables.setRetryCacheCleanerEpoch((long)(epoch - 1L));
            for (int i = 0; i < 100; ++i) {
                this.insertTestData(epoch + (long)i, 600);
            }
            Thread.sleep(1000L);
            Daemon retryCacheCleanerThread = new Daemon((Runnable)new FSNamesystem.RetryCacheCleaner(this.conf, this.leaderElection));
            retryCacheCleanerThread.setName("Retry Cache Cleaner");
            retryCacheCleanerThread.start();
            Thread.sleep(5000L);
            ((FSNamesystem.RetryCacheCleaner)retryCacheCleanerThread.getRunnable()).stopMonitor();
            retryCacheCleanerThread.interrupt();
            Assert.assertTrue((String)"Did not clean up all rows", (this.countRows() == 0 ? 1 : 0) != 0);
        }
        finally {
            this.cleanup();
        }
    }

    public void cleanup() throws InterruptedException {
        if (this.conf != null) {
            this.conf = null;
        }
        if (this.leaderElection != null) {
            this.leaderElection.stopElectionThread();
            while (!this.leaderElection.isStopped()) {
                Thread.sleep(10L);
            }
        }
    }

    public int countRows() throws Exception {
        return (Integer)new LightWeightRequestHandler((RequestHandler.OperationType)HDFSOperationType.TEST){

            public Object performTask() throws IOException {
                RetryCacheEntryDataAccess da = (RetryCacheEntryDataAccess)HdfsStorageFactory.getDataAccess(RetryCacheEntryDataAccess.class);
                return da.count();
            }
        }.handle();
    }

    public void setup() throws Exception {
        this.conf = new HdfsConfiguration();
        this.conf.setInt("dfs.namenode.retrycache.delete.batch.size", 200);
        HdfsStorageFactory.setConfiguration((Configuration)this.conf);
        HdfsStorageFactory.formatStorage();
        long leadercheckInterval = this.conf.getInt("dfs.leader.check.interval", 2000);
        int missedHeartBeatThreshold = this.conf.getInt("dfs.leader.missed.hb", 2);
        int leIncrement = this.conf.getInt("dfs.leader.tp.increment", 100);
        String httpAddress = "127.0.0.1:8020";
        String rpcAddresses = "127.0.0.1:8020";
        this.leaderElection = new LeaderElection((LeDescriptorFactory)new HdfsLeDescriptorFactory(), leadercheckInterval, missedHeartBeatThreshold, (long)leIncrement, httpAddress, rpcAddresses, (byte)this.conf.getInt("dfs.locationDomainId", 0));
        this.leaderElection.start();
    }

    private void insertTestData(final long epoch, int count) throws Exception {
        final int maxDML = 500;
        int maxInsertThreads = 10;
        final String clientIDStr = Integer.toString(this.rand.nextInt());
        ExecutorService executorService = Executors.newFixedThreadPool(maxInsertThreads);
        Slicer.slice((int)count, (int)maxDML, (int)10, (ExecutorService)executorService, (Slicer.OperationHandler)new Slicer.OperationHandler(){

            public void handle(int startIndex, int endIndex) throws Exception {
                final ArrayList<RetryCacheEntry> entries = new ArrayList<RetryCacheEntry>(maxDML);
                for (int i = startIndex; i < endIndex; ++i) {
                    entries.add(new RetryCacheEntry(clientIDStr.getBytes(), i, epoch));
                }
                new LightWeightRequestHandler((RequestHandler.OperationType)HDFSOperationType.TEST){

                    public Object performTask() throws IOException {
                        RetryCacheEntryDataAccess da = (RetryCacheEntryDataAccess)HdfsStorageFactory.getDataAccess(RetryCacheEntryDataAccess.class);
                        da.prepare((Collection)Collections.EMPTY_LIST, (Collection)entries);
                        return null;
                    }
                }.handle();
            }
        });
        LOG.info((Object)("Inserted data for epoch: " + epoch));
    }
}

