/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import io.hops.hudi.com.beust.jcommander.IValueValidator;
import io.hops.hudi.com.beust.jcommander.JCommander;
import io.hops.hudi.com.beust.jcommander.Parameter;
import io.hops.hudi.com.beust.jcommander.ParameterException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import scala.Tuple2;
import scala.collection.JavaConversions;

/**
 * Exports a snapshot of a Hudi table to a target path.
 *
 * <p>For every partition of the source table, the latest base files whose instant time is no later
 * than the latest completed commit/compaction instant are exported. With output format
 * {@code hudi}, the base files, partition metadata files and timeline files are copied so the
 * target is itself a Hudi table. With any other format, the base files are read as parquet and
 * rewritten (without Hudi meta columns) through a {@link Partitioner}.
 *
 * <p>Source and target may live on different filesystems: each path is always resolved against
 * its own {@link FileSystem}.
 */
public class HoodieSnapshotExporter {
    private static final Logger LOG = LogManager.getLogger(HoodieSnapshotExporter.class);

    /**
     * Runs the export described by {@code cfg} and writes a {@code _SUCCESS} marker on completion.
     *
     * @param jsc Spark context used to distribute the copy / rewrite
     * @param cfg exporter configuration
     * @throws IOException if a driver-side filesystem operation fails
     * @throws HoodieSnapshotExporterException if the target path already exists, the source table
     *     has no completed commits, or the source table has no partitions
     */
    public void export(JavaSparkContext jsc, Config cfg) throws IOException {
        // Resolve each path against its own filesystem; using the source fs for target paths fails
        // when the two live on different filesystems (e.g. HDFS -> S3).
        FileSystem sourceFs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
        FileSystem targetFs = FSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);

        if (this.outputPathExists(targetFs, cfg)) {
            throw new HoodieSnapshotExporterException("The target output path already exists.");
        }

