/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hudi.org.apache.hadoop.hbase.util.compaction;

import io.hops.hudi.org.apache.hadoop.hbase.HBaseConfiguration;
import io.hops.hudi.org.apache.hadoop.hbase.HRegionLocation;
import io.hops.hudi.org.apache.hadoop.hbase.NotServingRegionException;
import io.hops.hudi.org.apache.hadoop.hbase.ServerName;
import io.hops.hudi.org.apache.hadoop.hbase.TableName;
import io.hops.hudi.org.apache.hadoop.hbase.client.Admin;
import io.hops.hudi.org.apache.hadoop.hbase.client.CompactionState;
import io.hops.hudi.org.apache.hadoop.hbase.client.Connection;
import io.hops.hudi.org.apache.hadoop.hbase.client.ConnectionFactory;
import io.hops.hudi.org.apache.hadoop.hbase.client.RegionInfo;
import io.hops.hudi.org.apache.hadoop.hbase.util.Bytes;
import io.hops.hudi.org.apache.hadoop.hbase.util.compaction.ClusterCompactionQueues;
import io.hops.hudi.org.apache.hadoop.hbase.util.compaction.MajorCompactionRequest;
import io.hops.hudi.org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import io.hops.hudi.org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import io.hops.hudi.org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import io.hops.hudi.org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import io.hops.hudi.org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import io.hops.hudi.org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import io.hops.hudi.org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import io.hops.hudi.org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import io.hops.hudi.org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import io.hops.hudi.org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
import io.hops.hudi.org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import io.hops.hudi.org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line tool that queues major compaction requests for the regions of a single
 * table, grouped by the region server hosting them, and submits them to a fixed-size
 * thread pool.
 *
 * <p>Typical flow (see {@link #run(String[])}): construct the compactor, call
 * {@link #initializeWorkQueues()}, then {@link #compactAllRegions()}, and finally
 * {@link #shutdown()}.
 *
 * <p>NOTE(review): {@link #ERRORS} is static, so failures are shared by every instance
 * living in the same JVM.
 */
@InterfaceAudience.LimitedPrivate(value={"Tools"})
public class MajorCompactor extends Configured implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);

    /**
     * Requests that still had stores requiring compaction after the post-compaction check
     * while their region had not moved to another server.
     */
    protected static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();

    /** Per-server queues of pending compaction requests. */
    protected ClusterCompactionQueues clusterCompactionQueues;
    /** Cut-off passed to the request factory; set from the {@code minModTime} option. */
    private long timestamp;
    /** Column families to compact; filled with all families of the table when empty. */
    protected Set<String> storesToCompact;
    /** Pool running the {@link Compact} tasks. */
    protected ExecutorService executor;
    /** Polling interval, in milliseconds, used while waiting for capacity or completion. */
    protected long sleepForMs;
    protected Connection connection;
    protected TableName tableName;
    /** Maximum number of servers to compact; a negative value means all servers. */
    private int numServers = -1;
    /** Maximum number of regions queued per server; zero or negative means no limit. */
    private int numRegions = -1;
    /** When true, tasks neither wait for the compaction to finish nor verify the result. */
    private boolean skipWait = false;

    /** Creates an unconfigured instance; used as the {@link Tool} entry point in {@link #main}. */
    MajorCompactor() {
    }

    /**
     * Creates a compactor bound to one table.
     *
     * @param conf configuration used to open the cluster connection
     * @param tableName table whose regions are compacted
     * @param storesToCompact column families to compact; when empty, all families are used
     * @param concurrency size of the worker pool and of the compaction queues
     * @param timestamp cut-off handed to the compaction request factory
     * @param sleepForMs polling interval in milliseconds
     * @throws IllegalArgumentException if {@code concurrency} is not positive
     * @throws IOException if the connection cannot be created
     */
    public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact, int concurrency, long timestamp, long sleepForMs) throws IOException {
        // Fail before opening the connection: newFixedThreadPool rejects a non-positive
        // size with the same exception type, but only after the connection already exists.
        if (concurrency <= 0) {
            throw new IllegalArgumentException("concurrency must be positive, got: " + concurrency);
        }
        this.connection = ConnectionFactory.createConnection(conf);
        this.tableName = tableName;
        this.timestamp = timestamp;
        this.storesToCompact = storesToCompact;
        this.executor = Executors.newFixedThreadPool(concurrency);
        this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
        this.sleepForMs = sleepForMs;
    }

    /**
     * Drains the compaction queues, submitting one {@link Compact} task per request, and
     * returns once the queues are empty and every submitted task is done.
     *
     * @throws Exception if a region lookup fails or the calling thread is interrupted
     */
    public void compactAllRegions() throws Exception {
        List<Future<?>> futures = new ArrayList<>();
        while (this.clusterCompactionQueues.hasWorkItems() || !this.futuresComplete(futures)) {
            while (this.clusterCompactionQueues.atCapacity()) {
                LOG.debug("Waiting for servers to complete Compactions");
                Thread.sleep(this.sleepForMs);
            }
            Optional<ServerName> serverToProcess = this.clusterCompactionQueues.getLargestQueueFromServersNotCompacting();
            if (!serverToProcess.isPresent() || !this.clusterCompactionQueues.hasWorkItems()) {
                // Nothing can be dispatched right now; poll again later.
                Thread.sleep(this.sleepForMs);
                continue;
            }
            ServerName serverName = serverToProcess.get();
            MajorCompactionRequest request = this.clusterCompactionQueues.reserveForCompaction(serverName);
            // NOTE(review): the RegionLocator obtained here is never closed.
            ServerName currentServer = this.connection.getRegionLocator(this.tableName)
                .getRegionLocation(request.getRegion().getStartKey()).getServerName();
            if (!currentServer.equals(serverName)) {
                // The region moved since it was queued: hand the request to its new server.
                LOG.info("Server changed for region: {} from: {} to: {} re-queuing request",
                    request.getRegion().getEncodedName(), serverName, currentServer);
                this.clusterCompactionQueues.addToCompactionQueue(currentServer, request);
                this.clusterCompactionQueues.releaseCompaction(serverName);
                continue;
            }
            LOG.info("Firing off compaction request for server: {}, {} total queue size left: {}",
                serverName, request, this.clusterCompactionQueues.getCompactionRequestsLeftToFinish());
            futures.add(this.executor.submit(new Compact(serverName, request)));
        }
        LOG.info("All compactions have completed");
    }

    /** Drops finished futures from the list and reports whether none remain. */
    private boolean futuresComplete(List<Future<?>> futures) {
        futures.removeIf(Future::isDone);
        return futures.isEmpty();
    }

    /**
     * Stops the worker pool, waits for running tasks, reports the outcome and closes the
     * connection. The connection is closed even if waiting for the pool fails.
     *
     * @throws Exception if the wait is interrupted or closing the connection fails
     */
    public void shutdown() throws Exception {
        try {
            if (this.executor != null) {
                this.executor.shutdown();
                this.executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
            if (!ERRORS.isEmpty()) {
                StringBuilder builder = new StringBuilder()
                    .append("Major compaction failed, there were: ")
                    .append(ERRORS.size())
                    .append(" regions / stores that failed compacting\n")
                    .append("Failed compaction requests\n")
                    .append("--------------------------\n")
                    .append(Joiner.on("\n").join(ERRORS));
                LOG.error(builder.toString());
            } else {
                // Only claim success when no request was recorded as failed.
                LOG.info("All regions major compacted successfully");
            }
        } finally {
            if (this.connection != null) {
                this.connection.close();
            }
        }
    }

    /**
     * Builds the per-server compaction queues, honouring the server and per-server region
     * limits. When no column family was given, all families of the table are used.
     *
     * @throws IOException if table or region metadata cannot be read
     */
    @InterfaceAudience.Private
    void initializeWorkQueues() throws IOException {
        if (this.storesToCompact.isEmpty()) {
            // NOTE(review): the Table obtained here is never closed.
            this.connection.getTable(this.tableName).getDescriptor().getColumnFamilyNames()
                .forEach(family -> this.storesToCompact.add(Bytes.toString(family)));
            LOG.info("No family specified, will execute for all families");
        }
        LOG.info("Initializing compaction queues for table:  {} with cf: {}", this.tableName, this.storesToCompact);
        Map<ServerName, List<RegionInfo>> snRegionMap = this.getServerRegionsMap();
        for (ServerName sn : this.getServersToCompact(snRegionMap.keySet())) {
            List<RegionInfo> regions = snRegionMap.get(sn);
            LOG.debug("Table: {} Server: {} No of regions: {}", this.tableName, sn, regions.size());
            // Shuffle so that a per-server limit picks a random subset of regions.
            Collections.shuffle(regions);
            int regionsToCompact = this.numRegions;
            for (RegionInfo hri : regions) {
                if (this.numRegions > 0 && regionsToCompact <= 0) {
                    LOG.debug("Reached region limit for server: {}", sn);
                    break;
                }
                Optional<MajorCompactionRequest> request = this.getMajorCompactionRequest(hri);
                if (!request.isPresent()) {
                    continue;
                }
                LOG.debug("Adding region {} to queue {} for compaction", hri, sn);
                this.clusterCompactionQueues.addToCompactionQueue(sn, request.get());
                if (this.numRegions > 0) {
                    --regionsToCompact;
                }
            }
        }
    }

    /**
     * Creates the compaction request for a region, if the factory produces one.
     *
     * @param hri region to build the request for
     * @return the request, or empty when the factory returns none
     * @throws IOException if the factory fails
     */
    protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri) throws IOException {
        return MajorCompactionRequest.newRequest(this.connection.getConfiguration(), hri, this.storesToCompact, this.timestamp);
    }

    /** Returns all servers, or a random subset of {@code numServers} of them when a limit applies. */
    private Collection<ServerName> getServersToCompact(Set<ServerName> snSet) {
        if (this.numServers < 0 || snSet.size() <= this.numServers) {
            return snSet;
        }
        List<ServerName> snList = new ArrayList<>(snSet);
        Collections.shuffle(snList);
        return snList.subList(0, this.numServers);
    }

    /** Groups the regions of the table by the server currently reported for each of them. */
    private Map<ServerName, List<RegionInfo>> getServerRegionsMap() throws IOException {
        Map<ServerName, List<RegionInfo>> snRegionMap = new HashMap<>();
        // NOTE(review): the RegionLocator obtained here is never closed.
        List<HRegionLocation> regionLocations = this.connection.getRegionLocator(this.tableName).getAllRegionLocations();
        for (HRegionLocation regionLocation : regionLocations) {
            snRegionMap.computeIfAbsent(regionLocation.getServerName(), sn -> new ArrayList<>())
                .add(regionLocation.getRegion());
        }
        return snRegionMap;
    }

    /**
     * @param numServers maximum number of servers to compact; negative means all servers
     */
    public void setNumServers(int numServers) {
        this.numServers = numServers;
    }

    /**
     * @param numRegions maximum number of regions queued per server; zero or negative
     *     means no limit
     */
    public void setNumRegions(int numRegions) {
        this.numRegions = numRegions;
    }

    /**
     * @param skipWait when true, tasks do not wait for compactions to finish and skip the
     *     post-compaction verification
     */
    public void setSkipWait(boolean skipWait) {
        this.skipWait = skipWait;
    }

    /**
     * Reports whether the region of the request is in state {@code MAJOR} or
     * {@code MAJOR_AND_MINOR}. The Admin used for the lookup is closed before returning.
     */
    private boolean isCompacting(MajorCompactionRequest request) throws Exception {
        try (Admin admin = this.connection.getAdmin()) {
            CompactionState compactionState = admin.getCompactionStateForRegion(request.getRegion().getEncodedNameAsBytes());
            return compactionState == CompactionState.MAJOR || compactionState == CompactionState.MAJOR_AND_MINOR;
        }
    }

    /**
     * Queues requests for regions whose region id is greater than the cut-off timestamp.
     * Requests are built through {@link #getMajorCompactionRequest(RegionInfo)}, the same
     * factory used when the queues are first initialised.
     *
     * @throws RuntimeException wrapping any {@link IOException} raised while reading regions
     */
    private void addNewRegions() {
        try {
            // NOTE(review): the RegionLocator obtained here is never closed.
            List<HRegionLocation> locations = this.connection.getRegionLocator(this.tableName).getAllRegionLocations();
            for (HRegionLocation location : locations) {
                if (location.getRegion().getRegionId() <= this.timestamp) {
                    continue;
                }
                Optional<MajorCompactionRequest> compactionRequest = this.getMajorCompactionRequest(location.getRegion());
                if (compactionRequest.isPresent()) {
                    this.clusterCompactionQueues.addToCompactionQueue(location.getServerName(), compactionRequest.get());
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to refresh regions for table " + this.tableName, e);
        }
    }

    /**
     * Asks the request which of the configured stores still require compaction.
     *
     * @param request request to inspect
     * @return names of the stores that still require compaction
     * @throws IOException if the request cannot determine its stores
     */
    protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request) throws IOException {
        return request.getStoresRequiringCompaction(this.storesToCompact, this.timestamp);
    }

    /**
     * Builds the command-line options that do not depend on a specific table.
     *
     * @return a new {@link Options} instance holding the common options
     */
    protected Options getCommonOptions() {
        Options options = new Options();
        options.addOption(Option.builder("servers").required().desc("Concurrent servers compacting").hasArg().build());
        options.addOption(Option.builder("minModTime").desc("Compact if store files have modification time < minModTime").hasArg().build());
        options.addOption(Option.builder("zk").optionalArg(true).desc("zk quorum").hasArg().build());
        options.addOption(Option.builder("rootDir").optionalArg(true).desc("hbase.rootDir").hasArg().build());
        options.addOption(Option.builder("sleep").desc("Time to sleepForMs (ms) for checking compaction status per region and available work queues: default 30s").hasArg().build());
        options.addOption(Option.builder("retries").desc("Max # of retries for a compaction request, defaults to 3").hasArg().build());
        options.addOption(Option.builder("dryRun").desc("Dry run, will just output a list of regions that require compaction based on parameters passed").hasArg(false).build());
        options.addOption(Option.builder("skipWait").desc("Skip waiting after triggering compaction.").hasArg(false).build());
        options.addOption(Option.builder("numservers").optionalArg(true).desc("Number of servers to compact in this run, defaults to all").hasArg().build());
        options.addOption(Option.builder("numregions").optionalArg(true).desc("Number of regions to compact per server, defaults to all").hasArg().build());
        return options;
    }

    /**
     * Parses the command line, runs the compaction (unless {@code dryRun} is given) and
     * always shuts the compactor down afterwards.
     *
     * @param args2 command-line arguments
     * @return {@code -1} when the arguments cannot be parsed, otherwise the number of
     *     entries in {@link #ERRORS}
     * @throws Exception if queue initialisation, compaction or shutdown fails
     */
    @Override
    public int run(String[] args2) throws Exception {
        Options options = this.getCommonOptions();
        options.addOption(Option.builder("table").required().desc("table name").hasArg().build());
        options.addOption(Option.builder("cf").optionalArg(true).desc("column families: comma separated eg: a,b,c").hasArg().build());
        CommandLine commandLine;
        try {
            commandLine = new DefaultParser().parse(options, args2);
        } catch (ParseException parseException) {
            System.out.println("ERROR: Unable to parse command-line arguments " + Arrays.toString(args2) + " due to: " + parseException);
            MajorCompactor.printUsage(options);
            return -1;
        }
        if (commandLine == null) {
            System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args2));
            MajorCompactor.printUsage(options);
            return -1;
        }
        String tableNameArg = commandLine.getOptionValue("table");
        String cf = commandLine.getOptionValue("cf", null);
        Set<String> families = new HashSet<>();
        if (cf != null) {
            Iterables.addAll(families, Splitter.on(",").split(cf));
        }
        Configuration configuration = this.getConf();
        int concurrency;
        long minModTime;
        long sleep;
        int numServersArg;
        int numRegionsArg;
        try {
            concurrency = Integer.parseInt(commandLine.getOptionValue("servers"));
            minModTime = Long.parseLong(commandLine.getOptionValue("minModTime", String.valueOf(System.currentTimeMillis())));
            sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000L)));
            numServersArg = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
            numRegionsArg = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
        } catch (NumberFormatException numberFormatException) {
            // Treat a malformed number like any other unparsable argument.
            System.out.println("ERROR: Unable to parse numeric command-line argument in " + Arrays.toString(args2) + " due to: " + numberFormatException);
            MajorCompactor.printUsage(options);
            return -1;
        }
        String quorum = commandLine.getOptionValue("zk", configuration.get("hbase.zookeeper.quorum"));
        String rootDir = commandLine.getOptionValue("rootDir", configuration.get("hbase.rootdir"));
        // Configuration.set rejects null values, so only override what is actually known.
        if (rootDir != null) {
            configuration.set("hbase.rootdir", rootDir);
        }
        if (quorum != null) {
            configuration.set("hbase.zookeeper.quorum", quorum);
        }
        MajorCompactor compactor = new MajorCompactor(configuration, TableName.valueOf(tableNameArg), families, concurrency, minModTime, sleep);
        try {
            compactor.setNumServers(numServersArg);
            compactor.setNumRegions(numRegionsArg);
            compactor.setSkipWait(commandLine.hasOption("skipWait"));
            compactor.initializeWorkQueues();
            if (!commandLine.hasOption("dryRun")) {
                compactor.compactAllRegions();
            }
        } finally {
            // Always release the worker pool and the connection, even when compaction fails.
            compactor.shutdown();
        }
        return ERRORS.size();
    }

    /**
     * Prints usage information for the given options.
     *
     * @param options options to describe
     */
    protected static void printUsage(Options options) {
        String header = "\nUsage instructions\n\n";
        String footer = "\n";
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true);
    }

    /**
     * Runs the tool through {@link ToolRunner} with a fresh HBase configuration.
     *
     * @param args2 command-line arguments
     * @throws Exception if the tool fails
     */
    public static void main(String[] args2) throws Exception {
        ToolRunner.run((Configuration)HBaseConfiguration.create(), (Tool)new MajorCompactor(), (String[])args2);
    }

    /**
     * Task that major-compacts the stores of one request on one server and, unless waiting
     * is skipped, verifies the result afterwards.
     */
    class Compact implements Runnable {
        private final ServerName serverName;
        private final MajorCompactionRequest request;

        Compact(ServerName serverName, MajorCompactionRequest request) {
            this.serverName = serverName;
            this.request = request;
        }

        /**
         * Runs the compaction and always releases the server reservation afterwards.
         * Failures are logged rather than propagated, except for the unchecked exception
         * that refreshing the regions may raise.
         */
        @Override
        public void run() {
            try {
                this.compactAndWait(this.request);
            } catch (NotServingRegionException e) {
                LOG.warn("Region is invalid, requesting updated regions", (Throwable)e);
                MajorCompactor.this.addNewRegions();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the executor instead of silently clearing it.
                Thread.currentThread().interrupt();
                LOG.warn("Error compacting:", (Throwable)e);
            } catch (Exception e) {
                LOG.warn("Error compacting:", (Throwable)e);
            } finally {
                MajorCompactor.this.clusterCompactionQueues.releaseCompaction(this.serverName);
            }
        }

        /**
         * Requests a major compaction for every store of the request that still needs one
         * (unless the region is already major compacting), optionally waits for it to
         * finish, and then verifies the outcome. The verification runs even if the
         * compaction request itself failed.
         *
         * @param request request to execute
         * @throws Exception if the compaction call, the wait or the verification fails
         */
        void compactAndWait(MajorCompactionRequest request) throws Exception {
            try (Admin admin = MajorCompactor.this.connection.getAdmin()) {
                try {
                    if (!MajorCompactor.this.isCompacting(request)) {
                        Set<String> stores = MajorCompactor.this.getStoresRequiringCompaction(request);
                        if (!stores.isEmpty()) {
                            request.setStores(stores);
                            for (String store : request.getStores()) {
                                this.compactRegionOnServer(request, admin, store);
                            }
                        }
                    }
                    if (!MajorCompactor.this.skipWait) {
                        while (MajorCompactor.this.isCompacting(request)) {
                            Thread.sleep(MajorCompactor.this.sleepForMs);
                            LOG.debug("Waiting for compaction to complete for region: {}", request.getRegion().getEncodedName());
                        }
                    }
                } finally {
                    if (!MajorCompactor.this.skipWait) {
                        this.verifyCompaction(request);
                    }
                }
            }
        }

        /**
         * Sleeps for the configured {@code hbase.hfile.compaction.discharger.interval}
         * (default 120000 ms), then checks whether any store still requires compaction.
         * If so, the request is recorded in {@link MajorCompactor#ERRORS} when the region
         * is still on this task's server, or re-queued when the region has moved.
         */
        private void verifyCompaction(MajorCompactionRequest request) throws Exception {
            int waitForArchive = MajorCompactor.this.connection.getConfiguration().getInt("hbase.hfile.compaction.discharger.interval", 120000);
            Thread.sleep(waitForArchive);
            Set<String> storesRequiringCompaction = MajorCompactor.this.getStoresRequiringCompaction(request);
            if (storesRequiringCompaction.isEmpty()) {
                LOG.info("Compaction complete for region: {} -> cf(s): {}", request.getRegion().getEncodedName(), request.getStores());
                return;
            }
            // NOTE(review): the RegionLocator obtained here is never closed.
            boolean regionHasNotMoved = MajorCompactor.this.connection.getRegionLocator(MajorCompactor.this.tableName)
                .getRegionLocation(request.getRegion().getStartKey()).getServerName().equals(this.serverName);
            if (regionHasNotMoved) {
                LOG.error("Not all store files were compacted, this may be due to the regionserver not being aware of all store files.  Will not reattempt compacting, {}", request);
                ERRORS.add(request);
            } else {
                request.setStores(storesRequiringCompaction);
                MajorCompactor.this.clusterCompactionQueues.addToCompactionQueue(this.serverName, request);
                LOG.info("Compaction failed for the following stores: {} region: {}", storesRequiringCompaction, request.getRegion().getEncodedName());
            }
        }

        /** Issues the major compaction call for one store of the request's region. */
        private void compactRegionOnServer(MajorCompactionRequest request, Admin admin, String store) throws IOException {
            admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(), Bytes.toBytes(store));
        }
    }
}

