/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.procedure.flush;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
import org.apache.hadoop.hbase.procedure.flush.FlushTableSubprocedure;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

@InterfaceAudience.LimitedPrivate(value={"Configuration"})
public class RegionServerFlushTableProcedureManager
extends RegionServerProcedureManager {
    private static final Log LOG = LogFactory.getLog(RegionServerFlushTableProcedureManager.class);
    private static final String CONCURENT_FLUSH_TASKS_KEY = "hbase.flush.procedure.region.concurrentTasks";
    private static final int DEFAULT_CONCURRENT_FLUSH_TASKS = 3;
    public static final String FLUSH_REQUEST_THREADS_KEY = "hbase.flush.procedure.region.pool.threads";
    public static final int FLUSH_REQUEST_THREADS_DEFAULT = 10;
    public static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.procedure.region.timeout";
    public static final long FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000L;
    public static final String FLUSH_REQUEST_WAKE_MILLIS_KEY = "hbase.flush.procedure.region.wakefrequency";
    private static final long FLUSH_REQUEST_WAKE_MILLIS_DEFAULT = 500L;
    private RegionServerServices rss;
    private ProcedureMemberRpcs memberRpcs;
    private ProcedureMember member;

    RegionServerFlushTableProcedureManager(Configuration conf, HRegionServer server, ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
        this.rss = server;
        this.memberRpcs = memberRpc;
        this.member = procMember;
    }

    public RegionServerFlushTableProcedureManager() {
    }

    @Override
    public void start() {
        LOG.debug((Object)("Start region server flush procedure manager " + this.rss.getServerName().toString()));
        this.memberRpcs.start(this.rss.getServerName().toString(), this.member);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop(boolean force) throws IOException {
        String mode = force ? "abruptly" : "gracefully";
        LOG.info((Object)("Stopping region server flush procedure manager " + mode + "."));
        try {
            this.member.close();
        }
        finally {
            this.memberRpcs.close();
        }
    }

    public Subprocedure buildSubprocedure(String table) {
        List<Region> involvedRegions;
        if (this.rss.isStopping() || this.rss.isStopped()) {
            throw new IllegalStateException("Can't start flush region subprocedure on RS: " + this.rss.getServerName() + ", because stopping/stopped!");
        }
        try {
            involvedRegions = this.getRegionsToFlush(table);
        }
        catch (IOException e1) {
            throw new IllegalStateException("Failed to figure out if there is region to flush.", e1);
        }
        LOG.debug((Object)("Launching subprocedure to flush regions for " + table));
        ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(table);
        Configuration conf = this.rss.getConfiguration();
        long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, 60000L);
        long wakeMillis = conf.getLong(FLUSH_REQUEST_WAKE_MILLIS_KEY, 500L);
        FlushTableSubprocedurePool taskManager = new FlushTableSubprocedurePool(this.rss.getServerName().toString(), conf, this.rss);
        return new FlushTableSubprocedure(this.member, exnDispatcher, wakeMillis, timeoutMillis, involvedRegions, table, taskManager);
    }

    private List<Region> getRegionsToFlush(String table) throws IOException {
        return this.rss.getOnlineRegions(TableName.valueOf(table));
    }

    @Override
    public void initialize(RegionServerServices rss) throws KeeperException {
        this.rss = rss;
        ZooKeeperWatcher zkw = rss.getZooKeeper();
        this.memberRpcs = new ZKProcedureMemberRpcs(zkw, "flush-table-proc");
        Configuration conf = rss.getConfiguration();
        long keepAlive = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, 60000L);
        int opThreads = conf.getInt(FLUSH_REQUEST_THREADS_KEY, 10);
        ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive);
        this.member = new ProcedureMember(this.memberRpcs, pool, new FlushTableSubprocedureBuilder());
    }

    @Override
    public String getProcedureSignature() {
        return "flush-table-proc";
    }

    static class FlushTableSubprocedurePool {
        private final Abortable abortable;
        private final ExecutorCompletionService<Void> taskPool;
        private final ThreadPoolExecutor executor;
        private volatile boolean stopped;
        private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
        private final String name;

        FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) {
            this.abortable = abortable;
            long keepAlive = conf.getLong(RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY, 60000L);
            int threads = conf.getInt(RegionServerFlushTableProcedureManager.CONCURENT_FLUSH_TASKS_KEY, 3);
            this.name = name;
            this.executor = new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs(" + name + ")-flush-proc-pool"));
            this.taskPool = new ExecutorCompletionService(this.executor);
        }

        boolean hasTasks() {
            return this.futures.size() != 0;
        }

        void submitTask(Callable<Void> task) {
            Future<Void> f = this.taskPool.submit(task);
            this.futures.add(f);
        }

        boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
            LOG.debug((Object)"Waiting for local region flush to finish.");
            int sz = this.futures.size();
            try {
                int i;
                for (i = 0; i < sz; ++i) {
                    Future<Void> f = this.taskPool.take();
                    f.get();
                    if (!this.futures.remove(f)) {
                        LOG.warn((Object)("unexpected future" + f));
                    }
                    LOG.debug((Object)("Completed " + (i + 1) + "/" + sz + " local region flush tasks."));
                }
                LOG.debug((Object)("Completed " + sz + " local region flush tasks."));
                i = 1;
                return i != 0;
            }
            catch (InterruptedException e) {
                LOG.warn((Object)"Got InterruptedException in FlushSubprocedurePool", (Throwable)e);
                if (!this.stopped) {
                    Thread.currentThread().interrupt();
                    throw new ForeignException("FlushSubprocedurePool", e);
                }
            }
            catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof ForeignException) {
                    LOG.warn((Object)"Rethrowing ForeignException from FlushSubprocedurePool", (Throwable)e);
                    throw (ForeignException)e.getCause();
                }
                if (cause instanceof DroppedSnapshotException) {
                    this.abortable.abort("Received DroppedSnapshotException, aborting", cause);
                }
                LOG.warn((Object)"Got Exception in FlushSubprocedurePool", (Throwable)e);
                throw new ForeignException(this.name, e.getCause());
            }
            finally {
                this.cancelTasks();
            }
            return false;
        }

        void cancelTasks() throws InterruptedException {
            List<Future<Void>> tasks = this.futures;
            LOG.debug((Object)("cancelling " + tasks.size() + " flush region tasks " + this.name));
            for (Future future : tasks) {
                future.cancel(false);
            }
            this.futures.clear();
            while (this.taskPool.poll() != null) {
            }
            this.stop();
        }

        void stop() {
            if (this.stopped) {
                return;
            }
            this.stopped = true;
            this.executor.shutdown();
        }
    }

    public class FlushTableSubprocedureBuilder
    implements SubprocedureFactory {
        @Override
        public Subprocedure buildSubprocedure(String name, byte[] data) {
            return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name);
        }
    }
}

