/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import io.hops.hudi.com.beust.jcommander.JCommander;
import io.hops.hudi.com.beust.jcommander.Parameter;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.util.BloomFilterData;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class HoodieMetadataTableValidator
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieMetadataTableValidator.class);
    private transient JavaSparkContext jsc;
    private Config cfg;
    private TypedProperties props;
    private final HoodieTableMetaClient metaClient;
    protected transient Option<AsyncMetadataTableValidateService> asyncMetadataTableValidateService;
    private final String taskLabels;

    /**
     * Creates a validator bound to the given Spark context and configuration.
     *
     * <p>Properties are read from {@code cfg.propsFilePath} when it is set, otherwise they are built
     * from the inline {@code cfg.configs}. The data table meta client is built eagerly with its active
     * timeline loaded. The async validation service is created only when {@code cfg.continuous} is set.
     *
     * @param jsc Spark context used for all distributed work
     * @param cfg validator configuration
     */
    public HoodieMetadataTableValidator(JavaSparkContext jsc, Config cfg) {
        this.jsc = jsc;
        this.cfg = cfg;
        if (cfg.propsFilePath != null) {
            this.props = this.readConfigFromFileSystem(jsc, cfg);
        } else {
            this.props = UtilHelpers.buildProperties(cfg.configs);
        }
        this.metaClient = HoodieTableMetaClient.builder()
                .setConf(jsc.hadoopConfiguration())
                .setBasePath(cfg.basePath)
                .setLoadActiveTimelineOnLoad(true)
                .build();
        if (cfg.continuous) {
            this.asyncMetadataTableValidateService = Option.of(new AsyncMetadataTableValidateService());
        } else {
            this.asyncMetadataTableValidateService = Option.empty();
        }
        // Depends on this.cfg, which is assigned above.
        this.taskLabels = this.generateValidationTaskLabels();
    }

    /**
     * Lists the validations that are switched on in {@code cfg}.
     *
     * @return the labels of the enabled validations joined with {@code ","}, in a fixed order; an
     *     empty string when none is enabled
     */
    private String generateValidationTaskLabels() {
        boolean[] enabled = {
            this.cfg.validateLatestBaseFiles,
            this.cfg.validateLatestFileSlices,
            this.cfg.validateAllFileGroups,
            this.cfg.validateAllColumnStats,
            this.cfg.validateBloomFilters,
            this.cfg.validateRecordIndexCount,
            this.cfg.validateRecordIndexContent
        };
        String[] labels = {
            "validate-latest-base-files",
            "validate-latest-file-slices",
            "validate-all-file-groups",
            "validate-all-column-stats",
            "validate-bloom-filters",
            "validate-record-index-count",
            "validate-record-index-content"
        };
        List<String> active = new ArrayList<>();
        for (int i = 0; i < labels.length; ++i) {
            if (enabled[i]) {
                active.add(labels[i]);
            }
        }
        return String.join(",", active);
    }

    /**
     * Loads properties from the file at {@code cfg.propsFilePath}.
     *
     * <p>The file path and the inline {@code cfg.configs} are both passed to
     * {@code UtilHelpers.readConfig}; how the two are combined is decided by that helper.
     *
     * @param jsc Spark context whose Hadoop configuration is used to access the file
     * @param cfg validator configuration; {@code propsFilePath} must be non-null
     * @return the resulting properties
     */
    private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
        Path propsPath = new Path(cfg.propsFilePath);
        return UtilHelpers.readConfig(jsc.hadoopConfiguration(), propsPath, cfg.configs).getProps(true);
    }

    /**
     * Command-line entry point.
     *
     * <p>Parses {@code args2} into a {@link Config}. If help is requested or no arguments are given, it
     * prints usage and exits with status 1. Otherwise it starts a Spark context and runs one
     * validator. A missing data table is logged as a warning; any other failure is logged as an error
     * and not rethrown. The Spark context is always stopped.
     *
     * @param args2 command-line arguments
     */
    public static void main(String[] args2) {
        Config cfg = new Config();
        JCommander cmd = new JCommander((Object)cfg, null, args2);
        boolean showUsage = cfg.help.booleanValue() || args2.length == 0;
        if (showUsage) {
            cmd.usage();
            System.exit(1);
        }
        SparkConf sparkConf = UtilHelpers.buildSparkConf("Hoodie-Metadata-Table-Validator", cfg.sparkMaster);
        sparkConf.set("spark.executor.memory", cfg.sparkMemory);
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        try {
            new HoodieMetadataTableValidator(jsc, cfg).run();
        } catch (TableNotFoundException notFound) {
            LOG.warn(String.format("The Hudi data table is not found: [%s]. Skipping the validation of the metadata table.", cfg.basePath), (Throwable) notFound);
        } catch (Throwable failure) {
            LOG.error("Fail to do hoodie metadata table validation for " + cfg, failure);
        } finally {
            jsc.stop();
        }
    }

    /**
     * Runs the configured validations.
     *
     * <p>In continuous mode, the async validation service is started and this call blocks until it
     * shuts down. A single validation pass is then executed. The async service, if present, is always
     * shut down before this method returns or throws.
     *
     * @return the result of the single validation pass ({@code false} when that pass fails and
     *     {@code cfg.ignoreFailed} is set)
     * @throws HoodieException wrapping any exception raised while validating
     */
    public boolean run() {
        try {
            LOG.info(this.cfg.toString());
            if (this.cfg.continuous) {
                LOG.info(" ****** do hoodie metadata table validation in CONTINUOUS mode ******");
                this.doHoodieMetadataTableValidationContinuous();
                // NOTE(review): the single pass below also runs after continuous mode ends; confirm
                // this fall-through is intended rather than an else-branch.
            }
            LOG.info(" ****** do hoodie metadata table validation once ******");
            return this.doHoodieMetadataTableValidationOnce();
        }
        catch (Exception e) {
            throw new HoodieException("Unable to do hoodie metadata table validation in " + this.cfg.basePath, e);
        }
        finally {
            // No return statement here: returning from finally would silently discard the
            // exception thrown above (and any Error), hiding validation failures from callers.
            if (this.asyncMetadataTableValidateService.isPresent()) {
                this.asyncMetadataTableValidateService.get().shutdown(true);
            }
        }
    }

    /**
     * Executes one validation pass via {@link #doMetadataTableValidation()}.
     *
     * <p>Any failure is logged. It is rethrown unchanged unless {@code cfg.ignoreFailed} is set, in
     * which case {@code false} is returned instead.
     *
     * @return the result of {@link #doMetadataTableValidation()}, or {@code false} on an ignored failure
     */
    private boolean doHoodieMetadataTableValidationOnce() {
        boolean outcome;
        try {
            outcome = this.doMetadataTableValidation();
        } catch (Throwable failure) {
            LOG.error("Metadata table validation failed to HoodieValidationException", failure);
            if (this.cfg.ignoreFailed) {
                return false;
            }
            // Precise rethrow: only unchecked throwables can reach here.
            throw failure;
        }
        return outcome;
    }

    /**
     * Starts the async validation service, if one was created, and blocks until it shuts down.
     *
     * <p>Does nothing when no service is present. Any exception raised while waiting is rethrown as
     * a {@link HoodieException}.
     */
    private void doHoodieMetadataTableValidationContinuous() {
        if (!this.asyncMetadataTableValidateService.isPresent()) {
            return;
        }
        AsyncMetadataTableValidateService service = this.asyncMetadataTableValidateService.get();
        service.start(null);
        try {
            service.waitForShutdown();
        } catch (Exception e) {
            throw new HoodieException(e.getMessage(), e);
        }
    }

    /*
     * NOTE(review): the body of this method could not be recovered. CFR 0.152 failed to decompile it
     * (org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once) and emitted the stub
     * below, which always throws. Callers such as doHoodieMetadataTableValidationOnce therefore fail
     * with IllegalStateException until the original implementation is restored from source.
     */
    public boolean doMetadataTableValidation() {
        /*
         * Decompiler stub: the original logic is missing. Top of the decompiler stack trace:
         *
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Checks whether the metadata table at {@code <basePath>/.hoodie/metadata} is ready to validate.
     *
     * @return {@code true} when the metadata table has at least one completed commit. Returns
     *     {@code false} in three cases: neither the metadata table nor the data table has a completed
     *     commit yet; the metadata table is not found; or it cannot be read for any other reason.
     * @throws HoodieValidationException when the data table has completed commits but the metadata
     *     table has none
     */
    private boolean checkMetadataTableIsAvailable() {
        try {
            HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
                    .setConf(this.jsc.hadoopConfiguration())
                    .setBasePath(new Path(this.cfg.basePath, ".hoodie/metadata").toString())
                    .setLoadActiveTimelineOnLoad(true)
                    .build();
            int finishedInstants = mdtMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants();
            if (finishedInstants == 0) {
                if (this.metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
                    LOG.info("There is no completed commit in both metadata table and corresponding data table.");
                    return false;
                }
                throw new HoodieValidationException("There is no completed instant for metadata table.");
            }
            return true;
        }
        catch (TableNotFoundException tbe) {
            LOG.warn("Metadata table is not found. Skip current validation.");
            return false;
        }
        catch (HoodieValidationException hve) {
            // A genuine validation failure raised above; without this clause the generic handler
            // below would swallow it and report "not available" instead.
            throw hve;
        }
        catch (Exception ex) {
            LOG.warn("Metadata table is not available to read for now, ", (Throwable)ex);
            return false;
        }
    }

    /**
     * Compares the partitions found by direct file-system listing with those in the metadata table.
     *
     * <p>Before comparing, each file-system partition is checked against its partition metadata. It
     * is kept only if its recorded creation commit time is either contained in (or before the start
     * of) the completed commits timeline, or is not after the last completed instant. Partitions with
     * no recorded creation commit time are dropped.
     *
     * @param engineContext engine context used for the listings
     * @param basePath data table base path
     * @return the sorted partition paths from the metadata table
     * @throws HoodieValidationException when the two sorted partition lists differ
     */
    private List<String> validatePartitions(HoodieSparkEngineContext engineContext, String basePath) {
        List<String> listedPartitions = FSUtils.getAllPartitionPaths(engineContext, basePath, false, this.cfg.assumeDatePartitioning);
        HoodieTimeline completedTimeline = this.metaClient.getCommitsTimeline().filterCompletedInstants();
        // Typed stream (no raw cast) so that `part` is a String for FSUtils.getPartitionPath.
        List<String> allPartitionPathsFromFS = listedPartitions.parallelStream().filter(part -> {
            HoodiePartitionMetadata hoodiePartitionMetadata = new HoodiePartitionMetadata(this.metaClient.getFs(), FSUtils.getPartitionPath(basePath, part));
            Option<String> instantOption = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
            if (!instantOption.isPresent()) {
                return false;
            }
            String instantTime = instantOption.get();
            if (completedTimeline.containsOrBeforeTimelineStarts(instantTime)) {
                return true;
            }
            Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
            return lastInstant.isPresent() && HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp());
        }).collect(Collectors.toCollection(ArrayList::new));
        // Copy so the in-place sort never mutates (or fails on) a list owned by the metadata reader.
        List<String> allPartitionPathsMeta = new ArrayList<>(FSUtils.getAllPartitionPaths(engineContext, basePath, true, this.cfg.assumeDatePartitioning));
        Collections.sort(allPartitionPathsFromFS);
        Collections.sort(allPartitionPathsMeta);
        if (!allPartitionPathsFromFS.equals(allPartitionPathsMeta)) {
            String message = "Compare Partitions Failed! AllPartitionPathsFromFS : " + allPartitionPathsFromFS + " and allPartitionPathsMeta : " + allPartitionPathsMeta;
            LOG.error(message);
            throw new HoodieValidationException(message);
        }
        return allPartitionPathsMeta;
    }

    /**
     * Runs each per-partition file validation enabled in {@code cfg}.
     *
     * <p>The checks run in this order: latest file slices, latest base files, all file groups, column
     * stats, then bloom filters. A failing check throws, so later checks do not run.
     *
     * @param metadataTableBasedContext view backed by the metadata table
     * @param fsBasedContext view backed by direct file-system listing
     * @param partitionPath partition to validate
     * @param baseDataFilesForCleaning base file names to exclude from the comparison
     */
    private void validateFilesInPartition(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        final HoodieMetadataValidationContext mdt = metadataTableBasedContext;
        final HoodieMetadataValidationContext fs = fsBasedContext;
        final Set<String> cleaning = baseDataFilesForCleaning;
        if (this.cfg.validateLatestFileSlices) {
            this.validateLatestFileSlices(mdt, fs, partitionPath, cleaning);
        }
        if (this.cfg.validateLatestBaseFiles) {
            this.validateLatestBaseFiles(mdt, fs, partitionPath, cleaning);
        }
        if (this.cfg.validateAllFileGroups) {
            this.validateAllFileGroups(mdt, fs, partitionPath, cleaning);
        }
        if (this.cfg.validateAllColumnStats) {
            this.validateAllColumnStats(mdt, fs, partitionPath, cleaning);
        }
        if (this.cfg.validateBloomFilters) {
            this.validateBloomFilters(mdt, fs, partitionPath, cleaning);
        }
    }

    /**
     * Compares every file slice of every file group in a partition between the two views.
     *
     * <p>Slices from each view are flattened out of their file groups and sorted with
     * {@code FileSliceComparator}. When {@code baseDataFilesForCleaning} is non-empty, slices whose
     * base file is in that set are removed from both lists before comparing.
     *
     * @throws HoodieValidationException when the slice lists do not match
     */
    private void validateAllFileGroups(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        List<FileSlice> metaSlices = metadataTableBasedContext.getSortedAllFileGroupList(partitionPath).stream()
                .flatMap(HoodieFileGroup::getAllFileSlices)
                .sorted(new FileSliceComparator())
                .collect(Collectors.toList());
        List<FileSlice> fsSlices = fsBasedContext.getSortedAllFileGroupList(partitionPath).stream()
                .flatMap(HoodieFileGroup::getAllFileSlices)
                .sorted(new FileSliceComparator())
                .collect(Collectors.toList());
        if (!baseDataFilesForCleaning.isEmpty()) {
            metaSlices = this.filterFileSliceBasedOnInflightCleaning(metaSlices, baseDataFilesForCleaning);
            fsSlices = this.filterFileSliceBasedOnInflightCleaning(fsSlices, baseDataFilesForCleaning);
        }
        LOG.debug("All file slices from metadata: " + metaSlices + ". For partitions " + partitionPath);
        LOG.debug("All file slices from direct listing: " + fsSlices + ". For partitions " + partitionPath);
        this.validateFileSlices(metaSlices, fsSlices, partitionPath, fsBasedContext.getMetaClient(), "all file groups");
    }

    /**
     * Compares the latest base files of a partition between the two views.
     *
     * <p>When {@code baseDataFilesForCleaning} is non-empty, base files named in that set are removed
     * from both lists before comparing.
     *
     * @throws HoodieValidationException when the lists differ
     */
    private void validateLatestBaseFiles(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        List<HoodieBaseFile> fromMetadata = metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath);
        List<HoodieBaseFile> fromListing = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
        if (!baseDataFilesForCleaning.isEmpty()) {
            fromMetadata = this.filterBaseFileBasedOnInflightCleaning(fromMetadata, baseDataFilesForCleaning);
            fromListing = this.filterBaseFileBasedOnInflightCleaning(fromListing, baseDataFilesForCleaning);
        }
        LOG.debug("Latest base file from metadata: " + fromMetadata + ". For partitions " + partitionPath);
        LOG.debug("Latest base file from direct listing: " + fromListing + ". For partitions " + partitionPath);
        this.validate(fromMetadata, fromListing, partitionPath, "latest base files");
    }

    /**
     * Compares the latest file slices of a partition between the two views.
     *
     * <p>When {@code baseDataFilesForCleaning} is non-empty, slices whose base file is in that set are
     * removed from both lists before comparing.
     *
     * @throws HoodieValidationException when the slice lists do not match
     */
    private void validateLatestFileSlices(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        List<FileSlice> fromMetadata = metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath);
        List<FileSlice> fromListing = fsBasedContext.getSortedLatestFileSliceList(partitionPath);
        if (!baseDataFilesForCleaning.isEmpty()) {
            fromMetadata = this.filterFileSliceBasedOnInflightCleaning(fromMetadata, baseDataFilesForCleaning);
            fromListing = this.filterFileSliceBasedOnInflightCleaning(fromListing, baseDataFilesForCleaning);
        }
        LOG.debug("Latest file list from metadata: " + fromMetadata + ". For partition " + partitionPath);
        LOG.debug("Latest file list from direct listing: " + fromListing + ". For partition " + partitionPath);
        this.validateFileSlices(fromMetadata, fromListing, partitionPath, fsBasedContext.getMetaClient(), "latest file slices");
    }

    /**
     * Drops file slices whose base file name is in {@code baseDataFilesForCleaning}.
     *
     * <p>Slices without a base file are always kept. Input order is preserved.
     *
     * @return a new list with the remaining slices
     */
    private List<FileSlice> filterFileSliceBasedOnInflightCleaning(List<FileSlice> sortedLatestFileSliceList, Set<String> baseDataFilesForCleaning) {
        return sortedLatestFileSliceList.stream()
                .filter(slice -> !slice.getBaseFile().isPresent()
                        || !baseDataFilesForCleaning.contains(slice.getBaseFile().get().getFileName()))
                .collect(Collectors.toList());
    }

    /**
     * Drops base files whose name is in {@code baseDataFilesForCleaning}, preserving input order.
     *
     * @return a new list with the remaining base files
     */
    private List<HoodieBaseFile> filterBaseFileBasedOnInflightCleaning(List<HoodieBaseFile> sortedBaseFileList, Set<String> baseDataFilesForCleaning) {
        List<HoodieBaseFile> kept = new ArrayList<>();
        for (HoodieBaseFile baseFile : sortedBaseFileList) {
            if (!baseDataFilesForCleaning.contains(baseFile.getFileName())) {
                kept.add(baseFile);
            }
        }
        return kept;
    }

    /**
     * Compares column-stats entries for a partition's latest base files between the two views.
     *
     * <p>The set of base files is taken from the file-system view, excluding files in
     * {@code baseDataFilesForCleaning}.
     *
     * @throws HoodieValidationException when the stats lists differ
     */
    private void validateAllColumnStats(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        List<String> fileNames = this.getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);
        List<HoodieColumnRangeMetadata<Comparable>> statsFromMetadata = metadataTableBasedContext.getSortedColumnStatsList(partitionPath, fileNames);
        List<HoodieColumnRangeMetadata<Comparable>> statsFromFiles = fsBasedContext.getSortedColumnStatsList(partitionPath, fileNames);
        this.validate(statsFromMetadata, statsFromFiles, partitionPath, "column stats");
    }

    /**
     * Compares bloom filters for a partition's latest base files between the two views.
     *
     * <p>The set of base files is taken from the file-system view, excluding files in
     * {@code baseDataFilesForCleaning}.
     *
     * @throws HoodieValidationException when the bloom filter lists differ
     */
    private void validateBloomFilters(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        List<String> fileNames = this.getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);
        List<BloomFilterData> filtersFromMetadata = metadataTableBasedContext.getSortedBloomFilterList(partitionPath, fileNames);
        List<BloomFilterData> filtersFromFiles = fsBasedContext.getSortedBloomFilterList(partitionPath, fileNames);
        this.validate(filtersFromMetadata, filtersFromFiles, partitionPath, "bloom filters");
    }

    /**
     * Runs the record-index validation selected in {@code cfg}.
     *
     * <p>Content validation takes precedence. The count-only check runs only when content validation
     * is disabled and count validation is enabled.
     */
    private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext, HoodieTableMetaClient metaClient, HoodieTableMetadata tableMetadata) {
        if (this.cfg.validateRecordIndexContent) {
            this.validateRecordIndexContent(sparkEngineContext, metaClient, tableMetadata);
            return;
        }
        if (this.cfg.validateRecordIndexCount) {
            this.validateRecordIndexCount(sparkEngineContext, metaClient);
        }
    }

    /**
     * Checks that the number of record keys in the data table equals the number of metadata-table
     * rows filtered by {@code type = 5}.
     *
     * <p>Presumably the {@code type = 5} rows are the record-index entries; confirm this against the
     * metadata table's type codes.
     *
     * @throws HoodieValidationException when the two counts differ
     */
    private void validateRecordIndexCount(HoodieSparkEngineContext sparkEngineContext, HoodieTableMetaClient metaClient) {
        String basePath = metaClient.getBasePathV2().toString();
        long countKeyFromTable = sparkEngineContext.getSqlContext().read().format("hudi")
                .load(basePath)
                .select(HoodieRecord.RECORD_KEY_METADATA_FIELD, new String[0])
                .count();
        long countKeyFromRecordIndex = sparkEngineContext.getSqlContext().read().format("hudi")
                .load(HoodieTableMetadata.getMetadataTableBasePath(basePath))
                .select("key", new String[0])
                .filter("type = 5")
                .count();
        if (countKeyFromTable == countKeyFromRecordIndex) {
            LOG.info(String.format("Validation of record index count succeeded: %s entries.", countKeyFromRecordIndex));
            return;
        }
        String message = String.format("Validation of record index count failed: %s entries from record index metadata, %s keys from the data table.", countKeyFromRecordIndex, countKeyFromTable);
        LOG.error(message);
        throw new HoodieValidationException(message);
    }

    /**
     * Checks that each record key's location in the record index matches where the key actually
     * lives in the data table.
     *
     * <p>Both sides are reduced to {@code recordKey -> (partitionPath, fileId)} pairs and
     * full-outer-joined on the key. A key is a mismatch when it is present on only one side, or when
     * its partition path or file id differ. Roughly {@code cfg.numRecordIndexErrorSamples} mismatch
     * descriptions are kept as samples for the error message.
     *
     * <p>NOTE(review): {@code tableMetadata} is not used in this body.
     *
     * @param sparkEngineContext provides the SQL context used to read both tables
     * @param metaClient data table meta client; only its base path is used here
     * @param tableMetadata unused here
     * @throws HoodieValidationException when at least one key mismatches
     */
    private void validateRecordIndexContent(HoodieSparkEngineContext sparkEngineContext, HoodieTableMetaClient metaClient, HoodieTableMetadata tableMetadata) {
        String basePath = metaClient.getBasePathV2().toString();
        // Data-table side: recordKey -> (partition path, file id parsed from the file name).
        // Cached because it is joined below and counted again after the join.
        JavaPairRDD keyToLocationOnFsRdd = sparkEngineContext.getSqlContext().read().format("hudi").load(basePath).select(HoodieRecord.RECORD_KEY_METADATA_FIELD, new String[]{HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD}).toJavaRDD().mapToPair((PairFunction & Serializable)row -> new Tuple2((Object)row.getString(row.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)), Pair.of(row.getString(row.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)), FSUtils.getFileId(row.getString(row.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)))))).cache();
        // Record-index side: metadata-table rows filtered by `type = 5` (presumably record-index
        // entries, given the recordIndexMetadata.* columns read here), decoded via
        // HoodieTableMetadataUtil.getLocationFromRecordIndexInfo into recordKey -> (partition path, file id).
        JavaPairRDD keyToLocationFromRecordIndexRdd = sparkEngineContext.getSqlContext().read().format("hudi").load(HoodieTableMetadata.getMetadataTableBasePath(basePath)).filter("type = 5").select(new Column[]{functions.col((String)"key"), functions.col((String)"recordIndexMetadata.partitionName").as("partitionName"), functions.col((String)"recordIndexMetadata.fileIdHighBits").as("fileIdHighBits"), functions.col((String)"recordIndexMetadata.fileIdLowBits").as("fileIdLowBits"), functions.col((String)"recordIndexMetadata.fileIndex").as("fileIndex"), functions.col((String)"recordIndexMetadata.fileId").as("fileId"), functions.col((String)"recordIndexMetadata.instantTime").as("instantTime"), functions.col((String)"recordIndexMetadata.fileIdEncoding").as("fileIdEncoding")}).toJavaRDD().mapToPair((PairFunction & Serializable)row -> {
            HoodieRecordGlobalLocation location = HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(row.getString(row.fieldIndex("partitionName")), row.getInt(row.fieldIndex("fileIdEncoding")), row.getLong(row.fieldIndex("fileIdHighBits")), row.getLong(row.fieldIndex("fileIdLowBits")), row.getInt(row.fieldIndex("fileIndex")), row.getString(row.fieldIndex("fileId")), row.getLong(row.fieldIndex("instantTime")));
            return new Tuple2((Object)row.getString(row.fieldIndex("key")), Pair.of(location.getPartitionPath(), location.getFileId()));
        });
        // Copied into a local so the reduce closure below captures only this int, not this.cfg.
        int numErrorSamples = this.cfg.numRecordIndexErrorSamples;
        // Map every joined key to (mismatchCount, errorSamples) and combine them with reduce.
        // NOTE(review): JavaRDD.reduce throws on an empty RDD, so a data table and record index that
        // are both empty would fail here; consider fold with a (0L, empty list) zero value.
        Pair result = (Pair)keyToLocationOnFsRdd.fullOuterJoin(keyToLocationFromRecordIndexRdd, this.cfg.recordIndexParallelism).map((Function & Serializable)e -> {
            Optional locationOnFs = (Optional)((Tuple2)e._2)._1;
            Optional locationFromRecordIndex = (Optional)((Tuple2)e._2)._2;
            // NOTE(review): `sb` is never used.
            StringBuilder sb = new StringBuilder();
            ArrayList<String> errorSampleList = new ArrayList<String>();
            if (locationOnFs.isPresent() && locationFromRecordIndex.isPresent()) {
                // Present on both sides: a match requires equal partition path (left) and file id (right).
                if (((String)((Pair)locationOnFs.get()).getLeft()).equals(((Pair)locationFromRecordIndex.get()).getLeft()) && ((String)((Pair)locationOnFs.get()).getRight()).equals(((Pair)locationFromRecordIndex.get()).getRight())) {
                    return Pair.of(0L, errorSampleList);
                }
                errorSampleList.add(this.constructLocationInfoString((Optional<Pair<String, String>>)locationOnFs, (Optional<Pair<String, String>>)locationFromRecordIndex));
                return Pair.of(1L, errorSampleList);
            }
            if (!locationOnFs.isPresent() && !locationFromRecordIndex.isPresent()) {
                return Pair.of(0L, errorSampleList);
            }
            // Present on exactly one side: always a mismatch.
            errorSampleList.add(this.constructLocationInfoString((Optional<Pair<String, String>>)locationOnFs, (Optional<Pair<String, String>>)locationFromRecordIndex));
            return Pair.of(1L, errorSampleList);
        }).reduce((Function2 & Serializable)(pair1, pair2) -> {
            // Sum the mismatch counts. If either sample list already holds numErrorSamples entries,
            // keep it as is; otherwise start from the larger list and top it up from the other
            // until numErrorSamples is reached.
            long errorCount = (Long)pair1.getLeft() + (Long)pair2.getLeft();
            List list1 = (List)pair1.getRight();
            List list2 = (List)pair2.getRight();
            if (!list1.isEmpty() && !list2.isEmpty()) {
                if (list1.size() >= numErrorSamples) {
                    return Pair.of(errorCount, list1);
                }
                if (list2.size() >= numErrorSamples) {
                    return Pair.of(errorCount, list2);
                }
                ArrayList<String> resultList = new ArrayList<String>();
                if (list1.size() > list2.size()) {
                    resultList.addAll(list1);
                    for (String item : list2) {
                        resultList.add(item);
                        if (resultList.size() < numErrorSamples) continue;
                        break;
                    }
                } else {
                    resultList.addAll(list2);
                    for (String item : list1) {
                        resultList.add(item);
                        if (resultList.size() < numErrorSamples) continue;
                        break;
                    }
                }
                return Pair.of(errorCount, resultList);
            }
            if (!list1.isEmpty()) {
                return Pair.of(errorCount, list1);
            }
            return Pair.of(errorCount, list2);
        });
        // Number of keys in the data table (served from the cache), used only in log/error messages.
        long countKey = keyToLocationOnFsRdd.count();
        keyToLocationOnFsRdd.unpersist();
        long diffCount = (Long)result.getLeft();
        if (diffCount > 0L) {
            String message = String.format("Validation of record index content failed: %s keys (total %s) from the data table have wrong location in record index metadata. Sample mismatches: %s", diffCount, countKey, String.join((CharSequence)";", (Iterable)result.getRight()));
            LOG.error(message);
            throw new HoodieValidationException(message);
        }
        LOG.info(String.format("Validation of record index content succeeded: %s entries.", countKey));
    }

    /**
     * Formats a pair of record locations for a mismatch sample.
     *
     * <p>The result has the form {@code "FS: <fs>, Record Index: <ri>"}. An absent side is rendered
     * as {@code <empty>}.
     *
     * @param locationOnFs location found in the data table, if any
     * @param locationFromRecordIndex location stored in the record index, if any
     * @return the formatted description
     */
    private String constructLocationInfoString(Optional<Pair<String, String>> locationOnFs, Optional<Pair<String, String>> locationFromRecordIndex) {
        String fsPart = locationOnFs.isPresent() ? String.valueOf(locationOnFs.get()) : "<empty>";
        String indexPart = locationFromRecordIndex.isPresent() ? String.valueOf(locationFromRecordIndex.get()) : "<empty>";
        return "FS: " + fsPart + ", Record Index: " + indexPart;
    }

    /**
     * Returns the file names of the latest base files in a partition, as seen by the file-system view.
     *
     * <p>Files named in {@code baseDataFilesForCleaning} are excluded when that set is non-empty.
     *
     * @return a new list of base file names, in the view's sorted order
     */
    private List<String> getLatestBaseFileNames(HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        List<HoodieBaseFile> latestBaseFiles = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
        Stream<HoodieBaseFile> candidates = baseDataFilesForCleaning.isEmpty()
                ? latestBaseFiles.stream()
                : this.filterBaseFileBasedOnInflightCleaning(latestBaseFiles, baseDataFilesForCleaning).stream();
        return candidates.map(BaseFile::getFileName).collect(Collectors.toList());
    }

    /**
     * Asserts that two ordered lists are equal element by element.
     *
     * <p>On success it logs at info level. On failure it logs both lists at error level and throws.
     *
     * @param infoListFromMetadataTable entries read through the metadata table
     * @param infoListFromFS entries read from the file system / base files
     * @param partitionPath partition being validated, used in messages
     * @param label name of the validated aspect, used in messages
     * @throws HoodieValidationException when the lists differ
     */
    private <T> void validate(List<T> infoListFromMetadataTable, List<T> infoListFromFS, String partitionPath, String label) {
        boolean matches = infoListFromMetadataTable.size() == infoListFromFS.size()
                && infoListFromMetadataTable.equals(infoListFromFS);
        if (matches) {
            LOG.info(String.format("Validation of %s succeeded for partition %s", label, partitionPath));
            return;
        }
        String message = String.format("Validation of %s for partition %s failed.\n%s from metadata: %s\n%s from file system and base files: %s", label, partitionPath, label, infoListFromMetadataTable, label, infoListFromFS);
        LOG.error(message);
        throw new HoodieValidationException(message);
    }

    /**
     * Compares two ordered file-slice lists, tolerating differences that involve only uncommitted
     * log files.
     *
     * <p>The lists mismatch when their sizes differ. When the lists are not equal, each position is
     * compared: file group id, base instant time and base file must be equal, and
     * {@code areFileSliceCommittedLogFilesMatching} must accept the pair. A pair that differs but
     * passes these checks is logged as a warning.
     *
     * @param fileSliceListFromMetadataTable slices read through the metadata table
     * @param fileSliceListFromFS slices read by direct file-system listing
     * @param partitionPath partition being validated, used in messages
     * @param metaClient meta client passed to the committed-log-file comparison
     * @param label name of the validated aspect, used in messages
     * @throws HoodieValidationException on mismatch
     */
    private void validateFileSlices(List<FileSlice> fileSliceListFromMetadataTable, List<FileSlice> fileSliceListFromFS, String partitionPath, HoodieTableMetaClient metaClient, String label) {
        boolean mismatch = false;
        if (fileSliceListFromMetadataTable.size() != fileSliceListFromFS.size()) {
            mismatch = true;
        } else if (!fileSliceListFromMetadataTable.equals(fileSliceListFromFS)) {
            // Shared across all slice pairs and handed to areFileSliceCommittedLogFilesMatching.
            Map<String, Set<String>> committedFilesMap = new HashMap<>();
            for (int i = 0; i < fileSliceListFromMetadataTable.size(); ++i) {
                FileSlice fileSlice1 = fileSliceListFromMetadataTable.get(i);
                FileSlice fileSlice2 = fileSliceListFromFS.get(i);
                if (!Objects.equals(fileSlice1.getFileGroupId(), fileSlice2.getFileGroupId())
                        || !Objects.equals(fileSlice1.getBaseInstantTime(), fileSlice2.getBaseInstantTime())
                        || !Objects.equals(fileSlice1.getBaseFile(), fileSlice2.getBaseFile())) {
                    mismatch = true;
                    break;
                }
                if (!this.areFileSliceCommittedLogFilesMatching(fileSlice1, fileSlice2, metaClient, committedFilesMap)) {
                    mismatch = true;
                    break;
                }
                // Warn only for pairs that actually differ; identical pairs have no uncommitted log files to report.
                if (!fileSlice1.equals(fileSlice2)) {
                    LOG.warn(String.format("There are uncommitted log files in the latest file slices but the committed log files match: %s %s", fileSlice1, fileSlice2));
                }
            }
        }
        if (mismatch) {
            String message = String.format("Validation of %s for partition %s failed.\n%s from metadata: %s\n%s from file system and base files: %s", label, partitionPath, label, fileSliceListFromMetadataTable, label, fileSliceListFromFS);
            LOG.error(message);
            throw new HoodieValidationException(message);
        }
        LOG.info(String.format("Validation of %s succeeded for partition %s", label, partitionPath));
    }

    /**
     * Checks whether two file slices agree on their committed log files.
     * <p>
     * Log files present in both slices are ignored. The slices are considered matching when none of the log files
     * that appear in only one of them is committed.
     *
     * @param fs1               first file slice (from the metadata table)
     * @param fs2               second file slice (from the file system)
     * @param metaClient        meta client used to access the file system and timeline
     * @param committedFilesMap cache of instant time to files committed by that instant; filled in as needed
     * @return true if no differing log file is committed, false otherwise
     */
    private boolean areFileSliceCommittedLogFilesMatching(FileSlice fs1, FileSlice fs2, HoodieTableMetaClient metaClient, Map<String, Set<String>> committedFilesMap) {
        // Collect into explicit HashSets: the sets are mutated below, and Collectors.toSet() does not guarantee mutability.
        Set<String> fs1LogPathSet = fs1.getLogFiles().map(f -> f.getPath().toString()).collect(Collectors.toCollection(HashSet::new));
        Set<String> fs2LogPathSet = fs2.getLogFiles().map(f -> f.getPath().toString()).collect(Collectors.toCollection(HashSet::new));
        Set<String> commonLogPathSet = new HashSet<>(fs1LogPathSet);
        commonLogPathSet.retainAll(fs2LogPathSet);
        // Keep only the log files unique to each slice.
        fs1LogPathSet.removeAll(commonLogPathSet);
        fs2LogPathSet.removeAll(commonLogPathSet);
        FileSystem fileSystem = metaClient.getFs();
        if (this.hasCommittedLogFiles(fileSystem, fs1LogPathSet, metaClient, committedFilesMap)) {
            LOG.error("The first file slice has committed log files that cause mismatching: " + fs1 + "; Different log files are: " + fs1LogPathSet);
            return false;
        }
        if (this.hasCommittedLogFiles(fileSystem, fs2LogPathSet, metaClient, committedFilesMap)) {
            LOG.error("The second file slice has committed log files that cause mismatching: " + fs2 + "; Different log files are: " + fs2LogPathSet);
            return false;
        }
        return true;
    }

    /**
     * Checks whether any of the given log files was written by a committed instant.
     * <p>
     * For each log file, its schema and first log block are read to obtain the block's instant time. That time is then
     * classified against the commits timeline:
     * <ul>
     *   <li>completed in the active timeline: committed only if that instant's commit metadata lists the file
     *       (commit metadata is cached in {@code committedFilesMap});</li>
     *   <li>before the start of the active timeline (archived): committed;</li>
     *   <li>inflight, or otherwise absent/not completed in the active timeline: uncommitted.</li>
     * </ul>
     * Log files whose schema cannot be read, that contain no block, or that fail with an {@link IOException} are
     * skipped with a warning.
     *
     * @param fs                file system used to read the log files
     * @param logFilePathSet    absolute paths of the log files to check
     * @param metaClient        meta client providing the base path and commits timeline
     * @param committedFilesMap cache of instant time to relative paths committed by that instant; filled in as needed
     * @return true as soon as one committed log file is found; false otherwise (including for an empty set)
     */
    private boolean hasCommittedLogFiles(FileSystem fs, Set<String> logFilePathSet, HoodieTableMetaClient metaClient, Map<String, Set<String>> committedFilesMap) {
        if (logFilePathSet.isEmpty()) {
            return false;
        }
        String basePath = metaClient.getBasePathV2().toString();
        HoodieTimeline commitsTimeline = metaClient.getCommitsTimeline();
        AvroSchemaConverter converter = new AvroSchemaConverter();
        HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
        HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
        for (String logFilePathStr : logFilePathSet) {
            HoodieLogFormat.Reader reader = null;
            try {
                MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePathStr));
                if (messageType == null) {
                    LOG.warn(String.format("Cannot read schema from log file %s. Skip the check as it's likely being written by an inflight instant.", logFilePathStr));
                    continue;
                }
                Schema readerSchema = converter.convert(messageType);
                reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(logFilePathStr), readerSchema);
                if (!reader.hasNext()) {
                    LOG.warn("There is no log block in " + logFilePathStr);
                    continue;
                }
                HoodieLogBlock block = (HoodieLogBlock) reader.next();
                String instantTime = block.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME);
                if (completedInstantsTimeline.containsInstant(instantTime)) {
                    if (!committedFilesMap.containsKey(instantTime)) {
                        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
                            completedInstantsTimeline.getInstantDetails(
                                completedInstantsTimeline.filter(i -> i.getTimestamp().equals(instantTime)).firstInstant().get()).get(),
                            HoodieCommitMetadata.class);
                        committedFilesMap.put(instantTime, commitMetadata.getWriteStats().stream().map(HoodieWriteStat::getPath).collect(Collectors.toSet()));
                    }
                    // A log file written under a completed instant is committed only if the commit's write stats list
                    // it; otherwise (e.g. a retried task) it may linger on storage without being part of any snapshot.
                    String relativeLogFilePathStr = this.getRelativePath(basePath, logFilePathStr);
                    if (committedFilesMap.get(instantTime).contains(relativeLogFilePathStr)) {
                        LOG.warn("Log file is committed in an instant in active timeline: instantTime=" + instantTime + " " + logFilePathStr);
                        return true;
                    }
                    LOG.warn("Log file is uncommitted in a completed instant, likely due to retry: instantTime=" + instantTime + " " + logFilePathStr);
                } else if (completedInstantsTimeline.isBeforeTimelineStarts(instantTime)) {
                    LOG.warn("Log file is committed in an instant in archived timeline: instantTime=" + instantTime + " " + logFilePathStr);
                    return true;
                } else if (inflightInstantsTimeline.containsInstant(instantTime)) {
                    LOG.warn("Log file is uncommitted because of an inflight instant: instantTime=" + instantTime + " " + logFilePathStr);
                } else {
                    LOG.warn("Log file is uncommitted because the instant is after the start of the active timeline but absent or in requested in the active timeline: instantTime=" + instantTime + " " + logFilePathStr);
                }
            }
            catch (IOException e) {
                LOG.warn(String.format("Cannot read log file %s: %s. Skip the check as it's likely being written by an inflight instant.", logFilePathStr, e.getMessage()), e);
            }
            finally {
                // Only release the reader here: a `continue` in finally would override the `return true` above.
                FileIOUtils.closeQuietly(reader);
            }
        }
        return false;
    }

    /**
     * Returns the path of {@code absoluteFilePath} relative to {@code basePath}, ignoring scheme and authority.
     *
     * @param basePath         table base path
     * @param absoluteFilePath absolute path of a file expected to live under {@code basePath}
     * @return the relative path without a leading '/', or "" if both paths are the same
     * @throws IllegalArgumentException if the file is not under the base path
     */
    private String getRelativePath(String basePath, String absoluteFilePath) {
        String basePathStr = CachingPath.getPathWithoutSchemeAndAuthority(new Path(basePath)).toString();
        String absoluteFilePathStr = CachingPath.getPathWithoutSchemeAndAuthority(new Path(absoluteFilePath)).toString();
        if (absoluteFilePathStr.equals(basePathStr)) {
            return "";
        }
        // Require a separator right after the base path, so that a sibling such as "/tbl2/f" is not treated as
        // being under "/tbl".
        String basePathPrefix = basePathStr.endsWith("/") ? basePathStr : basePathStr + "/";
        if (!absoluteFilePathStr.startsWith(basePathPrefix)) {
            throw new IllegalArgumentException("File path does not belong to the base path! basePath=" + basePathStr + " absoluteFilePathStr=" + absoluteFilePathStr);
        }
        return absoluteFilePathStr.substring(basePathPrefix.length());
    }

    /*
     * Per-partition validation task: validates one partition and reports the outcome as (succeeded, error message).
     * A HoodieValidationException is rethrown unless cfg.ignoreFailed is set, in which case it becomes a
     * (false, message) result.
     */
    private /* synthetic */ Pair lambda$doMetadataTableValidation$49b7b3f0$1(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, Set finalBaseFilesForCleaning, String partitionPath) throws Exception {
        Pair outcome;
        try {
            this.validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath, finalBaseFilesForCleaning);
            LOG.info(String.format("Metadata table validation succeeded for partition %s (partition %s)", partitionPath, this.taskLabels));
            outcome = Pair.of(true, "");
        }
        catch (HoodieValidationException validationFailure) {
            LOG.error(String.format("Metadata table validation failed for partition %s due to HoodieValidationException (partition %s)", partitionPath, this.taskLabels), (Throwable) validationFailure);
            if (this.cfg.ignoreFailed) {
                outcome = Pair.of(false, validationFailure.getMessage() + " for partition: " + partitionPath);
            } else {
                throw validationFailure;
            }
        }
        return outcome;
    }

    /*
     * Predicate: true when the path's file extension is one of HoodieFileFormat.BASE_FILE_EXTENSIONS.
     */
    private static /* synthetic */ boolean lambda$doMetadataTableValidation$4(String path) {
        return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(FSUtils.getFileExtension(path));
    }

    /*
     * Maps a clean instant to the file names (last path component) listed in its cleaner plan, across all partitions.
     * The plan is looked up through the REQUESTED form of the instant, whatever state the given instant is in.
     * Throws HoodieIOException (with the original IOException as cause) if the plan cannot be read.
     */
    private /* synthetic */ Stream lambda$doMetadataTableValidation$3(HoodieInstant instant) {
        HoodieInstant requestedInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, instant.getAction(), instant.getTimestamp());
        try {
            HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(this.metaClient, requestedInstant);
            return cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream()
                .flatMap(cleanerFileInfoList -> cleanerFileInfoList.stream().map(fileInfo -> new Path(fileInfo.getFilePath()).getName()));
        }
        catch (IOException e) {
            // Keep the underlying IOException as the cause so the failure can be diagnosed.
            throw new HoodieIOException("Error reading cleaner metadata for " + requestedInstant, e);
        }
    }

    /**
     * Read-only view of a table used by the validator.
     * <p>
     * When {@code enableMetadataTable} is true, column stats and bloom filters are read from the metadata table (and
     * the file-system view is built with the metadata table enabled). Otherwise they are read directly from the base
     * files. Two instances, one of each kind, can therefore be compared.
     */
    private static class HoodieMetadataValidationContext
    implements AutoCloseable,
    Serializable {
        private static final Logger LOG = LoggerFactory.getLogger(HoodieMetadataValidationContext.class);
        private final HoodieTableMetaClient metaClient;
        private final HoodieTableFileSystemView fileSystemView;
        private final HoodieTableMetadata tableMetadata;
        private final boolean enableMetadataTable;
        // Column names of the table schema; only set when the table has at least one completed commit, else null.
        private List<String> allColumnNameList;

        /**
         * Builds the context.
         *
         * @param engineContext       engine context used to build the file-system view and table metadata
         * @param cfg                 validator config (only {@code assumeDatePartitioning} is read here)
         * @param metaClient          meta client of the table
         * @param enableMetadataTable whether to enable the metadata table and its bloom filter, column stats and
         *                            record index in the metadata config
         */
        public HoodieMetadataValidationContext(HoodieEngineContext engineContext, Config cfg, HoodieTableMetaClient metaClient, boolean enableMetadataTable) {
            this.metaClient = metaClient;
            this.enableMetadataTable = enableMetadataTable;
            HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
                .enable(enableMetadataTable)
                .withMetadataIndexBloomFilter(enableMetadataTable)
                .withMetadataIndexColumnStats(enableMetadataTable)
                .withEnableRecordIndex(enableMetadataTable)
                .withAssumeDatePartitioning(cfg.assumeDatePartitioning)
                .build();
            this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient, metadataConfig);
            this.tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath());
            if (metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() > 0) {
                this.allColumnNameList = this.getAllColumnNames();
            }
        }

        public HoodieTableMetaClient getMetaClient() {
            return this.metaClient;
        }

        public HoodieTableMetadata getTableMetadata() {
            return this.tableMetadata;
        }

        /** Returns the latest base files of the partition, sorted by path. */
        public List<HoodieBaseFile> getSortedLatestBaseFileList(String partitionPath) {
            return this.fileSystemView.getLatestBaseFiles(partitionPath).sorted(new HoodieBaseFileComparator()).collect(Collectors.toList());
        }

        /** Returns the latest file slices of the partition, sorted with {@link FileSliceComparator}. */
        public List<FileSlice> getSortedLatestFileSliceList(String partitionPath) {
            return this.fileSystemView.getLatestFileSlices(partitionPath).sorted(new FileSliceComparator()).collect(Collectors.toList());
        }

        /** Returns all file groups of the partition, sorted by file group id. */
        public List<HoodieFileGroup> getSortedAllFileGroupList(String partitionPath) {
            return this.fileSystemView.getAllFileGroups(partitionPath).sorted(new HoodieFileGroupComparator()).collect(Collectors.toList());
        }

        /**
         * Returns the column range metadata of all schema columns for the given base files, sorted by their string
         * form. The data is read from the metadata table or from the parquet footers, depending on the context kind.
         * Returns an empty list when the schema column names are unknown (no completed commit at construction).
         */
        public List<HoodieColumnRangeMetadata<Comparable>> getSortedColumnStatsList(String partitionPath, List<String> baseFileNameList) {
            LOG.info("All column names for getting column stats: " + this.allColumnNameList);
            if (this.allColumnNameList == null) {
                return Collections.emptyList();
            }
            if (this.enableMetadataTable) {
                List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream()
                    .map(filename -> Pair.of(partitionPath, filename))
                    .collect(Collectors.toList());
                return this.allColumnNameList.stream()
                    .flatMap(columnName -> this.tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream()
                        .map(HoodieTableMetadataUtil::convertColumnStatsRecordToColumnRangeMetadata))
                    .sorted(new HoodieColumnRangeMetadataComparator())
                    .collect(Collectors.toList());
            }
            return baseFileNameList.stream()
                .flatMap(filename -> new ParquetUtils().readRangeFromParquetMetadata(this.metaClient.getHadoopConf(), new Path(FSUtils.getPartitionPath(this.metaClient.getBasePath(), partitionPath), filename), this.allColumnNameList).stream())
                .sorted(new HoodieColumnRangeMetadataComparator())
                .collect(Collectors.toList());
        }

        /**
         * Returns the bloom filters of the given base files, sorted. They are read from the metadata table or from the
         * base files, depending on the context kind. Files whose bloom filter cannot be read are omitted on the
         * file-based path.
         */
        public List<BloomFilterData> getSortedBloomFilterList(String partitionPath, List<String> baseFileNameList) {
            if (this.enableMetadataTable) {
                List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream()
                    .map(filename -> Pair.of(partitionPath, filename))
                    .collect(Collectors.toList());
                return this.tableMetadata.getBloomFilters(partitionFileNameList).entrySet().stream()
                    .map(entry -> toBloomFilterData(entry.getKey().getKey(), entry.getKey().getValue(), entry.getValue()))
                    .sorted()
                    .collect(Collectors.toList());
            }
            return baseFileNameList.stream()
                .map(filename -> this.readBloomFilterFromFile(partitionPath, filename))
                .filter(Option::isPresent)
                .map(Option::get)
                .sorted()
                .collect(Collectors.toList());
        }

        /** Returns the field names of the table's Avro schema; wraps any failure in a {@link HoodieException}. */
        private List<String> getAllColumnNames() {
            TableSchemaResolver schemaResolver = new TableSchemaResolver(this.metaClient);
            try {
                return schemaResolver.getTableAvroSchema().getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
            }
            catch (Exception e) {
                throw new HoodieException("Failed to get all column names for " + this.metaClient.getBasePath(), e);
            }
        }

        /** Wraps a bloom filter into the sortable BloomFilterData form shared by both read paths. */
        private static BloomFilterData toBloomFilterData(String partitionPath, String filename, BloomFilter bloomFilter) {
            return BloomFilterData.builder()
                .setPartitionPath(partitionPath)
                .setFilename(filename)
                .setBloomFilter(ByteBuffer.wrap(bloomFilter.serializeToString().getBytes()))
                .build();
        }

        /**
         * Reads the bloom filter stored in a base file.
         *
         * @return the bloom filter data, or empty if the file has no bloom filter or cannot be opened (errors are logged)
         */
        private Option<BloomFilterData> readBloomFilterFromFile(String partitionPath, String filename) {
            Path path = new Path(FSUtils.getPartitionPath(this.metaClient.getBasePathV2(), partitionPath), filename);
            try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(this.metaClient.getHadoopConf(), path)) {
                BloomFilter bloomFilter = fileReader.readBloomFilter();
                if (bloomFilter == null) {
                    LOG.error("Failed to read bloom filter for " + path);
                    return Option.empty();
                }
                return Option.of(toBloomFilterData(partitionPath, filename, bloomFilter));
            }
            catch (IOException e) {
                LOG.error("Failed to get file reader for " + path + " " + e.getMessage(), e);
                return Option.empty();
            }
        }

        /** Closes the table metadata and the file-system view; the view is closed even if the first close fails. */
        @Override
        public void close() throws Exception {
            try {
                this.tableMetadata.close();
            } finally {
                this.fileSystemView.close();
            }
        }
    }

    /**
     * Orders column range metadata by the lexicographic order of their {@code toString()} representations.
     */
    public static class HoodieColumnRangeMetadataComparator
    implements Comparator<HoodieColumnRangeMetadata<Comparable>>,
    Serializable {
        @Override
        public int compare(HoodieColumnRangeMetadata<Comparable> o1, HoodieColumnRangeMetadata<Comparable> o2) {
            final String left = o1.toString();
            final String right = o2.toString();
            return left.compareTo(right);
        }
    }

    /**
     * Orders file groups by their file group id.
     */
    public static class HoodieFileGroupComparator
    implements Comparator<HoodieFileGroup>,
    Serializable {
        @Override
        public int compare(HoodieFileGroup o1, HoodieFileGroup o2) {
            final HoodieFileGroupId leftId = o1.getFileGroupId();
            return leftId.compareTo(o2.getFileGroupId());
        }
    }

    /**
     * Orders base files by their path string.
     */
    public static class HoodieBaseFileComparator
    implements Comparator<HoodieBaseFile>,
    Serializable {
        @Override
        public int compare(HoodieBaseFile o1, HoodieBaseFile o2) {
            final String leftPath = o1.getPath();
            final String rightPath = o2.getPath();
            return leftPath.compareTo(rightPath);
        }
    }

    /**
     * Orders file slices by partition path, then file id, then base instant time (nulls first in each field).
     * <p>
     * Fields are compared one by one rather than as a concatenated string, which could make different
     * (partition, fileId, instant) tuples collide or misorder. This also avoids allocating strings per comparison.
     */
    public static class FileSliceComparator
    implements Comparator<FileSlice>,
    Serializable {
        private static final Comparator<String> NULL_SAFE_ORDER = Comparator.nullsFirst(Comparator.<String>naturalOrder());

        @Override
        public int compare(FileSlice o1, FileSlice o2) {
            int result = NULL_SAFE_ORDER.compare(o1.getPartitionPath(), o2.getPartitionPath());
            if (result != 0) {
                return result;
            }
            result = NULL_SAFE_ORDER.compare(o1.getFileId(), o2.getFileId());
            if (result != 0) {
                return result;
            }
            return NULL_SAFE_ORDER.compare(o1.getBaseInstantTime(), o2.getBaseInstantTime());
        }
    }

    public class AsyncMetadataTableValidateService
    extends HoodieAsyncService {
        private final transient ExecutorService executor = Executors.newSingleThreadExecutor();

        @Override
        protected Pair<CompletableFuture, ExecutorService> startService() {
            return Pair.of(CompletableFuture.supplyAsync(() -> {
                while (true) {
                    try {
                        while (true) {
                            long start2 = System.currentTimeMillis();
                            HoodieMetadataTableValidator.this.doMetadataTableValidation();
                            long toSleepMs = (long)(((HoodieMetadataTableValidator)HoodieMetadataTableValidator.this).cfg.minValidateIntervalSeconds * 1000) - (System.currentTimeMillis() - start2);
                            if (toSleepMs <= 0L) continue;
                            LOG.info("Last validate ran less than min validate interval: " + ((HoodieMetadataTableValidator)HoodieMetadataTableValidator.this).cfg.minValidateIntervalSeconds + " s, sleep: " + toSleepMs + " ms.");
                            Thread.sleep(toSleepMs);
                        }
                    }
                    catch (HoodieValidationException e) {
                        LOG.error("Shutting down AsyncMetadataTableValidateService due to HoodieValidationException", (Throwable)e);
                        if (((HoodieMetadataTableValidator)HoodieMetadataTableValidator.this).cfg.ignoreFailed) continue;
                        throw e;
                    }
                    catch (InterruptedException interruptedException) {
                        continue;
                    }
                    break;
                }
            }, this.executor), this.executor);
        }
    }

    /**
     * Command-line options of the metadata table validator, bound by JCommander through {@link Parameter}.
     */
    public static class Config
    implements Serializable {
        @Parameter(names={"--base-path", "-sp"}, description="Base path for the table", required=true)
        public String basePath = null;
        @Parameter(names={"--continuous"}, description="Running MetadataTableValidator in continuous. Can use --min-validate-interval-seconds to control validation frequency", required=false)
        public boolean continuous = false;
        @Parameter(names={"--skip-data-files-for-cleaning"}, description="Skip to compare the data files which are under deletion by cleaner", required=false)
        public boolean skipDataFilesForCleaning = false;
        @Parameter(names={"--validate-latest-file-slices"}, description="Validate latest file slices for all partitions.", required=false)
        public boolean validateLatestFileSlices = false;
        @Parameter(names={"--validate-latest-base-files"}, description="Validate latest base files for all partitions.", required=false)
        public boolean validateLatestBaseFiles = false;
        @Parameter(names={"--validate-all-file-groups"}, description="Validate all file groups, and all file slices within file groups.", required=false)
        public boolean validateAllFileGroups = false;
        @Parameter(names={"--validate-all-column-stats"}, description="Validate column stats for all columns in the schema", required=false)
        public boolean validateAllColumnStats = false;
        @Parameter(names={"--validate-bloom-filters"}, description="Validate bloom filters of base files", required=false)
        public boolean validateBloomFilters = false;
        @Parameter(names={"--validate-record-index-count"}, description="Validate the number of entries in the record index, which should be equal to the number of record keys in the latest snapshot of the table", required=false)
        public boolean validateRecordIndexCount = false;
        @Parameter(names={"--validate-record-index-content"}, description="Validate the content of the record index so that each record key should have the correct location, and there is no additional or missing entry", required=false)
        public boolean validateRecordIndexContent = false;
        @Parameter(names={"--num-record-index-error-samples"}, description="Number of error samples to show for record index validation", required=false)
        public int numRecordIndexErrorSamples = 100;
        @Parameter(names={"--min-validate-interval-seconds"}, description="the min validate interval of each validate when set --continuous, default is 10 minutes.")
        public Integer minValidateIntervalSeconds = 600;
        @Parameter(names={"--parallelism", "-pl"}, description="Parallelism for validation", required=false)
        public int parallelism = 200;
        @Parameter(names={"--record-index-parallelism", "-rpl"}, description="Parallelism for validating record index", required=false)
        public int recordIndexParallelism = 100;
        @Parameter(names={"--ignore-failed", "-ig"}, description="Ignore metadata validate failure and continue.", required=false)
        public boolean ignoreFailed = false;
        @Parameter(names={"--spark-master", "-ms"}, description="Spark master", required=false)
        public String sparkMaster = null;
        @Parameter(names={"--spark-memory", "-sm"}, description="spark memory to use", required=false)
        public String sparkMemory = "1g";
        @Parameter(names={"--assume-date-partitioning"}, description="Should HoodieWriteClient assume the data is partitioned by dates, i.e three levels from base path. This is a stop-gap to support tables created by versions < 0.3.1. Will be removed eventually", required=false)
        public Boolean assumeDatePartitioning = false;
        @Parameter(names={"--props"}, description="path to properties file on localfs or dfs, with configurations for hoodie client")
        public String propsFilePath = null;
        @Parameter(names={"--hoodie-conf"}, description="Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter=IdentitySplitter.class)
        public List<String> configs = new ArrayList<String>();
        @Parameter(names={"--help", "-h"}, help=true)
        public Boolean help = false;

        @Override
        public String toString() {
            return "MetadataTableValidatorConfig {\n   --base-path " + this.basePath
                + ", \n   --validate-latest-file-slices " + this.validateLatestFileSlices
                + ", \n   --validate-latest-base-files " + this.validateLatestBaseFiles
                + ", \n   --validate-all-file-groups " + this.validateAllFileGroups
                + ", \n   --validate-all-column-stats " + this.validateAllColumnStats
                + ", \n   --validate-bloom-filters " + this.validateBloomFilters
                + ", \n   --validate-record-index-count " + this.validateRecordIndexCount
                + ", \n   --validate-record-index-content " + this.validateRecordIndexContent
                + ", \n   --num-record-index-error-samples " + this.numRecordIndexErrorSamples
                + ", \n   --continuous " + this.continuous
                + ", \n   --skip-data-files-for-cleaning " + this.skipDataFilesForCleaning
                + ", \n   --ignore-failed " + this.ignoreFailed
                + ", \n   --min-validate-interval-seconds " + this.minValidateIntervalSeconds
                + ", \n   --parallelism " + this.parallelism
                + ", \n   --record-index-parallelism " + this.recordIndexParallelism
                + ", \n   --spark-master " + this.sparkMaster
                + ", \n   --spark-memory " + this.sparkMemory
                + ", \n   --assume-date-partitioning " + this.assumeDatePartitioning
                + ", \n   --props " + this.propsFilePath
                + ", \n   --hoodie-conf " + this.configs
                + "\n}";
        }

        /**
         * Compares every option field, including {@code help}, so that equal configs always have equal hash codes.
         * Null-safe for all reference fields (including {@code basePath}, which defaults to null).
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Config config = (Config) o;
            return Objects.equals(this.basePath, config.basePath)
                && this.continuous == config.continuous
                && this.skipDataFilesForCleaning == config.skipDataFilesForCleaning
                && this.validateLatestFileSlices == config.validateLatestFileSlices
                && this.validateLatestBaseFiles == config.validateLatestBaseFiles
                && this.validateAllFileGroups == config.validateAllFileGroups
                && this.validateAllColumnStats == config.validateAllColumnStats
                && this.validateBloomFilters == config.validateBloomFilters
                && this.validateRecordIndexCount == config.validateRecordIndexCount
                && this.validateRecordIndexContent == config.validateRecordIndexContent
                && this.numRecordIndexErrorSamples == config.numRecordIndexErrorSamples
                && Objects.equals(this.minValidateIntervalSeconds, config.minValidateIntervalSeconds)
                && this.parallelism == config.parallelism
                && this.recordIndexParallelism == config.recordIndexParallelism
                && this.ignoreFailed == config.ignoreFailed
                && Objects.equals(this.sparkMaster, config.sparkMaster)
                && Objects.equals(this.sparkMemory, config.sparkMemory)
                && Objects.equals(this.assumeDatePartitioning, config.assumeDatePartitioning)
                && Objects.equals(this.propsFilePath, config.propsFilePath)
                && Objects.equals(this.configs, config.configs)
                && Objects.equals(this.help, config.help);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.basePath, this.continuous, this.skipDataFilesForCleaning, this.validateLatestFileSlices, this.validateLatestBaseFiles, this.validateAllFileGroups, this.validateAllColumnStats, this.validateBloomFilters, this.validateRecordIndexCount, this.validateRecordIndexContent, this.numRecordIndexErrorSamples, this.minValidateIntervalSeconds, this.parallelism, this.recordIndexParallelism, this.ignoreFailed, this.sparkMaster, this.sparkMemory, this.assumeDatePartitioning, this.propsFilePath, this.configs, this.help);
        }
    }
}

