package org.apache.hudi.utilities;

import io.hops.hudi.com.beust.jcommander.JCommander;
import io.hops.hudi.com.beust.jcommander.Parameter;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.repair.RepairUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;

/* loaded from: input_file:org/apache/hudi/utilities/HoodieRepairTool.class */
/**
 * Command-line tool that detects and removes dangling base and log files in a Hudi table, and can
 * undo such a removal from a backup directory.
 *
 * <p>Instants are collected from the base and log files found on the file system, optionally
 * restricted to an inclusive {@code [startingInstantTime, endingInstantTime]} range. For each of
 * those instants {@link RepairUtils#findInstantFilesToRemove} decides which files must go. In
 * {@link Mode#REPAIR} mode the files are first copied to a backup path outside the table and then
 * deleted; {@link Mode#DRY_RUN} only reports them; {@link Mode#UNDO} copies files from the backup
 * path back into the table.
 */
public class HoodieRepairTool {
    private static final Logger LOG = LogManager.getLogger(HoodieRepairTool.class);
    // Directory-name prefix of the backup path generated when --backup-path is not given.
    private static final String BACKUP_DIR_PREFIX = "hoodie_repair_backup_";
    private final Config cfg;
    // Properties built from --props and/or --hoodie-conf; not read by any method in this class.
    private final TypedProperties props;
    private final HoodieEngineContext context;
    private final HoodieTableMetaClient metaClient;
    // File-system-backed table view, used to list partitions and base/log files.
    private final HoodieTableMetadata tableMetadata;

    /**
     * Command-line options of the repair tool, parsed by JCommander.
     */
    public static class Config implements Serializable {

        @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
        public String basePath = null;

        @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"repair\" means repairing the table by removing dangling data and log files not belonging to any commit; Set \"dry_run\" means only looking for dangling data and log files; Set \"undo\" means undoing the repair by copying back the files from backup directory", required = true)
        public String runningMode = null;

        @Parameter(names = {"--start-instant-time", "-si"}, description = "Starting Instant time for repair (inclusive)", required = false)
        public String startingInstantTime = null;

        @Parameter(names = {"--end-instant-time", "-ei"}, description = "Ending Instant time for repair (inclusive)", required = false)
        public String endingInstantTime = null;

        @Parameter(names = {"--backup-path", "-bp"}, description = "Backup path for storing dangling data and log files from the table", required = false)
        public String backupPath = null;

        @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for repair", required = false)
        public int parallelism = 2;

        @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
        public String sparkMaster = null;

