/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import io.hops.hudi.com.beust.jcommander.JCommander;
import io.hops.hudi.com.beust.jcommander.Parameter;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

/**
 * Spark job that copies a point-in-time snapshot of a Hudi table to another location.
 *
 * <p>The snapshot is pinned to the latest completed instant of the table's write timeline. It
 * consists of the base files returned by {@code getLatestBaseFilesBeforeOrOn} for that instant in
 * every partition, each partition's metafile (when one exists), and the {@code .hoodie} entries
 * selected by {@link #copyTimelineFiles}.
 */
public class HoodieSnapshotCopier
implements Serializable {
    private static final Logger LOG = LogManager.getLogger(HoodieSnapshotCopier.class);

    /**
     * Copies the latest snapshot of the table at {@code baseDir} into {@code outputDir}.
     *
     * <p>If the table has no completed write instant, nothing is done. Otherwise, when the table has
     * partitions, any existing content at {@code outputDir} is deleted before copying. In both the
     * partitioned and the zero-partition case a {@code _SUCCESS} marker is created under
     * {@code outputDir} if it is not already there.
     *
     * @param jsc Spark context used to distribute the copy work
     * @param baseDir base path of the source Hudi table
     * @param outputDir destination path; may be on a different file system than {@code baseDir}
     * @param shouldAssumeDatePartitioning forwarded to the partition listing
     * @param useFileListingFromMetadata forwarded to the partition listing
     * @throws IOException if a driver-side file-system operation fails
     */
    public void snapshot(JavaSparkContext jsc, String baseDir, String outputDir, boolean shouldAssumeDatePartitioning, boolean useFileListingFromMetadata) throws IOException {
        FileSystem fs = FSUtils.getFs(baseDir, jsc.hadoopConfiguration());
        // Writes go through a file system resolved from outputDir, so the snapshot can target another
        // scheme/cluster; when both paths are on the same file system this is the same FS as `fs`.
        FileSystem outputFs = FSUtils.getFs(outputDir, jsc.hadoopConfiguration());
        SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
        HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir).build();
        // Build the completed write timeline once and use it for both the file view and the cut-off instant.
        HoodieTimeline completedWrites = tableMetadata.getActiveTimeline().getWriteTimeline().filterCompletedInstants();
        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetadata, completedWrites);
        HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
        Option<HoodieInstant> latestCommit = completedWrites.lastInstant();
        if (!latestCommit.isPresent()) {
            LOG.warn("No commits present. Nothing to snapshot");
            return;
        }
        String latestCommitTimestamp = latestCommit.get().getTimestamp();
        LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommitTimestamp));
        List<String> partitions = FSUtils.getAllPartitionPaths((HoodieEngineContext) context, baseDir, useFileListingFromMetadata, shouldAssumeDatePartitioning);
        if (!partitions.isEmpty()) {
            LOG.info(String.format("The job needs to copy %d partitions.", partitions.size()));
            Path outputPath = new Path(outputDir);
            if (outputFs.exists(outputPath)) {
                LOG.warn(String.format("The output path %s targetBasePath already exists, deleting", outputPath));
                outputFs.delete(outputPath, true);
            }
            context.setJobStatus(this.getClass().getSimpleName(), "Creating a snapshot: " + baseDir);
            // One (partition, source file path) pair per file to copy, gathered in parallel per partition.
            List<Tuple2<String, String>> filesToCopy = context.flatMap(partitions, partition -> {
                FileSystem fs1 = FSUtils.getFs(baseDir, serConf.newCopy());
                List<Tuple2<String, String>> filePaths = new ArrayList<>();
                fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp)
                    .forEach(baseFile -> filePaths.add(new Tuple2<>(partition, baseFile.getPath())));
                // Also copy the partition metafile. An empty Option means none was found; calling
                // get() on it unconditionally would fail the whole task.
                Option<Path> partitionMetaFile = HoodiePartitionMetadata.getPartitionMetafilePath(fs1, FSUtils.getPartitionPath(baseDir, partition));
                if (partitionMetaFile.isPresent() && fs1.exists(partitionMetaFile.get())) {
                    filePaths.add(new Tuple2<>(partition, partitionMetaFile.get().toString()));
                }
                return filePaths.stream();
            }, partitions.size());
            // With no files there is nothing to distribute, and a parallelism of 0 is not a valid
            // Spark partition count.
            if (!filesToCopy.isEmpty()) {
                context.foreach(filesToCopy, tuple -> {
                    String partition = tuple._1();
                    Path sourceFilePath = new Path(tuple._2());
                    Path toPartitionPath = FSUtils.getPartitionPath(outputDir, partition);
                    FileSystem srcFs = FSUtils.getFs(baseDir, serConf.newCopy());
                    FileSystem dstFs = FSUtils.getFs(outputDir, serConf.newCopy());
                    if (!dstFs.exists(toPartitionPath)) {
                        dstFs.mkdirs(toPartitionPath);
                    }
                    FileUtil.copy(srcFs, sourceFilePath, dstFs, new Path(toPartitionPath, sourceFilePath.getName()), false, srcFs.getConf());
                }, filesToCopy.size());
            }
            LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
            copyTimelineFiles(fs, outputFs, baseDir, outputDir, latestCommitTimestamp);
        } else {
            LOG.info("The job has 0 partition to copy.");
        }
        Path successTagPath = new Path(outputDir + "/_SUCCESS");
        if (!outputFs.exists(successTagPath)) {
            LOG.info(String.format("Creating _SUCCESS under targetBasePath: %s", outputDir));
            outputFs.createNewFile(successTagPath);
        }
    }

    /**
     * Copies selected {@code .hoodie} entries of the source table into {@code outputDir/.hoodie}.
     *
     * <p>The selected entries are {@code hoodie.properties} plus every entry whose name, parsed by
     * {@link FSUtils#getCommitFromCommitFile}, compares less than or equal to
     * {@code latestCommitTimestamp}.
     *
     * <p>NOTE(review): subdirectories of {@code .hoodie} go through the same name filter, and any that
     * pass are copied recursively by {@code FileUtil.copy}. Confirm which of them are meant to be
     * included.
     *
     * @param srcFs file system of the source table
     * @param dstFs file system of the output location
     * @param baseDir base path of the source table
     * @param outputDir destination base path
     * @param latestCommitTimestamp inclusive upper bound on the instant time of copied entries
     * @throws IOException if listing or copying fails
     */
    private static void copyTimelineFiles(FileSystem srcFs, FileSystem dstFs, String baseDir, String outputDir, String latestCommitTimestamp) throws IOException {
        FileStatus[] commitFilesToCopy = srcFs.listStatus(new Path(baseDir + "/" + ".hoodie"), commitFilePath -> {
            if (commitFilePath.getName().equals("hoodie.properties")) {
                return true;
            }
            String instantTime = FSUtils.getCommitFromCommitFile(commitFilePath.getName());
            return HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCommitTimestamp);
        });
        for (FileStatus commitStatus : commitFilesToCopy) {
            Path targetFilePath = new Path(outputDir + "/" + ".hoodie" + "/" + commitStatus.getPath().getName());
            if (!dstFs.exists(targetFilePath.getParent())) {
                dstFs.mkdirs(targetFilePath.getParent());
            }
            if (fileAlreadyCopied(dstFs, targetFilePath)) {
                // Reported only; the copy below still proceeds and replaces the existing file.
                LOG.error(String.format("The target output commit file (%s targetBasePath) already exists.", targetFilePath));
            }
            FileUtil.copy(srcFs, commitStatus.getPath(), dstFs, targetFilePath, false, srcFs.getConf());
        }
    }

    /** Returns whether {@code path} already exists on {@code dstFs}. */
    private static boolean fileAlreadyCopied(FileSystem dstFs, Path path) throws IOException {
        return dstFs.exists(path);
    }

    /**
     * Command-line entry point: parses {@link Config}, starts a Spark context, runs
     * {@link #snapshot}, and always stops the context, even when the snapshot fails.
     *
     * @param args2 command-line arguments, see {@link Config}
     * @throws IOException if the snapshot fails
     */
    public static void main(String[] args2) throws IOException {
        Config cfg = new Config();
        new JCommander(cfg, null, args2);
        LOG.info(String.format("Snapshot hoodie table from %s targetBasePath to %stargetBasePath", cfg.basePath, cfg.outputPath));
        SparkConf sparkConf = new SparkConf().setAppName("Hoodie-snapshot-copier");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        try {
            LOG.info("Initializing spark job.");
            HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
            copier.snapshot(jsc, cfg.basePath, cfg.outputPath, cfg.shouldAssumeDatePartitioning, cfg.useFileListingFromMetadata);
        } finally {
            // Release Spark resources even if the snapshot throws.
            jsc.stop();
        }
    }

    /** Command-line options parsed by JCommander in {@link #main}. */
    static class Config
    implements Serializable {
        // Required: base path of the source Hudi table.
        @Parameter(names={"--base-path", "-bp"}, description="Hoodie table base path", required=true)
        String basePath = null;
        // Required: destination of the snapshot.
        @Parameter(names={"--output-path", "-op"}, description="The snapshot output path", required=true)
        String outputPath = null;
        // Forwarded to FSUtils.getAllPartitionPaths.
        @Parameter(names={"--date-partitioned", "-dp"}, description="Can we assume date partitioning?")
        boolean shouldAssumeDatePartitioning = false;
        // Forwarded to FSUtils.getAllPartitionPaths; boxed type kept for compatibility.
        @Parameter(names={"--use-file-listing-from-metadata"}, description="Fetch file listing from Hudi's metadata")
        public Boolean useFileListingFromMetadata = false;

        Config() {
        }
    }
}

