/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * Command-line job that schedules and/or executes clustering on a Hudi table.
 *
 * <p>The running mode is chosen with {@code --mode}:
 * <ul>
 *   <li>{@value #SCHEDULE}: create a clustering plan;</li>
 *   <li>{@value #EXECUTE}: execute an already scheduled plan;</li>
 *   <li>{@value #SCHEDULE_AND_EXECUTE}: create a plan and execute it immediately.</li>
 * </ul>
 * When {@code --mode} is absent, the older {@code --schedule} flag selects between
 * {@value #SCHEDULE} and {@value #EXECUTE}. The mode string is matched case-insensitively.
 */
public class HoodieClusteringJob {
    public static final String EXECUTE = "execute";
    public static final String SCHEDULE = "schedule";
    public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
    private static final Logger LOG = LogManager.getLogger(HoodieClusteringJob.class);
    private final Config cfg;
    // Assigned on every cluster() call; not otherwise read within this class.
    private transient FileSystem fs;
    private final TypedProperties props;
    private final JavaSparkContext jsc;
    private final HoodieTableMetaClient metaClient;

    /**
     * Creates a clustering job for the table at {@code cfg.basePath}.
     *
     * <p>Write-client properties are built from {@code cfg.configs} alone when no {@code --props}
     * file is given; otherwise they are read from that file, with {@code cfg.configs} passed along.
     *
     * @param jsc Spark context used for all table operations
     * @param cfg parsed job configuration
     */
    public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
        this.cfg = cfg;
        this.jsc = jsc;
        this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
            ? UtilHelpers.buildProperties(cfg.configs)
            : readConfigFromFileSystem(jsc, cfg);
        this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
    }

    /** Reads properties from {@code cfg.propsFilePath} via the job's Hadoop configuration. */
    private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
        return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs).getProps(true);
    }

    /**
     * CLI entry point. Prints usage and exits with status 1 when {@code --help} is given or no
     * arguments are supplied. The Spark context is always stopped, even if clustering throws.
     */
    public static void main(String[] args) {
        Config cfg = new Config();
        JCommander cmd = new JCommander(cfg, null, args);
        if (Boolean.TRUE.equals(cfg.help) || args.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        JavaSparkContext jsc = UtilHelpers.buildSparkContext("clustering-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
        try {
            HoodieClusteringJob clusteringJob = new HoodieClusteringJob(jsc, cfg);
            int result = clusteringJob.cluster(cfg.retry);
            // runningMode is resolved inside cluster(), so build the message afterwards.
            String resultMsg = String.format("Clustering with basePath: %s, tableName: %s, runningMode: %s",
                cfg.basePath, cfg.tableName, cfg.runningMode);
            if (result == -1) {
                LOG.error(resultMsg + " failed");
            } else {
                LOG.info(resultMsg + " success");
            }
        } finally {
            jsc.stop();
        }
    }

    /**
     * Fills in {@code cfg.runningMode} when it was not set explicitly: {@code --mode} takes
     * precedence, otherwise the {@code --schedule} flag picks schedule vs. execute.
     */
    private static void validateRunningMode(Config cfg) {
        if (StringUtils.isNullOrEmpty(cfg.runningMode)) {
            cfg.runningMode = Boolean.TRUE.equals(cfg.runSchedule) ? SCHEDULE : EXECUTE;
        }
    }

    /**
     * Runs the configured clustering operation through {@link UtilHelpers#retry}.
     *
     * @param retry number of retries passed to {@code UtilHelpers.retry}
     * @return 0 on success, -1 on failure (unsupported mode, nothing scheduled, or write errors);
     *     the final value is whatever {@code UtilHelpers.retry} returns
     */
    public int cluster(int retry) {
        this.fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
        // Callers may invoke cluster() directly without setting runningMode, so resolve it here.
        validateRunningMode(cfg);
        return UtilHelpers.retry(retry, () -> {
            // Locale.ROOT keeps matching stable regardless of the JVM default locale (e.g. Turkish 'I').
            switch (cfg.runningMode.toLowerCase(Locale.ROOT)) {
                case SCHEDULE: {
                    LOG.info("Running Mode: [schedule]; Do schedule");
                    Option<String> instantTime = doSchedule(jsc);
                    if (!instantTime.isPresent()) {
                        return -1;
                    }
                    LOG.info("The schedule instant time is " + instantTime.get());
                    return 0;
                }
                case SCHEDULE_AND_EXECUTE: {
                    LOG.info("Running Mode: [scheduleandexecute]");
                    return doScheduleAndCluster(jsc);
                }
                case EXECUTE: {
                    LOG.info("Running Mode: [execute]; Do cluster");
                    return doCluster(jsc);
                }
                default: {
                    LOG.error("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
                    return -1;
                }
            }
        }, "Cluster failed");
    }

    /**
     * Returns the table's current Avro schema as a JSON string.
     *
     * @throws HoodieException if the table has no completed commits yet
     */
    private String getSchemaFromLatestInstant() throws Exception {
        if (metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
            throw new HoodieException("Cannot run clustering without any completed commits");
        }
        Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema(false);
        return schema.toString();
    }

    /**
     * Executes the clustering plan at {@code cfg.clusteringInstantTime}. When no instant is
     * configured, the earliest REQUESTED replacecommit on the active timeline is used and written
     * back into {@code cfg.clusteringInstantTime}.
     *
     * @return 0 if no write errors were reported, -1 otherwise
     * @throws HoodieClusteringException if no clustering is scheduled or no commit metadata is returned
     */
    private int doCluster(JavaSparkContext jsc) throws Exception {
        String schemaStr = getSchemaFromLatestInstant();
        try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(
                jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
            if (StringUtils.isNullOrEmpty(cfg.clusteringInstantTime)) {
                Option<HoodieInstant> firstClusteringInstant = metaClient.getActiveTimeline()
                    .firstInstant(HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieInstant.State.REQUESTED);
                if (!firstClusteringInstant.isPresent()) {
                    throw new HoodieClusteringException("There is no scheduled clustering in the table.");
                }
                cfg.clusteringInstantTime = firstClusteringInstant.get().getTimestamp();
                LOG.info("Found the earliest scheduled clustering instant which will be executed: " + cfg.clusteringInstantTime);
            }
            Option<HoodieCommitMetadata> commitMetadata = client.cluster(cfg.clusteringInstantTime, true).getCommitMetadata();
            return handleErrors(requireCommitMetadata(commitMetadata, cfg.clusteringInstantTime), cfg.clusteringInstantTime);
        }
    }

    /**
     * Schedules clustering using this job's Spark context.
     *
     * @return the scheduled instant time, or empty if no plan was generated
     */
    public Option<String> doSchedule() throws Exception {
        return doSchedule(jsc);
    }

    /** Opens a write client for the table and schedules clustering with it. */
    private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
        String schemaStr = getSchemaFromLatestInstant();
        try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(
                jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
            return doSchedule(client);
        }
    }

    /**
     * Schedules clustering at {@code cfg.clusteringInstantTime} when one is configured (non-empty),
     * otherwise lets the client pick the instant.
     */
    private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> client) {
        // Same null-or-empty test as doCluster, so "--instant-time ''" is treated as "not provided".
        if (!StringUtils.isNullOrEmpty(cfg.clusteringInstantTime)) {
            client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
            return Option.of(cfg.clusteringInstantTime);
        }
        return client.scheduleClustering(Option.empty());
    }

    /**
     * Schedules a clustering plan and executes it immediately.
     *
     * <p>With {@code --retry-last-failed-clustering-job}, the latest inflight clustering instant is
     * re-triggered instead of scheduling a new one, but only if it started more than
     * {@code cfg.maxProcessingTimeMs} ago.
     *
     * @return 0 on success, -1 if no plan could be generated or write errors were reported
     */
    private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
        LOG.info("Step 1: Do schedule");
        String schemaStr = getSchemaFromLatestInstant();
        try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(
                jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
            Option<String> instantTime = Option.empty();
            if (Boolean.TRUE.equals(cfg.retryLastFailedClusteringJob)) {
                HoodieTimeline inflightHoodieTimeline = HoodieSparkTable.create(client.getConfig(), client.getEngineContext())
                    .getActiveTimeline().filterPendingReplaceTimeline().filterInflights();
                if (!inflightHoodieTimeline.empty()) {
                    HoodieInstant inflightClusteringInstant = inflightHoodieTimeline.lastInstant().get();
                    Date clusteringStartTime = HoodieActiveTimeline.parseDateFromInstantTime(inflightClusteringInstant.getTimestamp());
                    if (clusteringStartTime.getTime() + cfg.maxProcessingTimeMs < System.currentTimeMillis()) {
                        // Re-using the failed instant time makes the next cluster() call rollback and redo it.
                        LOG.info("Found failed clustering instant at : " + inflightClusteringInstant
                            + "; Will rollback the failed clustering and re-trigger again.");
                        instantTime = Option.of(inflightClusteringInstant.getTimestamp());
                    } else {
                        LOG.info(inflightClusteringInstant + " might still be in progress, will trigger a new clustering job.");
                    }
                }
            }
            // NOTE(review): doSchedule(client) honours cfg.clusteringInstantTime, although the
            // --instant-time help text says it is ignored in scheduleAndExecute mode — confirm intent.
            if (!instantTime.isPresent()) {
                instantTime = doSchedule(client);
            }
            if (!instantTime.isPresent()) {
                LOG.info("Couldn't generate cluster plan");
                return -1;
            }
            String clusteringInstant = instantTime.get();
            LOG.info("The schedule instant time is " + clusteringInstant);
            LOG.info("Step 2: Do cluster");
            Option<HoodieCommitMetadata> metadata = client.cluster(clusteringInstant, true).getCommitMetadata();
            return handleErrors(requireCommitMetadata(metadata, clusteringInstant), clusteringInstant);
        }
    }

    /**
     * Unwraps commit metadata returned by a clustering run.
     *
     * @throws HoodieClusteringException naming the instant, instead of an opaque empty-Option failure
     */
    private static HoodieCommitMetadata requireCommitMetadata(Option<HoodieCommitMetadata> metadata, String instantTime) {
        if (!metadata.isPresent()) {
            throw new HoodieClusteringException("No commit metadata returned for clustering instant " + instantTime);
        }
        return metadata.get();
    }

    /**
     * Sums write errors across all partitions' write stats.
     *
     * @return 0 when there were no write errors, -1 otherwise
     */
    private int handleErrors(HoodieCommitMetadata metadata, String instantTime) {
        long errorsCount = metadata.getPartitionToWriteStats().values().stream()
            .flatMap(List::stream)
            .mapToLong(HoodieWriteStat::getTotalWriteErrors)
            .sum();
        if (errorsCount == 0L) {
            LOG.info(String.format("Clustering at instant time %s completed without write errors.", instantTime));
            return 0;
        }
        LOG.error(String.format("Clustering at instant time %s failed with %d write errors.", instantTime, errorsCount));
        return -1;
    }

    /** Command-line options for {@link HoodieClusteringJob}, bound by JCommander. */
    public static class Config
    implements Serializable {
        @Parameter(names={"--base-path", "-sp"}, description="Base path for the table", required=true)
        public String basePath = null;
        @Parameter(names={"--table-name", "-tn"}, description="Table name", required=true)
        public String tableName = null;
        @Parameter(names={"--instant-time", "-it"}, description="Clustering Instant time, only used when set --mode execute. If the instant time is not provided with --mode execute, the earliest scheduled clustering instant time is used by default. When set \"--mode scheduleAndExecute\" this instant-time will be ignored.", required=false)
        public String clusteringInstantTime = null;
        @Parameter(names={"--parallelism", "-pl"}, description="Parallelism for hoodie insert", required=false)
        public int parallelism = 1;
        @Parameter(names={"--spark-master", "-ms"}, description="Spark master", required=false)
        public String sparkMaster = null;
        @Parameter(names={"--spark-memory", "-sm"}, description="spark memory to use", required=true)
        public String sparkMemory = null;
        @Parameter(names={"--retry", "-rt"}, description="number of retries", required=false)
        public int retry = 0;
        @Parameter(names={"--schedule", "-sc"}, description="Schedule clustering @deprecated soon please use \"--mode schedule\" instead")
        public Boolean runSchedule = false;
        @Parameter(names={"--retry-last-failed-clustering-job", "-rc"}, description="Take effect when using --mode/-m scheduleAndExecute. Set true means check, rollback and execute last failed clustering plan instead of planning a new clustering job directly.", required=false)
        public Boolean retryLastFailedClusteringJob = false;
        @Parameter(names={"--mode", "-m"}, description="Set job mode: Set \"schedule\" means make a cluster plan; Set \"execute\" means execute a cluster plan at given instant which means --instant-time is needed here; Set \"scheduleAndExecute\" means make a cluster plan first and execute that plan immediately", required=false)
        public String runningMode = null;
        @Parameter(names={"--help", "-h"}, help=true)
        public Boolean help = false;
        @Parameter(names={"--job-max-processing-time-ms", "-jt"}, description="Take effect when using --mode/-m scheduleAndExecute and --retry-last-failed-clustering-job/-rc true. If maxProcessingTimeMs passed but clustering job is still unfinished, hoodie would consider this job as failed and relaunch.", required=false)
        public long maxProcessingTimeMs = 0L;
        @Parameter(names={"--props"}, description="path to properties file on localfs or dfs, with configurations for hoodie client for clustering")
        public String propsFilePath = null;
        @Parameter(names={"--hoodie-conf"}, description="Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter=IdentitySplitter.class)
        public List<String> configs = new ArrayList<String>();
    }
}