        @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = false)
        public String sparkMemory = "1g";

        @Parameter(names = {"--assume-date-partitioning", "-dp"}, description = "whether the partition path is date with three levels", required = false)
        public Boolean assumeDatePartitioning = false;

        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;

        @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for hoodie client for table repair")
        public String propsFilePath = null;

        @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter = IdentitySplitter.class)
        public List<String> configs = new ArrayList<>();
    }

    /**
     * Running modes selected via {@code --mode}; the option value is matched case-insensitively
     * against these names.
     */
    public enum Mode {
        // Back up, then delete, dangling data and log files.
        REPAIR,
        // Only report dangling data and log files.
        DRY_RUN,
        // Copy files from the backup path back into the table.
        UNDO
    }

    /**
     * Creates the tool for the table at {@code config.basePath}.
     *
     * <p>A non-null {@code config.propsFilePath} is rewritten in place through
     * {@code FSUtils.addSchemeIfLocalPath} before the properties file is read.
     *
     * @param javaSparkContext Spark context used for parallel file operations
     * @param config parsed command-line options; may be mutated (props path, backup path)
     */
    public HoodieRepairTool(JavaSparkContext javaSparkContext, Config config) {
        if (config.propsFilePath != null) {
            config.propsFilePath = FSUtils.addSchemeIfLocalPath(config.propsFilePath).toString();
        }
        this.context = new HoodieSparkEngineContext(javaSparkContext);
        this.cfg = config;
        this.props = config.propsFilePath == null
            ? UtilHelpers.buildProperties(config.configs)
            : readConfigFromFileSystem(javaSparkContext, config);
        this.metaClient = HoodieTableMetaClient.builder()
            .setConf(javaSparkContext.hadoopConfiguration())
            .setBasePath(config.basePath)
            .setLoadActiveTimelineOnLoad(true)
            .build();
        this.tableMetadata = new FileSystemBackedTableMetadata(
            this.context, this.context.getHadoopConf(), config.basePath, config.assumeDatePartitioning);
    }

    /**
     * Runs the tool in the mode given by {@code --mode}.
     *
     * @return {@code true} when the selected mode completed successfully; {@code false} when the
     *     mode is missing or unsupported, the backup path check fails, or the repair/undo step
     *     reports a failure
     * @throws HoodieIOException if an I/O error occurs while repairing or undoing
     */
    public boolean run() {
        Option<String> startingInstantOption = Option.ofNullable(cfg.startingInstantTime);
        Option<String> endingInstantOption = Option.ofNullable(cfg.endingInstantTime);
        logInstantRange(startingInstantOption, endingInstantOption);

        // Parsing up front makes unknown/null modes reach the "unsupported" path instead of
        // escaping as IllegalArgumentException / NullPointerException.
        Option<Mode> modeOption = parseMode(cfg.runningMode);
        if (!modeOption.isPresent()) {
            LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
            return false;
        }

        try {
            switch (modeOption.get()) {
                case REPAIR:
                    LOG.info(" ****** The repair tool is in REPAIR mode, dangling data and logs files not belonging to any commit are going to be DELETED from the table ******");
                    if (checkBackupPathForRepair() < 0) {
                        LOG.error("Backup path check failed.");
                        return false;
                    }
                    return doRepair(startingInstantOption, endingInstantOption, false);
                case DRY_RUN:
                    LOG.info(" ****** The repair tool is in DRY_RUN mode, only LOOKING FOR dangling data and log files from the table ******");
                    return doRepair(startingInstantOption, endingInstantOption, true);
                case UNDO:
                    if (checkBackupPathAgainstBasePath() < 0) {
                        LOG.error("Backup path check failed.");
                        return false;
                    }
                    return undoRepair();
                default:
                    LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
                    return false;
            }
        } catch (IOException e) {
            throw new HoodieIOException("Unable to repair table in " + cfg.basePath, e);
        }
    }

    /**
     * Logs which range of completed instants is going to be examined.
     */
    private static void logInstantRange(Option<String> startingInstantOption, Option<String> endingInstantOption) {
        if (startingInstantOption.isPresent() && endingInstantOption.isPresent()) {
            LOG.info(String.format("Start repairing completed instants between %s and %s (inclusive)",
                startingInstantOption.get(), endingInstantOption.get()));
        } else if (startingInstantOption.isPresent()) {
            LOG.info(String.format("Start repairing completed instants from %s (inclusive)", startingInstantOption.get()));
        } else if (endingInstantOption.isPresent()) {
            LOG.info(String.format("Start repairing completed instants till %s (inclusive)", endingInstantOption.get()));
        } else {
            LOG.info("Start repairing all completed instants");
        }
    }

    /**
     * Maps a {@code --mode} value to a {@link Mode}, ignoring case.
     *
     * @return the mode, or an empty option when {@code runningMode} is null or not a mode name
     */
    private static Option<Mode> parseMode(String runningMode) {
        if (runningMode == null) {
            return Option.empty();
        }
        try {
            // Locale.ROOT: locale-sensitive upper-casing (e.g. Turkish dotted i) would break "repair".
            return Option.of(Mode.valueOf(runningMode.toUpperCase(Locale.ROOT)));
        } catch (IllegalArgumentException e) {
            return Option.empty();
        }
    }

    /**
     * CLI entry point. Prints usage and exits with status 1 when {@code --help} is given or no
     * arguments are passed; otherwise runs the tool and always stops the Spark context.
     * Failures are logged, not rethrown.
     */
    public static void main(String[] args) {
        Config config = new Config();
        JCommander commander = new JCommander(config, null, args);
        if (config.help || args.length == 0) {
            commander.usage();
            System.exit(1);
        }
        JavaSparkContext jsc = UtilHelpers.buildSparkContext("hudi-table-repair", config.sparkMaster, config.sparkMemory);
        try {
            new HoodieRepairTool(jsc, config).run();
        } catch (Throwable t) {
            LOG.error("Fail to run table repair for " + config.basePath, t);
        } finally {
            jsc.stop();
        }
    }

    /**
     * Copies files, given relative to {@code sourceBasePath}, to the same relative locations under
     * {@code destBasePath}, in parallel. A file that already exists at the destination is not
     * overwritten and counts as a failure.
     *
     * <p>NOTE(review): both sides are accessed through the FileSystem resolved from
     * {@code destBasePath}, which assumes source and destination share a file system.
     *
     * @return {@code true} only if every file was copied; {@code false} for an empty list
     */
    static boolean copyFiles(HoodieEngineContext context, List<String> relativeFilePaths, String sourceBasePath, String destBasePath) {
        if (relativeFilePaths.isEmpty()) {
            return false;
        }
        SerializableConfiguration conf = context.getHadoopConf();
        List<Boolean> results = context.parallelize(relativeFilePaths)
            .mapPartitions(iterator -> {
                List<Boolean> partitionResults = new ArrayList<>();
                FileSystem fs = FSUtils.getFs(destBasePath, conf.get());
                iterator.forEachRemaining(relativePath -> partitionResults.add(
                    copyIfAbsent(fs, new Path(sourceBasePath, relativePath), new Path(destBasePath, relativePath))));
                return partitionResults.iterator();
            }, true)
            .collectAsList();
        return !results.isEmpty() && results.stream().allMatch(Boolean::booleanValue);
    }

    /**
     * Copies one file unless the destination already exists.
     *
     * @return {@code true} if the file was copied, {@code false} if the destination existed or an
     *     {@link IOException} occurred (logged)
     */
    private static boolean copyIfAbsent(FileSystem fs, Path sourcePath, Path destPath) {
        try {
            if (fs.exists(destPath)) {
                return false;
            }
            FileIOUtils.copy(fs, sourcePath, destPath);
            return true;
        } catch (IOException e) {
            LOG.error(String.format("Copying file fails: source [%s], destination [%s]", sourcePath, destPath), e);
            return false;
        }
    }

    /**
     * Lists data files exactly {@code expectedLevel} directory levels below {@code basePathStr}.
     *
     * @return full path strings of the files accepted by {@code FSUtils.isDataFile}
     */
    static List<String> listFilesFromBasePath(HoodieEngineContext context, String basePathStr, int expectedLevel, int parallelism) {
        FileSystem fs = FSUtils.getFs(basePathStr, context.getHadoopConf().get());
        return FSUtils.getFileStatusAtLevel(context, fs, new Path(basePathStr), expectedLevel, parallelism).stream()
            .filter(fileStatus -> fileStatus.isFile() && FSUtils.isDataFile(fileStatus.getPath()))
            .map(fileStatus -> fileStatus.getPath().toString())
            .collect(Collectors.toList());
    }

    /**
     * Deletes files, given relative to {@code basePath}, in parallel (non-recursive deletes).
     *
     * @return {@code true} if every delete succeeded (also for an empty list)
     */
    static boolean deleteFiles(HoodieEngineContext context, String basePath, List<String> relativeFilePaths) {
        if (relativeFilePaths.isEmpty()) {
            return true;
        }
        SerializableConfiguration conf = context.getHadoopConf();
        List<Boolean> results = context.parallelize(relativeFilePaths)
            .mapPartitions(iterator -> {
                FileSystem fs = FSUtils.getFs(basePath, conf.get());
                List<Boolean> partitionResults = new ArrayList<>();
                iterator.forEachRemaining(relativePath -> partitionResults.add(deleteQuietly(fs, basePath, relativePath)));
                return partitionResults.iterator();
            }, true)
            .collectAsList();
        return results.stream().allMatch(Boolean::booleanValue);
    }

    /**
     * Deletes one file, logging (not throwing) on {@link IOException}.
     *
     * @return the result of {@code FileSystem.delete}, or {@code false} on error
     */
    private static boolean deleteQuietly(FileSystem fs, String basePath, String relativePath) {
        try {
            return fs.delete(new Path(basePath, relativePath), false);
        } catch (IOException e) {
            LOG.warn("Failed to delete file " + relativePath, e);
            return false;
        }
    }

    /**
     * Finds dangling files for the instants in the configured range and, unless {@code isDryRun},
     * backs them up and deletes them from the table.
     *
     * @param startingInstantOption inclusive lower bound on instant time, if present
     * @param endingInstantOption inclusive upper bound on instant time, if present
     * @param isDryRun when {@code true}, only report what would be removed
     * @return {@code true} on success (always for a dry run); {@code false} if backup or delete fails
     */
    boolean doRepair(Option<String> startingInstantOption, Option<String> endingInstantOption, boolean isDryRun) throws IOException {
        // Instant time -> files tagged with that instant (presumably relative to the base path,
        // since the removal list derived from it is resolved against cfg.basePath below).
        Map<String, List<String>> instantToFiles = RepairUtils.tagInstantsOfBaseAndLogFiles(
            metaClient.getBasePath(),
            HoodieDataTableUtils.getBaseAndLogFilePathsFromFileSystem(tableMetadata, cfg.basePath));

        // Instant times are compared lexicographically against the inclusive bounds.
        List<String> instantsToRepair = instantToFiles.keySet().stream()
            .filter(instant -> (!startingInstantOption.isPresent() || instant.compareTo(startingInstantOption.get()) >= 0)
                && (!endingInstantOption.isPresent() || instant.compareTo(endingInstantOption.get()) <= 0))
            .collect(Collectors.toList());

        HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
        HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
        archivedTimeline.loadCompletedInstantDetailsInMemory();
        List<ImmutablePair<String, List<String>>> instantsWithDanglingFiles =
            findDanglingFiles(context, instantsToRepair, instantToFiles, activeTimeline, archivedTimeline);
        printRepairInfo(instantsToRepair, instantsWithDanglingFiles);
        if (isDryRun) {
            return true;
        }

        List<String> filesToDelete = instantsWithDanglingFiles.stream()
            .flatMap(pair -> pair.getValue().stream())
            .collect(Collectors.toList());
        if (filesToDelete.isEmpty()) {
            LOG.info(String.format("Table repair on %s is successful", cfg.basePath));
            return true;
        }
        if (!backupFiles(filesToDelete)) {
            LOG.error("Error backing up dangling files. Exiting...");
            return false;
        }
        return deleteFiles(context, cfg.basePath, filesToDelete);
    }

    /**
     * Asks {@link RepairUtils#findInstantFilesToRemove} for every instant, in parallel.
     * Static so the Spark closure captures only its arguments, never the tool instance.
     *
     * @return (instant, files to remove) pairs, keeping only instants with at least one file
     */
    private static List<ImmutablePair<String, List<String>>> findDanglingFiles(
            HoodieEngineContext context, List<String> instants, Map<String, List<String>> instantToFiles,
            HoodieActiveTimeline activeTimeline, HoodieArchivedTimeline archivedTimeline) {
        if (instants.isEmpty()) {
            return new ArrayList<>();
        }
        List<ImmutablePair<String, List<String>>> pairs = context.parallelize(instants)
            .map(instant -> new ImmutablePair<String, List<String>>(instant,
                RepairUtils.findInstantFilesToRemove(instant, instantToFiles.get(instant), activeTimeline, archivedTimeline)))
            .collectAsList();
        return pairs.stream()
            .filter(pair -> !pair.getValue().isEmpty())
            .collect(Collectors.toList());
    }

    /**
     * Restores all data files found under the backup path into the table, at the same relative
     * locations. The directory depth to scan is derived from the table's first partition path.
     *
     * @return {@code false} if the backup path does not exist, the table has no partitions, or any
     *     file cannot be restored; {@code true} otherwise
     */
    boolean undoRepair() throws IOException {
        HoodieWrapperFileSystem fs = metaClient.getFs();
        String backupPathStr = cfg.backupPath;
        Path backupPath = new Path(backupPathStr);
        if (!fs.exists(backupPath)) {
            LOG.error("Cannot find backup path: " + backupPath);
            return false;
        }
        List<String> allPartitionPaths = tableMetadata.getAllPartitionPaths();
        if (allPartitionPaths.isEmpty()) {
            LOG.error("Cannot get one partition path since there is no partition available");
            return false;
        }
        int expectedLevel = getExpectedLevelBasedOnPartitionPath(allPartitionPaths.get(0));
        List<String> relativeFilePaths = listFilesFromBasePath(context, backupPathStr, expectedLevel, cfg.parallelism).stream()
            .map(filePath -> FSUtils.getRelativePartitionPath(backupPath, new Path(filePath)))
            .collect(Collectors.toList());
        return restoreFiles(relativeFilePaths);
    }

    /**
     * Returns the number of {@code '/'}-separated segments in a partition path, or 0 for a null or
     * empty path.
     */
    int getExpectedLevelBasedOnPartitionPath(String partitionPath) {
        if (StringUtils.isNullOrEmpty(partitionPath)) {
            return 0;
        }
        return partitionPath.split("/").length;
    }

    /**
     * Validates the backup path for REPAIR mode. If none is configured, a random one under
     * {@code /tmp} is generated and stored in {@code cfg.backupPath}.
     *
     * @return -1 if the backup path exists and is non-empty, otherwise the result of
     *     {@link #checkBackupPathAgainstBasePath()}
     */
    int checkBackupPathForRepair() throws IOException {
        if (cfg.backupPath == null) {
            cfg.backupPath = "/tmp/" + BACKUP_DIR_PREFIX + new SecureRandom().nextLong();
        }
        Path backupPath = new Path(cfg.backupPath);
        FileSystem fs = metaClient.getFs();
        if (fs.exists(backupPath) && fs.listStatus(backupPath).length > 0) {
            LOG.error(String.format("Cannot use backup path %s: it is not empty", cfg.backupPath));
            return -1;
        }
        return checkBackupPathAgainstBasePath();
    }

    /**
     * Rejects a missing backup path, or one whose string contains the base path string.
     *
     * <p>NOTE(review): this is a plain substring check, not a path-ancestry check, so it can both
     * over-reject (sibling names sharing a prefix) and miss scheme-qualified variants.
     *
     * @return 0 if the backup path is acceptable, -1 otherwise
     */
    int checkBackupPathAgainstBasePath() {
        if (cfg.backupPath == null) {
            LOG.error("Backup path is not configured");
            return -1;
        }
        if (cfg.backupPath.contains(cfg.basePath)) {
            LOG.error(String.format("Cannot use backup path %s: it resides in the base path %s", cfg.backupPath, cfg.basePath));
            return -1;
        }
        return 0;
    }

    /**
     * Copies the given base-path-relative files from the table into the backup path.
     */
    boolean backupFiles(List<String> relativeFilePaths) {
        return copyFiles(context, relativeFilePaths, cfg.basePath, cfg.backupPath);
    }

    /**
     * Copies the given backup-path-relative files from the backup path into the table.
     */
    boolean restoreFiles(List<String> relativeFilePaths) {
        return copyFiles(context, relativeFilePaths, cfg.backupPath, cfg.basePath);
    }

    /**
     * Logs the verified instants and, for each instant that has dangling files, the files to remove.
     */
    private void printRepairInfo(List<String> instantsToRepair, List<ImmutablePair<String, List<String>>> instantsWithDanglingFiles) {
        int numInstantsToRepair = instantsWithDanglingFiles.size();
        LOG.warn("Number of instants verified based on the base and log files: " + instantsToRepair.size());
        LOG.warn("Instant timestamps: " + instantsToRepair);
        LOG.warn("Number of instants to repair: " + numInstantsToRepair);
        if (numInstantsToRepair > 0) {
            instantsWithDanglingFiles.forEach(pair -> LOG.warn("   ** Removing files: " + pair.getValue()));
        }
    }

    /**
     * Reads the properties file at {@code config.propsFilePath}, overlaid with
     * {@code config.configs}.
     */
    private TypedProperties readConfigFromFileSystem(JavaSparkContext javaSparkContext, Config config) {
        return UtilHelpers.readConfig(javaSparkContext.hadoopConfiguration(), new Path(config.propsFilePath), config.configs)
            .getProps(true);
    }
}
