/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import io.hops.hudi.com.beust.jcommander.JCommander;
import io.hops.hudi.com.beust.jcommander.Parameter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieCompactor {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieCompactor.class);
    public static final String EXECUTE = "execute";
    public static final String SCHEDULE = "schedule";
    public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
    private final Config cfg;
    private transient FileSystem fs;
    private TypedProperties props;
    private final JavaSparkContext jsc;
    private final HoodieTableMetaClient metaClient;

    public HoodieCompactor(JavaSparkContext jsc, Config cfg) {
        this.cfg = cfg;
        this.jsc = jsc;
        this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) : this.readConfigFromFileSystem(jsc, cfg);
        this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), (Object)false);
        this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
        if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
            UtilHelpers.addLockOptions(cfg.basePath, this.props);
        }
    }

    private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
        return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs).getProps(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args2) {
        Config cfg = new Config();
        JCommander cmd = new JCommander((Object)cfg, null, args2);
        if (cfg.help.booleanValue() || args2.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        JavaSparkContext jsc = UtilHelpers.buildSparkContext("compactor-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
        int ret = 0;
        try {
            HoodieCompactor compactor = new HoodieCompactor(jsc, cfg);
            ret = compactor.compact(cfg.retry);
        }
        catch (Throwable throwable) {
            LOG.error("Fail to run compaction for " + cfg.tableName, throwable);
        }
        finally {
            jsc.stop();
            System.exit(ret);
        }
    }

    public int compact(int retry) {
        this.fs = FSUtils.getFs(this.cfg.basePath, this.jsc.hadoopConfiguration());
        HoodieCompactor.validateRunningMode(this.cfg);
        LOG.info(this.cfg.toString());
        int ret = UtilHelpers.retry(retry, () -> {
            switch (this.cfg.runningMode.toLowerCase()) {
                case "schedule": {
                    int result;
                    LOG.info("Running Mode: [schedule]; Do schedule");
                    Option<String> instantTime = this.doSchedule(this.jsc);
                    int n = result = instantTime.isPresent() ? 0 : -1;
                    if (result == 0) {
                        LOG.info("The schedule instant time is " + instantTime.get());
                    }
                    return result;
                }
                case "scheduleandexecute": {
                    LOG.info("Running Mode: [scheduleandexecute]");
                    return this.doScheduleAndCompact(this.jsc);
                }
                case "execute": {
                    LOG.info("Running Mode: [execute]; Do compaction");
                    return this.doCompact(this.jsc);
                }
            }
            LOG.info("Unsupported running mode [" + this.cfg.runningMode + "], quit the job directly");
            return -1;
        }, "Compact failed");
        return ret;
    }

    private Integer doScheduleAndCompact(JavaSparkContext jsc) throws Exception {
        LOG.info("Step 1: Do schedule");
        Option<String> instantTime = this.doSchedule(jsc);
        if (!instantTime.isPresent()) {
            LOG.warn("Couldn't do schedule");
            return -1;
        }
        this.cfg.compactionInstantTime = instantTime.get();
        LOG.info("The schedule instant time is " + instantTime.get());
        LOG.info("Step 2: Do compaction");
        return this.doCompact(jsc);
    }

    private static void validateRunningMode(Config cfg) {
        if (StringUtils.isNullOrEmpty(cfg.runningMode)) {
            cfg.runningMode = cfg.runSchedule != false ? SCHEDULE : EXECUTE;
        }
    }

    private int doCompact(JavaSparkContext jsc) throws Exception {
        String schemaStr = StringUtils.isNullOrEmpty(this.cfg.schemaFile) ? this.getSchemaFromLatestInstant() : UtilHelpers.parseSchema(this.fs, this.cfg.schemaFile);
        LOG.info("Schema --> : " + schemaStr);
        try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, this.cfg.basePath, schemaStr, this.cfg.parallelism, Option.empty(), this.props);){
            if (StringUtils.isNullOrEmpty(this.cfg.compactionInstantTime)) {
                HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, this.cfg.basePath, true);
                Option<HoodieInstant> firstCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
                if (firstCompactionInstant.isPresent()) {
                    this.cfg.compactionInstantTime = firstCompactionInstant.get().getTimestamp();
                    LOG.info("Found the earliest scheduled compaction instant which will be executed: " + this.cfg.compactionInstantTime);
                } else {
                    LOG.info("There is no scheduled compaction in the table.");
                    int n = 0;
                    return n;
                }
            }
            HoodieWriteMetadata compactionMetadata = client.compact(this.cfg.compactionInstantTime);
            this.clean(client);
            int n = UtilHelpers.handleErrors(compactionMetadata.getCommitMetadata().get(), this.cfg.compactionInstantTime);
            return n;
        }
    }

    private Option<String> doSchedule(JavaSparkContext jsc) {
        try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, this.cfg.basePath, "", this.cfg.parallelism, Option.of(this.cfg.strategyClassName), this.props);){
            if (StringUtils.isNullOrEmpty(this.cfg.compactionInstantTime)) {
                LOG.warn("No instant time is provided for scheduling compaction.");
                Option<String> option = client.scheduleCompaction(Option.empty());
                return option;
            }
            client.scheduleCompactionAtInstant(this.cfg.compactionInstantTime, Option.empty());
            Option<String> option = Option.of(this.cfg.compactionInstantTime);
            return option;
        }
    }

    private String getSchemaFromLatestInstant() throws Exception {
        TableSchemaResolver schemaUtil = new TableSchemaResolver(this.metaClient);
        Schema schema = schemaUtil.getTableAvroSchema(false);
        return schema.toString();
    }

    private void clean(SparkRDDWriteClient<?> client) {
        if (client.getConfig().isAutoClean()) {
            client.clean();
        }
    }

    public static class Config
    implements Serializable {
        @Parameter(names={"--base-path", "-sp"}, description="Base path for the table", required=true)
        public String basePath = null;
        @Parameter(names={"--table-name", "-tn"}, description="Table name", required=true)
        public String tableName = null;
        @Parameter(names={"--instant-time", "-it"}, description="Compaction Instant time", required=false)
        public String compactionInstantTime = null;
        @Parameter(names={"--parallelism", "-pl"}, description="Parallelism for hoodie insert", required=false)
        public int parallelism = 200;
        @Parameter(names={"--schema-file", "-sf"}, description="path for Avro schema file", required=false)
        public String schemaFile = null;
        @Parameter(names={"--spark-master", "-ms"}, description="Spark master", required=false)
        public String sparkMaster = null;
        @Parameter(names={"--spark-memory", "-sm"}, description="spark memory to use", required=false)
        public String sparkMemory = null;
        @Parameter(names={"--retry", "-rt"}, description="number of retries", required=false)
        public int retry = 0;
        @Parameter(names={"--schedule", "-sc"}, description="Schedule compaction", required=false)
        public Boolean runSchedule = false;
        @Parameter(names={"--mode", "-m"}, description="Set job mode: Set \"schedule\" means make a compact plan; Set \"execute\" means execute a compact plan at given instant which means --instant-time is needed here; Set \"scheduleAndExecute\" means make a compact plan first and execute that plan immediately", required=false)
        public String runningMode = null;
        @Parameter(names={"--strategy", "-st"}, description="Strategy Class", required=false)
        public String strategyClassName = LogFileSizeBasedCompactionStrategy.class.getName();
        @Parameter(names={"--help", "-h"}, help=true)
        public Boolean help = false;
        @Parameter(names={"--props"}, description="path to properties file on localfs or dfs, with configurations for hoodie client for compacting")
        public String propsFilePath = null;
        @Parameter(names={"--hoodie-conf"}, description="Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter=IdentitySplitter.class)
        public List<String> configs = new ArrayList<String>();

        public String toString() {
            return "HoodieCompactorConfig {\n   --base-path " + this.basePath + ", \n   --table-name " + this.tableName + ", \n   --instant-time " + this.compactionInstantTime + ", \n   --parallelism " + this.parallelism + ", \n   --schema-file " + this.schemaFile + ", \n   --spark-master " + this.sparkMaster + ", \n   --spark-memory " + this.sparkMemory + ", \n   --retry " + this.retry + ", \n   --schedule " + this.runSchedule + ", \n   --mode " + this.runningMode + ", \n   --strategy " + this.strategyClassName + ", \n   --props " + this.propsFilePath + ", \n   --hoodie-conf " + this.configs + "\n}";
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Config config = (Config)o;
            return this.basePath.equals(config.basePath) && Objects.equals(this.tableName, config.tableName) && Objects.equals(this.compactionInstantTime, config.compactionInstantTime) && Objects.equals(this.parallelism, config.parallelism) && Objects.equals(this.schemaFile, config.schemaFile) && Objects.equals(this.sparkMaster, config.sparkMaster) && Objects.equals(this.sparkMemory, config.sparkMemory) && Objects.equals(this.retry, config.retry) && Objects.equals(this.runSchedule, config.runSchedule) && Objects.equals(this.runningMode, config.runningMode) && Objects.equals(this.strategyClassName, config.strategyClassName) && Objects.equals(this.propsFilePath, config.propsFilePath) && Objects.equals(this.configs, config.configs);
        }

        public int hashCode() {
            return Objects.hash(this.basePath, this.tableName, this.compactionInstantTime, this.schemaFile, this.sparkMaster, this.parallelism, this.sparkMemory, this.retry, this.runSchedule, this.runningMode, this.strategyClassName, this.propsFilePath, this.configs, this.help);
        }
    }
}

