/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import io.hops.hudi.com.beust.jcommander.JCommander;
import io.hops.hudi.com.beust.jcommander.Parameter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * Command-line job that either schedules clustering for a Hudi table or executes clustering for
 * a given instant time, depending on {@link Config#runSchedule}.
 *
 * <p>Hoodie client properties are built from {@code --hoodie-conf} overrides alone, or read from
 * the file given by {@code --props} when one is supplied.
 */
public class HoodieClusteringJob {
    private static final Logger LOG = LogManager.getLogger(HoodieClusteringJob.class);
    private final Config cfg;
    // Only written by cluster(int); nothing in this class reads it back.
    private transient FileSystem fs;
    private final TypedProperties props;
    private final JavaSparkContext jsc;

    /**
     * Creates the job and resolves the hoodie client properties.
     *
     * @param jsc Spark context used for all work done by this job
     * @param cfg parsed command-line configuration
     */
    public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
        this.cfg = cfg;
        this.jsc = jsc;
        this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) : this.readConfigFromFileSystem(jsc, cfg);
    }

    /**
     * Reads the properties file named by {@code cfg.propsFilePath}, passing {@code cfg.configs}
     * along as overrides. The file system handle is obtained from the table base path.
     */
    private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
        FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
        return UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
    }

    /**
     * Entry point. Prints usage and exits with status 1 when no arguments are given, when help is
     * requested, or when neither {@code --schedule} nor {@code --instant-time} is supplied.
     *
     * @param args command-line arguments, see {@link Config}
     */
    public static void main(String[] args) {
        Config cfg = new Config();
        JCommander cmd = new JCommander(cfg);
        // Check for an empty command line before parsing: parsing validates required parameters,
        // so testing args.length afterwards would never get the chance to print usage.
        if (args.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        cmd.parse(args);
        if (cfg.help.booleanValue() || !cfg.runSchedule.booleanValue() && cfg.clusteringInstantTime == null) {
            cmd.usage();
            System.exit(1);
        }
        JavaSparkContext jsc = UtilHelpers.buildSparkContext("clustering-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
        try {
            HoodieClusteringJob clusteringJob = new HoodieClusteringJob(jsc, cfg);
            int result = clusteringJob.cluster(cfg.retry);
            String resultMsg = String.format("Clustering with basePath: %s, tableName: %s, runSchedule: %s", cfg.basePath, cfg.tableName, cfg.runSchedule);
            // NOTE(review): a failure is only logged; the process still returns normally.
            // Consider signalling failure to the caller if schedulers rely on the exit status.
            if (result == -1) {
                LOG.error(resultMsg + " failed");
            } else {
                LOG.info(resultMsg + " success");
            }
        } finally {
            // Always release the Spark context, even if job construction or execution throws.
            jsc.stop();
        }
    }

    /**
     * Runs the configured action (schedule or cluster) through {@code UtilHelpers.retry}.
     *
     * @param retry retry count handed to {@code UtilHelpers.retry}
     * @return the value produced by {@code UtilHelpers.retry}; the scheduling path yields 0 when
     *         an instant time was produced and -1 otherwise, and {@link #main} treats -1 as failure
     */
    public int cluster(int retry) {
        this.fs = FSUtils.getFs(this.cfg.basePath, this.jsc.hadoopConfiguration());
        return UtilHelpers.retry(retry, () -> {
            if (this.cfg.runSchedule.booleanValue()) {
                LOG.info("Do schedule");
                Option<String> instantTime = this.doSchedule(this.jsc);
                if (!instantTime.isPresent()) {
                    return -1;
                }
                LOG.info("The schedule instant time is " + instantTime.get());
                return 0;
            }
            LOG.info("Do cluster");
            return this.doCluster(this.jsc);
        }, "Cluster failed");
    }

    /**
     * Resolves the table's Avro schema and returns it as a string.
     *
     * @throws HoodieException if the commits timeline has no completed instants
     * @throws Exception if the schema cannot be resolved
     */
    private String getSchemaFromLatestInstant() throws Exception {
        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.jsc.hadoopConfiguration(), this.cfg.basePath, true);
        TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
        if (metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
            throw new HoodieException("Cannot run clustering without any completed commits");
        }
        Schema schema = schemaUtil.getTableAvroSchema(false);
        return schema.toString();
    }

    /**
     * Executes clustering for {@code cfg.clusteringInstantTime} and hands the resulting write
     * statuses to {@code UtilHelpers.handleErrors}.
     *
     * @return the value returned by {@code UtilHelpers.handleErrors}
     */
    private int doCluster(JavaSparkContext jsc) throws Exception {
        String schemaStr = this.getSchemaFromLatestInstant();
        // The write client is closed once the write statuses have been inspected.
        try (SparkRDDWriteClient<?> client = UtilHelpers.createHoodieClient(jsc, this.cfg.basePath, schemaStr, this.cfg.parallelism, Option.empty(), this.props)) {
            JavaRDD<WriteStatus> writeResponse = client.cluster(this.cfg.clusteringInstantTime, true).getWriteStatuses();
            return UtilHelpers.handleErrors(jsc, this.cfg.clusteringInstantTime, writeResponse);
        }
    }

    /**
     * Schedules clustering using this job's Spark context.
     *
     * @return the result of {@code scheduleClustering}; {@link #cluster(int)} treats an empty
     *         option as failure
     * @throws Exception if the schema cannot be resolved or scheduling fails
     */
    public Option<String> doSchedule() throws Exception {
        return this.doSchedule(this.jsc);
    }

    /**
     * Schedules clustering with a freshly created write client, which is closed before returning.
     */
    private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
        String schemaStr = this.getSchemaFromLatestInstant();
        try (SparkRDDWriteClient<?> client = UtilHelpers.createHoodieClient(jsc, this.cfg.basePath, schemaStr, this.cfg.parallelism, Option.empty(), this.props)) {
            return client.scheduleClustering(Option.empty());
        }
    }

    /**
     * Command-line options for {@link HoodieClusteringJob}, populated by JCommander.
     */
    public static class Config
    implements Serializable {
        /** Base path of the table (required). */
        @Parameter(names={"--base-path", "-sp"}, description="Base path for the table", required=true)
        public String basePath = null;
        /** Table name (required); also used in the Spark application name. */
        @Parameter(names={"--table-name", "-tn"}, description="Table name", required=true)
        public String tableName = null;
        /** Instant time to cluster; must be set unless {@link #runSchedule} is true. */
        @Parameter(names={"--instant-time", "-it"}, description="Clustering Instant time, only need when cluster. And schedule clustering can generate it.", required=false)
        public String clusteringInstantTime = null;
        /** Parallelism passed to the hoodie client; defaults to 1. */
        @Parameter(names={"--parallelism", "-pl"}, description="Parallelism for hoodie insert", required=false)
        public int parallelism = 1;
        /** Spark master passed to the Spark context builder. */
        @Parameter(names={"--spark-master", "-ms"}, description="Spark master", required=false)
        public String sparkMaster = null;
        /** Spark memory setting passed to the Spark context builder (required). */
        @Parameter(names={"--spark-memory", "-sm"}, description="spark memory to use", required=true)
        public String sparkMemory = null;
        /** Retry count passed to {@link HoodieClusteringJob#cluster(int)}; defaults to 0. */
        @Parameter(names={"--retry", "-rt"}, description="number of retries", required=false)
        public int retry = 0;
        /** When true the job schedules clustering instead of executing it. */
        @Parameter(names={"--schedule", "-sc"}, description="Schedule clustering")
        public Boolean runSchedule = false;
        /** When true the usage text is printed and the process exits. */
        @Parameter(names={"--help", "-h"}, help=true)
        public Boolean help = false;
        /** Optional properties file; when null only {@link #configs} are used. */
        @Parameter(names={"--props"}, description="path to properties file on localfs or dfs, with configurations for hoodie client for clustering")
        public String propsFilePath = null;
        /** Extra configuration entries supplied on the command line; repeatable. */
        @Parameter(names={"--hoodie-conf"}, description="Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter=IdentitySplitter.class)
        public List<String> configs = new ArrayList<String>();
    }
}