        String latestCommitTimestamp = this.getLatestCommitTimestamp(sourceFs, cfg)
            .orElseThrow(() -> new HoodieSnapshotExporterException("No commits present. Nothing to snapshot."));
        LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommitTimestamp));

        List<String> partitions = this.getPartitions(engineContext, cfg);
        if (partitions.isEmpty()) {
            throw new HoodieSnapshotExporterException("The source dataset has 0 partition to snapshot.");
        }
        LOG.info(String.format("The job needs to export %d partitions.", partitions.size()));

        if (OutputFormatValidator.HUDI.equals(cfg.outputFormat)) {
            this.exportAsHudi(jsc, cfg, partitions, latestCommitTimestamp);
        } else {
            this.exportAsNonHudi(jsc, cfg, partitions, latestCommitTimestamp);
        }
        this.createSuccessTag(targetFs, cfg);
    }

    /**
     * Returns whether {@code cfg.targetOutputPath} already exists.
     *
     * @param fs filesystem that owns the target output path
     */
    private boolean outputPathExists(FileSystem fs, Config cfg) throws IOException {
        return fs.exists(new Path(cfg.targetOutputPath));
    }

    /**
     * Returns the timestamp of the last completed instant on the source table's
     * commits-and-compaction timeline, or an empty option if there is none.
     *
     * @param fs filesystem whose configuration is used to open the source table
     */
    private Option<String> getLatestCommitTimestamp(FileSystem fs, Config cfg) {
        HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
        return tableMetadata.getActiveTimeline()
            .getCommitsAndCompactionTimeline()
            .filterCompletedInstants()
            .lastInstant()
            .map(HoodieInstant::getTimestamp);
    }

    /** Lists all partition paths of the source table. */
    private List<String> getPartitions(HoodieEngineContext engineContext, Config cfg) {
        return FSUtils.getAllPartitionPaths(engineContext, cfg.sourceBasePath, true, false, false);
    }

    /**
     * Creates an empty {@code _SUCCESS} file under the target output path unless one already exists.
     *
     * @param fs filesystem that owns the target output path
     */
    private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
        Path successTagPath = new Path(cfg.targetOutputPath + "/_SUCCESS");
        if (!fs.exists(successTagPath)) {
            LOG.info(String.format("Creating _SUCCESS under target output path: %s", cfg.targetOutputPath));
            fs.createNewFile(successTagPath);
        }
    }

    /**
     * Reads the selected base files as parquet and writes them in {@code cfg.outputFormat} to the
     * target path, overwriting anything there.
     *
     * <p>Partitioning is delegated to the class named by {@code cfg.outputPartitioner} when set.
     * Otherwise, the default partitioner drops the Hudi meta columns and, if
     * {@code cfg.outputPartitionField} is set, repartitions and partitions the output by that field.
     */
    private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) {
        Partitioner defaultPartitioner = dataset -> {
            Dataset<Row> hoodieDroppedDataset =
                dataset.drop(JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
            return StringUtils.isNullOrEmpty(cfg.outputPartitionField)
                ? hoodieDroppedDataset.write()
                : hoodieDroppedDataset.repartition(new Column(cfg.outputPartitionField))
                    .write()
                    .partitionBy(cfg.outputPartitionField);
        };
        Partitioner partitioner = StringUtils.isNullOrEmpty(cfg.outputPartitioner)
            ? defaultPartitioner
            : (Partitioner) ReflectionUtils.loadClass(cfg.outputPartitioner);

        HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
        context.setJobStatus(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset");

        TableFileSystemView.BaseFileOnlyView fsView = this.getBaseFileOnlyView(jsc, cfg);
        // One Spark task per partition lists that partition's latest base file paths.
        Iterator<String> exportingFilePaths = jsc.parallelize(partitions, partitions.size())
            .flatMap(partition -> fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp)
                .map(BaseFile::getPath)
                .iterator())
            .toLocalIterator();

        Dataset<Row> sourceDataset = new SQLContext(jsc).read()
            .parquet(JavaConversions.asScalaIterator(exportingFilePaths).toSeq());
        partitioner.partition(sourceDataset)
            .format(cfg.outputFormat)
            .mode(SaveMode.Overwrite)
            .save(cfg.targetOutputPath);
    }

    /**
     * Copies the selected base files and each partition's {@code .hoodie_partition_metadata} file to
     * the same relative partition under the target path, then copies the timeline files.
     */
    private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
        TableFileSystemView.BaseFileOnlyView fsView = this.getBaseFileOnlyView(jsc, cfg);
        HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
        SerializableConfiguration serConf = context.getHadoopConf();
        context.setJobStatus(this.getClass().getSimpleName(), "Exporting as HUDI dataset");

        // Collect (partition, source file path) pairs for everything that has to be copied.
        List<Tuple2<String, String>> files = context.flatMap(partitions, partition -> {
            List<Tuple2<String, String>> filePaths = new ArrayList<>();
            Stream<HoodieBaseFile> dataFiles = fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp);
            dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath())));

            Path partitionMetaFile = new Path(FSUtils.getPartitionPath(cfg.sourceBasePath, partition), ".hoodie_partition_metadata");
            FileSystem sourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
            if (sourceFs.exists(partitionMetaFile)) {
                filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
            }
            return filePaths.stream();
        }, partitions.size());

        // Spark rejects a parallelism of 0, so skip the copy stage when there is nothing to copy.
        if (!files.isEmpty()) {
            context.foreach(files, tuple -> {
                String partition = tuple._1();
                Path sourceFilePath = new Path(tuple._2());
                Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, partition);
                // Read with the source fs, write with the target fs: they may differ.
                FileSystem sourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
                FileSystem targetFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
                if (!targetFs.exists(toPartitionPath)) {
                    targetFs.mkdirs(toPartitionPath);
                }
                FileUtil.copy(sourceFs, sourceFilePath, targetFs,
                    new Path(toPartitionPath, sourceFilePath.getName()), false, targetFs.getConf());
            }, files.size());
        }

        this.copyTimelineFiles(jsc, cfg, latestCommitTimestamp);
    }

    /**
     * Copies entries of the source {@code .hoodie} folder into the target {@code .hoodie} folder.
     *
     * <p>An entry is copied if it is {@code hoodie.properties}, or if the instant time that
     * {@link FSUtils#getCommitFromCommitFile} extracts from its name compares less than or equal to
     * {@code latestCommitTimestamp}. If a target file already exists, an error is logged and the
     * copy proceeds anyway.
     */
    private void copyTimelineFiles(JavaSparkContext jsc, Config cfg, String latestCommitTimestamp) throws IOException {
        LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
        FileSystem sourceFs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
        FileSystem targetFs = FSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());

        FileStatus[] commitFilesToCopy = sourceFs.listStatus(new Path(cfg.sourceBasePath + "/" + ".hoodie"), commitFilePath -> {
            if (commitFilePath.getName().equals("hoodie.properties")) {
                return true;
            }
            String instantTime = FSUtils.getCommitFromCommitFile(commitFilePath.getName());
            return HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCommitTimestamp);
        });

        for (FileStatus commitStatus : commitFilesToCopy) {
            Path targetFilePath = new Path(cfg.targetOutputPath + "/" + ".hoodie" + "/" + commitStatus.getPath().getName());
            if (!targetFs.exists(targetFilePath.getParent())) {
                targetFs.mkdirs(targetFilePath.getParent());
            }
            if (targetFs.exists(targetFilePath)) {
                LOG.error(String.format("The target output commit file (%s targetBasePath) already exists.", targetFilePath));
            }
            FileUtil.copy(sourceFs, commitStatus.getPath(), targetFs, targetFilePath, false, targetFs.getConf());
        }
    }

    /**
     * Builds a base-file-only view of the source table over its completed commits-and-compaction
     * timeline.
     */
    private TableFileSystemView.BaseFileOnlyView getBaseFileOnlyView(JavaSparkContext jsc, Config cfg) {
        FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
        HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
        return new HoodieTableFileSystemView(tableMetadata,
            tableMetadata.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedInstants());
    }

    /**
     * Command-line entry point: parses {@link Config} from {@code args}, runs the export on a new
     * Kryo-serialized Spark context, and always stops that context afterwards.
     */
    public static void main(String[] args) throws IOException {
        Config cfg = new Config();
        new JCommander(cfg, null, args);
        SparkConf sparkConf = new SparkConf().setAppName("Hoodie-snapshot-exporter");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        LOG.info("Initializing spark job.");
        try {
            new HoodieSnapshotExporter().export(jsc, cfg);
        } finally {
            jsc.stop();
        }
    }

    /** Command-line options for the exporter. */
    public static class Config
    implements Serializable {
        // Base path of the source Hudi table.
        @Parameter(names={"--source-base-path"}, description="Base path for the source Hudi dataset to be snapshotted", required=true)
        public String sourceBasePath;
        // Destination path; export() refuses to run if it already exists.
        @Parameter(names={"--target-output-path"}, description="Base path for the target output files (snapshots)", required=true)
        public String targetOutputPath;
        // One of OutputFormatValidator.FORMATS.
        @Parameter(names={"--output-format"}, description="Output format for the exported dataset; accept these values: json|parquet|hudi", required=true, validateValueWith={OutputFormatValidator.class})
        public String outputFormat;
        // Optional column used by the default partitioner (non-hudi formats only).
        @Parameter(names={"--output-partition-field"}, description="A field to be used by Spark repartitioning")
        public String outputPartitionField = null;
        // Optional fully-qualified Partitioner class name (non-hudi formats only).
        @Parameter(names={"--output-partitioner"}, description="A class to facilitate custom repartitioning")
        public String outputPartitioner = null;
    }

    /** Rejects {@code --output-format} values outside {@link #FORMATS}. */
    public static class OutputFormatValidator
    implements IValueValidator<String> {
        public static final String HUDI = "hudi";
        public static final List<String> FORMATS = CollectionUtils.createImmutableList("json", "parquet", "hudi");

        @Override
        public void validate(String name, String value) {
            if (value == null || !FORMATS.contains(value)) {
                throw new ParameterException(String.format("Invalid output format: value:%s: supported formats:%s", value, FORMATS));
            }
        }
    }

    /**
     * Turns the dataset read from the selected base files into a writer for a non-hudi export.
     * Custom implementations are instantiated reflectively from {@code --output-partitioner}.
     */
    @FunctionalInterface
    public interface Partitioner {
        DataFrameWriter<Row> partition(Dataset<Row> source);
    }
}

