/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import io.hops.transaction.handler.HDFSOperationType;
import io.hops.transaction.handler.HopsTransactionalRequestHandler;
import io.hops.transaction.lock.LockFactory;
import io.hops.transaction.lock.TransactionLocks;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryInvocationHandler;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.RetryCache;
import org.apache.hadoop.util.LightWeightCache;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestRetryCacheWithHA {
    private static final Log LOG = LogFactory.getLog(TestRetryCacheWithHA.class);
    private static MiniDFSCluster cluster;
    private static DistributedFileSystem dfs;
    private static Configuration conf;
    private static final int BlockSize = 1024;
    private static final short DataNodes = 3;
    private static final int CHECKTIMES = 10;
    private static final int ResponseSize = 3;
    private static final Map<String, Object> results;

    @Before
    public void setup() throws Exception {
        conf.setLong("dfs.blocksize", 1024L);
        conf.setBoolean("dfs.namenode.acls.enabled", true);
        conf.setInt("ipc.client.connect.max.retries", 0);
        conf.set("dfs.client.retry.policy.spec", "1000,2");
        conf.setInt("dfs.namenode.list.cache.directives.num.responses", 3);
        conf.setInt("dfs.namenode.list.cache.pools.num.responses", 3);
        cluster = new MiniDFSCluster.Builder(conf).nnTopology(MiniDFSNNTopology.simpleHOPSTopology(2)).numDataNodes(3).build();
        cluster.waitActive();
        dfs = cluster.getFileSystem(0);
    }

    @After
    public void cleanup() throws Exception {
        if (cluster != null) {
            cluster.shutdown();
        }
    }

    @Test
    public void testRetryCacheOnStandbyNN() throws Exception {
        DFSTestUtil.runOperations(cluster, dfs, conf, 1024L, 0);
        ArrayList<FSNamesystem> fsns = new ArrayList<FSNamesystem>();
        ArrayList<LightWeightCache> cacheSets = new ArrayList<LightWeightCache>();
        fsns.add(cluster.getNamesystem(0));
        fsns.add(cluster.getNamesystem(1));
        cacheSets.add((LightWeightCache)((FSNamesystem)fsns.get(0)).getRetryCache().getCacheSet());
        cacheSets.add((LightWeightCache)((FSNamesystem)fsns.get(1)).getRetryCache().getCacheSet());
        int usedNN = 0;
        if (((LightWeightCache)cacheSets.get(0)).size() < ((LightWeightCache)cacheSets.get(1)).size()) {
            usedNN = 1;
        }
        Assert.assertEquals((long)17L, (long)(((LightWeightCache)cacheSets.get(usedNN)).size() + ((LightWeightCache)cacheSets.get(1 - usedNN)).size()));
        HashMap<RetryCache.CacheEntry, RetryCache.CacheEntry> oldEntries = new HashMap<RetryCache.CacheEntry, RetryCache.CacheEntry>();
        for (RetryCache.CacheEntry entry : (LightWeightCache)cacheSets.get(usedNN)) {
            oldEntries.put(entry, entry);
        }
        for (RetryCache.CacheEntry entry : (LightWeightCache)cacheSets.get(1 - usedNN)) {
            oldEntries.put(entry, entry);
        }
        cluster.shutdownNameNode(usedNN);
        cluster.waitActive(1 - usedNN);
        this.fillCacheFromDB(oldEntries, (FSNamesystem)fsns.get(1 - usedNN));
        Assert.assertEquals((long)17L, (long)((LightWeightCache)cacheSets.get(1 - usedNN)).size());
        for (RetryCache.CacheEntry entry : (LightWeightCache)cacheSets.get(1 - usedNN)) {
            Assert.assertTrue((boolean)oldEntries.containsKey(entry));
        }
    }

    private void fillCacheFromDB(Map<RetryCache.CacheEntry, RetryCache.CacheEntry> oldEntries, final FSNamesystem namesystem) throws IOException {
        for (final RetryCache.CacheEntry entry : oldEntries.keySet()) {
            HopsTransactionalRequestHandler rh = new HopsTransactionalRequestHandler(HDFSOperationType.CONCAT){

                public void acquireLock(TransactionLocks locks) throws IOException {
                    LockFactory lf = LockFactory.getInstance();
                    locks.add(lf.getRetryCacheEntryLock(entry.getClientId(), entry.getCallId()));
                }

                public Object performTask() throws IOException {
                    namesystem.getRetryCache().getCacheSet().get((Object)entry);
                    return null;
                }
            };
            rh.handle();
        }
    }

    @Test(timeout=120000L)
    public void testListCachePools() throws Exception {
        int poolCount = 7;
        HashSet<String> poolNames = new HashSet<String>(7);
        for (int i = 0; i < 7; ++i) {
            String poolName = "testListCachePools-" + i;
            dfs.addCachePool(new CachePoolInfo(poolName));
            poolNames.add(poolName);
        }
        this.listCachePools(poolNames, 0);
        cluster.shutdownNameNode(0);
        cluster.waitActive(1);
        this.listCachePools(poolNames, 1);
    }

    @Test(timeout=120000L)
    public void testListCacheDirectives() throws Exception {
        int poolCount = 7;
        HashSet<String> poolNames = new HashSet<String>(7);
        Path path = new Path("/p");
        for (int i = 0; i < 7; ++i) {
            String poolName = "testListCacheDirectives-" + i;
            CacheDirectiveInfo directiveInfo = new CacheDirectiveInfo.Builder().setPool(poolName).setPath(path).build();
            dfs.addCachePool(new CachePoolInfo(poolName));
            dfs.addCacheDirective(directiveInfo, EnumSet.of(CacheFlag.FORCE));
            poolNames.add(poolName);
        }
        this.listCacheDirectives(poolNames, 0);
        cluster.shutdownNameNode(0);
        cluster.waitActive(1);
        this.listCacheDirectives(poolNames, 1);
    }

    private void listCachePools(HashSet<String> poolNames, int active) throws Exception {
        HashSet tmpNames = (HashSet)poolNames.clone();
        RemoteIterator pools = dfs.listCachePools();
        int poolCount = poolNames.size();
        for (int i = 0; i < poolCount; ++i) {
            CachePoolEntry pool = (CachePoolEntry)pools.next();
            String pollName = pool.getInfo().getPoolName();
            Assert.assertTrue((String)"The pool name should be expected", (boolean)tmpNames.remove(pollName));
            if (i % 2 != 0) continue;
            int standby = active;
            active = standby == 0 ? 1 : 0;
            cluster.shutdownNameNode(standby);
            cluster.waitActive(active);
            cluster.restartNameNode(standby, false);
        }
        Assert.assertTrue((String)"All pools must be found", (boolean)tmpNames.isEmpty());
    }

    private void listCacheDirectives(HashSet<String> poolNames, int active) throws Exception {
        HashSet tmpNames = (HashSet)poolNames.clone();
        RemoteIterator directives = dfs.listCacheDirectives(null);
        int poolCount = poolNames.size();
        for (int i = 0; i < poolCount; ++i) {
            CacheDirectiveEntry directive = (CacheDirectiveEntry)directives.next();
            String pollName = directive.getInfo().getPool();
            Assert.assertTrue((String)"The pool name should be expected", (boolean)tmpNames.remove(pollName));
            if (i % 2 != 0) continue;
            int standby = active;
            active = standby == 0 ? 1 : 0;
            cluster.shutdownNameNode(standby);
            cluster.waitActive(active);
            cluster.restartNameNode(standby, false);
        }
        Assert.assertTrue((String)"All pools must be found", (boolean)tmpNames.isEmpty());
    }

    static {
        conf = new HdfsConfiguration();
        results = new HashMap<String, Object>();
    }

    class RemoveCachePoolOp
    extends AtMostOnceOp {
        private String pool;

        RemoveCachePoolOp(DFSClient client, String pool) {
            super("removeCachePool", client);
            this.pool = pool;
        }

        @Override
        void prepare() throws Exception {
            this.client.addCachePool(new CachePoolInfo(this.pool));
        }

        @Override
        void invoke() throws Exception {
            this.client.removeCachePool(this.pool);
        }

        @Override
        boolean checkNamenodeBeforeReturn() throws Exception {
            for (int i = 0; i < 10; ++i) {
                RemoteIterator iter = dfs.listCachePools();
                if (!iter.hasNext()) {
                    return true;
                }
                Thread.sleep(1000L);
            }
            return false;
        }

        @Override
        Object getResult() {
            return null;
        }
    }

    class ModifyCachePoolOp
    extends AtMostOnceOp {
        String pool;

        ModifyCachePoolOp(DFSClient client, String pool) {
            super("modifyCachePool", client);
            this.pool = pool;
        }

        @Override
        void prepare() throws Exception {
            this.client.addCachePool(new CachePoolInfo(this.pool).setLimit(Long.valueOf(10L)));
        }

        @Override
        void invoke() throws Exception {
            this.client.modifyCachePool(new CachePoolInfo(this.pool).setLimit(Long.valueOf(99L)));
        }

        @Override
        boolean checkNamenodeBeforeReturn() throws Exception {
            for (int i = 0; i < 10; ++i) {
                RemoteIterator iter = dfs.listCachePools();
                if (iter.hasNext() && ((CachePoolEntry)iter.next()).getInfo().getLimit() == 99L) {
                    return true;
                }
                Thread.sleep(1000L);
            }
            return false;
        }

        @Override
        Object getResult() {
            return null;
        }
    }

    class AddCachePoolOp
    extends AtMostOnceOp {
        private String pool;

        AddCachePoolOp(DFSClient client, String pool) {
            super("addCachePool", client);
            this.pool = pool;
        }

        @Override
        void prepare() throws Exception {
        }

        @Override
        void invoke() throws Exception {
            this.client.addCachePool(new CachePoolInfo(this.pool));
        }

        @Override
        boolean checkNamenodeBeforeReturn() throws Exception {
            for (int i = 0; i < 10; ++i) {
                RemoteIterator iter = dfs.listCachePools();
                if (iter.hasNext()) {
                    return true;
                }
                Thread.sleep(1000L);
            }
            return false;
        }

        @Override
        Object getResult() {
            return null;
        }
    }

    class RemoveCacheDirectiveInfoOp
    extends AtMostOnceOp {
        private CacheDirectiveInfo directive;
        private long id;

        RemoveCacheDirectiveInfoOp(DFSClient client, String pool, String path) {
            super("removeCacheDirective", client);
            this.directive = new CacheDirectiveInfo.Builder().setPool(pool).setPath(new Path(path)).build();
        }

        @Override
        void prepare() throws Exception {
            dfs.addCachePool(new CachePoolInfo(this.directive.getPool()));
            this.id = dfs.addCacheDirective(this.directive, EnumSet.of(CacheFlag.FORCE));
        }

        @Override
        void invoke() throws Exception {
            this.client.removeCacheDirective(this.id);
        }

        @Override
        boolean checkNamenodeBeforeReturn() throws Exception {
            for (int i = 0; i < 10; ++i) {
                RemoteIterator iter = dfs.listCacheDirectives(new CacheDirectiveInfo.Builder().setPool(this.directive.getPool()).setPath(this.directive.getPath()).build());
                if (!iter.hasNext()) {
                    return true;
                }
                Thread.sleep(1000L);
            }
            return false;
        }

        @Override
        Object getResult() {
            return null;
        }
    }

    class ModifyCacheDirectiveInfoOp
    extends AtMostOnceOp {
        private final CacheDirectiveInfo directive;
        private final short newReplication;
        private long id;

        ModifyCacheDirectiveInfoOp(DFSClient client, CacheDirectiveInfo directive, short newReplication) {
            super("modifyCacheDirective", client);
            this.directive = directive;
            this.newReplication = newReplication;
        }

        @Override
        void prepare() throws Exception {
            dfs.addCachePool(new CachePoolInfo(this.directive.getPool()));
            this.id = this.client.addCacheDirective(this.directive, EnumSet.of(CacheFlag.FORCE));
        }

        @Override
        void invoke() throws Exception {
            this.client.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(Long.valueOf(this.id)).setReplication(Short.valueOf(this.newReplication)).build(), EnumSet.of(CacheFlag.FORCE));
        }

        @Override
        boolean checkNamenodeBeforeReturn() throws Exception {
            for (int i = 0; i < 10; ++i) {
                RemoteIterator iter = dfs.listCacheDirectives(new CacheDirectiveInfo.Builder().setPool(this.directive.getPool()).setPath(this.directive.getPath()).build());
                while (iter.hasNext()) {
                    CacheDirectiveInfo result = ((CacheDirectiveEntry)iter.next()).getInfo();
                    if (result.getId() != this.id || result.getReplication() != this.newReplication) continue;
                    return true;
                }
                Thread.sleep(1000L);
            }
            return false;
        }

        @Override
        Object getResult() {
            return null;
        }
    }

    class AddCacheDirectiveInfoOp
    extends AtMostOnceOp {
        private CacheDirectiveInfo directive;
        private Long result;

        AddCacheDirectiveInfoOp(DFSClient client, CacheDirectiveInfo directive) {
            super("addCacheDirective", client);
            this.directive = directive;
        }

        @Override
        void prepare() throws Exception {
            dfs.addCachePool(new CachePoolInfo(this.directive.getPool()));
        }

        @Override
        void invoke() throws Exception {
            this.result = this.client.addCacheDirective(this.directive, EnumSet.of(CacheFlag.FORCE));
        }

        @Override
        boolean checkNamenodeBeforeReturn() throws Exception {
            for (int i = 0; i < 10; ++i) {
                RemoteIterator iter = dfs.listCacheDirectives(new CacheDirectiveInfo.Builder().setPool(this.directive.getPool()).setPath(this.directive.getPath()).build());
                if (iter.hasNext()) {
                    return true;
                }
                Thread.sleep(1000L);
            }
            return false;
        }

        @Override
        Object getResult() {
            return this.result;
        }
    }

    abstract class AtMostOnceOp {
        private final String name;
        final DFSClient client;

        AtMostOnceOp(String name, DFSClient client) {
            this.name = name;
            this.client = client;
        }

        abstract void prepare() throws Exception;

        abstract void invoke() throws Exception;

        abstract boolean checkNamenodeBeforeReturn() throws Exception;

        abstract Object getResult();
    }

    private static class DummyRetryInvocationHandler
    extends RetryInvocationHandler {
        static AtomicBoolean block = new AtomicBoolean(false);

        DummyRetryInvocationHandler(FailoverProxyProvider<ClientProtocol> proxyProvider, RetryPolicy retryPolicy) {
            super(proxyProvider, retryPolicy);
        }

        protected Object invokeMethod(Method method, Object[] args) throws Throwable {
            Object result = super.invokeMethod(method, args);
            if (block.get()) {
                throw new UnknownHostException("Fake Exception");
            }
            return result;
        }
    }
}

