/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

/**
 * Spark utility that copies a snapshot of a Hudi table into another directory.
 *
 * <p>For the latest completed write instant of the table, the tool copies, per partition, the
 * base files returned by {@code getLatestBaseFilesBeforeOrOn} plus the partition metadata file
 * (when present), then copies the files under the table's {@code .hoodie} folder whose instant
 * time is not later than that instant, and finally creates a {@code _SUCCESS} marker in the
 * output directory.
 */
public class HoodieSnapshotCopier implements Serializable {
    private static final Logger LOG = LogManager.getLogger(HoodieSnapshotCopier.class);

    /** Folder under the table base path whose files are copied as commit metadata. */
    private static final String META_FOLDER = ".hoodie";
    /** File inside {@link #META_FOLDER} that is always copied, regardless of instant time. */
    private static final String TABLE_PROPERTIES_FILE = "hoodie.properties";
    /** Per-partition file that is copied alongside the base files when it exists. */
    private static final String PARTITION_META_FILE = ".hoodie_partition_metadata";

    /**
     * Copies the latest snapshot of the table at {@code baseDir} into {@code outputDir}.
     *
     * <p>If the table has no completed write instant, a warning is logged and the method returns
     * without touching {@code outputDir}. If {@code outputDir} already exists and there is at
     * least one partition to copy, it is deleted recursively first.
     *
     * @param jsc                          Spark context used to list and copy files in parallel
     * @param baseDir                      base path of the source Hudi table
     * @param outputDir                    directory the snapshot is written to
     * @param shouldAssumeDatePartitioning forwarded to {@code FSUtils.getAllPartitionPaths}
     * @param useFileListingFromMetadata   forwarded to {@code FSUtils.getAllPartitionPaths}
     * @throws IOException if a file system operation fails
     */
    public void snapshot(JavaSparkContext jsc, String baseDir, String outputDir, boolean shouldAssumeDatePartitioning, boolean useFileListingFromMetadata) throws IOException {
        FileSystem fs = FSUtils.getFs(baseDir, jsc.hadoopConfiguration());
        // Wrapped so the Hadoop configuration can be captured by the lambdas handed to the engine context.
        SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
        HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir).build();
        TableFileSystemView.BaseFileOnlyView fsView = new HoodieTableFileSystemView(tableMetadata, tableMetadata.getActiveTimeline().getWriteTimeline().filterCompletedInstants());
        HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);

        Option<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant();
        if (!latestCommit.isPresent()) {
            LOG.warn("No commits present. Nothing to snapshot");
            return;
        }
        String latestCommitTimestamp = latestCommit.get().getTimestamp();
        LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommitTimestamp));

        List<String> partitions = FSUtils.getAllPartitionPaths(context, baseDir, useFileListingFromMetadata, shouldAssumeDatePartitioning);
        if (partitions.isEmpty()) {
            LOG.info("The job has 0 partition to copy.");
        } else {
            LOG.info(String.format("The job needs to copy %d partitions.", partitions.size()));

            // Start from a clean output directory.
            Path outputPath = new Path(outputDir);
            if (fs.exists(outputPath)) {
                LOG.warn(String.format("The output path %s targetBasePath already exists, deleting", outputPath));
                fs.delete(outputPath, true);
            }

            context.setJobStatus(this.getClass().getSimpleName(), "Creating a snapshot");

            // One task per partition: collect (partition, source file path) pairs.
            List<Tuple2<String, String>> filesToCopy = context.flatMap(
                    partitions,
                    partition -> listFilesToCopy(baseDir, serConf, fsView, latestCommitTimestamp, partition),
                    partitions.size());

            // Skip the copy job when nothing was found; this also avoids requesting a parallelism of 0.
            if (!filesToCopy.isEmpty()) {
                context.foreach(filesToCopy, tuple -> {
                    String partition = tuple._1();
                    Path sourceFilePath = new Path(tuple._2());
                    Path toPartitionPath = FSUtils.getPartitionPath(outputDir, partition);
                    // A file system handle is obtained inside the task from a fresh configuration copy.
                    FileSystem ifs = FSUtils.getFs(baseDir, serConf.newCopy());
                    if (!ifs.exists(toPartitionPath)) {
                        ifs.mkdirs(toPartitionPath);
                    }
                    FileUtil.copy(ifs, sourceFilePath, ifs, new Path(toPartitionPath, sourceFilePath.getName()), false, ifs.getConf());
                }, filesToCopy.size());
            }

            LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
            copyCommitFiles(fs, baseDir, outputDir, latestCommitTimestamp);
        }

        // Mark the snapshot as complete.
        Path successTagPath = new Path(outputDir + "/_SUCCESS");
        if (!fs.exists(successTagPath)) {
            LOG.info(String.format("Creating _SUCCESS under targetBasePath: %s", outputDir));
            fs.createNewFile(successTagPath);
        }
    }

    /**
     * Command-line entry point: parses {@link Config} from {@code args}, starts a Spark context
     * and runs {@link #snapshot}.
     *
     * @param args command-line arguments, see {@link Config}
     * @throws IOException if the snapshot fails on a file system operation
     */
    public static void main(String[] args) throws IOException {
        Config cfg = new Config();
        new JCommander(cfg, null, args);
        LOG.info(String.format("Snapshot hoodie table from %s targetBasePath to %stargetBasePath", cfg.basePath, cfg.outputPath));

        SparkConf sparkConf = new SparkConf().setAppName("Hoodie-snapshot-copier");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        try {
            LOG.info("Initializing spark job.");
            HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
            copier.snapshot(jsc, cfg.basePath, cfg.outputPath, cfg.shouldAssumeDatePartitioning, cfg.useFileListingFromMetadata);
        } finally {
            // Stop the context even when the snapshot throws.
            jsc.stop();
        }
    }

    /**
     * Lists the files of one partition that belong to the snapshot.
     *
     * @param baseDir               base path of the source table
     * @param serConf               serializable Hadoop configuration; a copy is used to obtain the file system
     * @param fsView                view used to look up the base files of the partition
     * @param latestCommitTimestamp upper bound passed to {@code getLatestBaseFilesBeforeOrOn}
     * @param partition             partition path, as passed to {@code FSUtils.getPartitionPath}
     * @return a stream of (partition, source file path) pairs: the base files, followed by the
     *         partition metadata file if it exists
     * @throws IOException if checking for the partition metadata file fails
     */
    private static Stream<Tuple2<String, String>> listFilesToCopy(String baseDir, SerializableConfiguration serConf, TableFileSystemView.BaseFileOnlyView fsView, String latestCommitTimestamp, String partition) throws IOException {
        FileSystem fs1 = FSUtils.getFs(baseDir, serConf.newCopy());
        List<Tuple2<String, String>> filePaths = new ArrayList<>();

        // Materialize eagerly so the view is only consulted inside this call.
        fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp)
                .forEach(baseFile -> filePaths.add(new Tuple2<>(partition, baseFile.getPath())));

        // The partition metadata file is part of the snapshot as well.
        Path partitionMetaFile = new Path(FSUtils.getPartitionPath(baseDir, partition), PARTITION_META_FILE);
        if (fs1.exists(partitionMetaFile)) {
            filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
        }
        return filePaths.stream();
    }

    /**
     * Copies files from {@code baseDir/.hoodie} to {@code outputDir/.hoodie}: the table properties
     * file, plus every file whose instant time (as extracted by
     * {@code FSUtils.getCommitFromCommitFile}) is less than or equal to {@code latestCommitTimestamp}.
     *
     * @param fs                    file system used for both source and target
     * @param baseDir               base path of the source table
     * @param outputDir             snapshot output directory
     * @param latestCommitTimestamp inclusive upper bound for the instant time of copied files
     * @throws IOException if listing or copying fails
     */
    private static void copyCommitFiles(FileSystem fs, String baseDir, String outputDir, String latestCommitTimestamp) throws IOException {
        FileStatus[] commitFilesToCopy = fs.listStatus(new Path(baseDir + "/" + META_FOLDER), commitFilePath -> {
            if (commitFilePath.getName().equals(TABLE_PROPERTIES_FILE)) {
                return true;
            }
            String instantTime = FSUtils.getCommitFromCommitFile(commitFilePath.getName());
            return HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCommitTimestamp);
        });

        for (FileStatus commitStatus : commitFilesToCopy) {
            Path targetFilePath = new Path(outputDir + "/" + META_FOLDER + "/" + commitStatus.getPath().getName());
            if (!fs.exists(targetFilePath.getParent())) {
                fs.mkdirs(targetFilePath.getParent());
            }
            if (fs.exists(targetFilePath)) {
                // NOTE(review): an existing target is only logged; the copy below still runs.
                LOG.error(String.format("The target output commit file (%s targetBasePath) already exists.", targetFilePath));
            }
            FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false, fs.getConf());
        }
    }

    /** Command-line options of the snapshot copier, populated by JCommander. */
    static class Config implements Serializable {
        /** Base path of the Hudi table to snapshot (required). */
        @Parameter(names={"--base-path", "-bp"}, description="Hoodie table base path", required=true)
        String basePath = null;
        /** Directory the snapshot is written to (required). */
        @Parameter(names={"--output-path", "-op"}, description="The snapshot output path", required=true)
        String outputPath = null;
        /** Whether date partitioning can be assumed when listing partitions. */
        @Parameter(names={"--date-partitioned", "-dp"}, description="Can we assume date partitioning?")
        boolean shouldAssumeDatePartitioning = false;
        /** Whether the file listing should be fetched from Hudi's metadata. */
        @Parameter(names={"--use-file-listing-from-metadata"}, description="Fetch file listing from Hudi's metadata")
        public Boolean useFileListingFromMetadata = false;

        Config() {
        }
    }
}

