/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.avro.AvroTypeException;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.ConvertingGenericData;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.BooleanWrapper;
import org.apache.hudi.avro.model.DateWrapper;
import org.apache.hudi.avro.model.DoubleWrapper;
import org.apache.hudi.avro.model.FloatWrapper;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
import org.apache.hudi.avro.model.HoodieRecordIndexInfo;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.IntWrapper;
import org.apache.hudi.avro.model.LongWrapper;
import org.apache.hudi.avro.model.StringWrapper;
import org.apache.hudi.avro.model.TimeMicrosWrapper;
import org.apache.hudi.avro.model.TimestampMicrosWrapper;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieIndexMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.TimelineFactory;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Tuple3;
import org.apache.hudi.common.util.hash.ColumnIndexID;
import org.apache.hudi.common.util.hash.PartitionIndexID;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.metadata.BaseFileRecordParsingUtils;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.util.Lazy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieTableMetadataUtil {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(HoodieTableMetadataUtil.class);
    // Names (and name prefixes) of the metadata table partitions referenced by this class.
    public static final String PARTITION_NAME_FILES = "files";
    public static final String PARTITION_NAME_PARTITION_STATS = "partition_stats";
    public static final String PARTITION_NAME_COLUMN_STATS = "column_stats";
    public static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
    public static final String PARTITION_NAME_RECORD_INDEX = "record_index";
    public static final String PARTITION_NAME_EXPRESSION_INDEX = "expr_index";
    public static final String PARTITION_NAME_EXPRESSION_INDEX_PREFIX = "expr_index_";
    public static final String PARTITION_NAME_SECONDARY_INDEX = "secondary_index";
    public static final String PARTITION_NAME_SECONDARY_INDEX_PREFIX = "secondary_index_";
    // Avro schema types listed as supported for partition stats.
    // NOTE(review): the consumer of this set is outside the visible part of the file - confirm usage.
    private static final Set<Schema.Type> SUPPORTED_TYPES_PARTITION_STATS = new HashSet<Schema.Type>(Arrays.asList(Schema.Type.INT, Schema.Type.LONG, Schema.Type.FLOAT, Schema.Type.DOUBLE, Schema.Type.STRING, Schema.Type.BOOLEAN, Schema.Type.NULL, Schema.Type.BYTES));
    // Field names of the record-key, partition-path and commit-time meta fields.
    public static final Set<String> SUPPORTED_META_FIELDS_PARTITION_STATS = new HashSet<String>(Arrays.asList(HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName(), HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.getFieldName(), HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.getFieldName()));
    // Decimal precision/scale limits.
    // NOTE(review): where these limits are enforced is not visible here - confirm.
    private static final int DECIMAL_MAX_PRECISION = 30;
    private static final int DECIMAL_MAX_SCALE = 15;
    // Wrapper record classes accepted by getColumnStatsValueAsString; any other class is rejected there.
    public static final Set<Class<?>> COLUMN_STATS_RECORD_SUPPORTED_TYPES = new HashSet<Class>(Arrays.asList(IntWrapper.class, BooleanWrapper.class, DateWrapper.class, DoubleWrapper.class, FloatWrapper.class, LongWrapper.class, StringWrapper.class, TimeMicrosWrapper.class, TimestampMicrosWrapper.class));
    // Meta columns (commit time, record key, partition path) exposed as an array ...
    @VisibleForTesting
    static final String[] META_COLS_TO_ALWAYS_INDEX = new String[]{HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD};
    // ... as a set built from that array ...
    @VisibleForTesting
    public static final Set<String> META_COL_SET_TO_INDEX = new HashSet<String>(Arrays.asList(META_COLS_TO_ALWAYS_INDEX));
    // ... and as a sorted map from each of those column names to an Avro STRING schema.
    @VisibleForTesting
    static final Map<String, Schema> META_COLS_TO_ALWAYS_INDEX_SCHEMA_MAP = new TreeMap(){
        {
            this.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, Schema.create((Schema.Type)Schema.Type.STRING));
            this.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, Schema.create((Schema.Type)Schema.Type.STRING));
            this.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, Schema.create((Schema.Type)Schema.Type.STRING));
        }
    };

    // Private constructor: the class cannot be instantiated from outside.
    private HoodieTableMetadataUtil() {
    }

    /**
     * Reports whether the "files" partition is listed among the metadata partitions
     * recorded in the table config of the given meta client.
     *
     * @param metaClient meta client whose table config is consulted
     * @return {@code true} if the table config's metadata partitions contain {@link #PARTITION_NAME_FILES}
     */
    public static boolean isFilesPartitionAvailable(HoodieTableMetaClient metaClient) {
        HoodieTableConfig tableConfig = metaClient.getTableConfig();
        return tableConfig.getMetadataPartitions().contains(PARTITION_NAME_FILES);
    }

    /**
     * Computes per-column range statistics (min, max, null count, value count) over the given records.
     *
     * <p>Every record is visited once per target field. A value contributes to min/max only when it is
     * non-null and its column type is supported for the record's type; otherwise it is counted as a null.
     * Values of Avro {@code date} logical-type columns are converted to {@link java.sql.Date} before
     * comparison. A null date value is left as null (and therefore counted as a null) rather than being
     * dereferenced during that conversion.
     *
     * @param records      records to scan
     * @param targetFields (column name, Avro field) pairs to collect stats for
     * @param filePath     file path recorded in every resulting range metadata
     * @param recordSchema schema used to extract column values from the records
     * @return map from column name to its range metadata; total size and uncompressed size are set to 0
     * @throws HoodieException if a record is neither of AVRO nor of SPARK record type
     */
    public static Map<String, HoodieColumnRangeMetadata<Comparable>> collectColumnRangeMetadata(List<HoodieRecord> records, List<Pair<String, Schema.Field>> targetFields, String filePath, Schema recordSchema) {
        // Mutable accumulator for a single column.
        class ColumnStats {
            Object minValue;
            Object maxValue;
            long nullCount;
            long valueCount;
        }
        Map<String, ColumnStats> allColumnStats = new HashMap<>();
        records.forEach(record -> targetFields.forEach(fieldNameFieldPair -> {
            String fieldName = fieldNameFieldPair.getKey();
            Schema fieldSchema = AvroSchemaUtils.resolveNullableSchema(fieldNameFieldPair.getValue().schema());
            ColumnStats colStats = allColumnStats.computeIfAbsent(fieldName, ignored -> new ColumnStats());
            boolean isDateColumn = fieldSchema.getType() == Schema.Type.INT && fieldSchema.getLogicalType() != null && fieldSchema.getLogicalType() == LogicalTypes.date();
            Object fieldValue;
            if (record.getRecordType() == HoodieRecord.HoodieRecordType.AVRO) {
                fieldValue = HoodieAvroUtils.getRecordColumnValues(record, new String[]{fieldName}, recordSchema, false)[0];
                // Guard against null: a missing date must be counted as a null, not dereferenced.
                if (fieldValue != null && isDateColumn) {
                    fieldValue = java.sql.Date.valueOf(fieldValue.toString());
                }
            } else if (record.getRecordType() == HoodieRecord.HoodieRecordType.SPARK) {
                fieldValue = record.getColumnValues(recordSchema, new String[]{fieldName}, false)[0];
                // Same null guard; here the raw value is treated as an epoch-day integer.
                if (fieldValue != null && isDateColumn) {
                    fieldValue = java.sql.Date.valueOf(LocalDate.ofEpochDay(((Integer)fieldValue).intValue()).toString());
                }
            } else {
                throw new HoodieException(String.format("Unknown record type: %s", record.getRecordType()));
            }
            ++colStats.valueCount;
            if (fieldValue != null && HoodieTableMetadataUtil.isColumnTypeSupported(fieldSchema, Option.of(record.getRecordType()))) {
                if (colStats.minValue == null || ConvertingGenericData.INSTANCE.compare(fieldValue, colStats.minValue, fieldSchema) < 0) {
                    colStats.minValue = fieldValue;
                }
                if (colStats.maxValue == null || ConvertingGenericData.INSTANCE.compare(fieldValue, colStats.maxValue, fieldSchema) > 0) {
                    colStats.maxValue = fieldValue;
                }
            } else {
                // Null values and values of unsupported column types are both accounted as nulls.
                ++colStats.nullCount;
            }
        }));
        Stream<HoodieColumnRangeMetadata<Comparable>> hoodieColumnRangeMetadataStream = targetFields.stream().map(fieldNameFieldPair -> {
            String fieldName = fieldNameFieldPair.getKey();
            Schema fieldSchema = fieldNameFieldPair.getValue().schema();
            // colStats is null when no record was visited (empty input), yielding empty stats for the column.
            ColumnStats colStats = allColumnStats.get(fieldName);
            HoodieColumnRangeMetadata<Comparable> hcrm = HoodieColumnRangeMetadata.<Comparable>create(filePath, fieldName, colStats == null ? null : HoodieTableMetadataUtil.coerceToComparable(fieldSchema, colStats.minValue), colStats == null ? null : HoodieTableMetadataUtil.coerceToComparable(fieldSchema, colStats.maxValue), colStats == null ? 0L : colStats.nullCount, colStats == null ? 0L : colStats.valueCount, 0L, 0L);
            return hcrm;
        });
        return hoodieColumnRangeMetadataStream.collect(Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, Function.identity()));
    }

    /**
     * Renders a column stats wrapper value as a string.
     *
     * @param statsValue wrapper record holding the value at position 0; may be null
     * @return empty option for a null input, otherwise the string form of the wrapped value
     * @throws HoodieNotSupportedException if the value's class is not in
     *         {@link #COLUMN_STATS_RECORD_SUPPORTED_TYPES}
     */
    public static Option<String> getColumnStatsValueAsString(Object statsValue) {
        if (statsValue != null) {
            Class<?> valueType = statsValue.getClass();
            if (!COLUMN_STATS_RECORD_SUPPORTED_TYPES.contains(valueType)) {
                throw new HoodieNotSupportedException("Unsupported type: " + valueType.getSimpleName());
            }
            // Supported wrappers are read through IndexedRecord; the wrapped value sits at position 0.
            Object wrapped = ((IndexedRecord)statsValue).get(0);
            return Option.of(String.valueOf(wrapped));
        }
        LOG.info("Invalid column stats value: {}", statsValue);
        return Option.empty();
    }

    /**
     * Builds a meta client for the table at {@code basePath} (using a fresh copy of the context's
     * storage configuration) and delegates to the meta-client based {@code deleteMetadataTable}
     * overload with its boolean flag set to {@code false}.
     *
     * @param basePath base path of the data table
     * @param context  engine context supplying the storage configuration
     */
    public static void deleteMetadataTable(String basePath, HoodieEngineContext context) {
        StorageConfiguration<?> freshStorageConf = context.getStorageConf().newInstance();
        HoodieTableMetaClient tableMetaClient = HoodieTableMetaClient.builder()
            .setBasePath(basePath)
            .setConf(freshStorageConf)
            .build();
        deleteMetadataTable(tableMetaClient, context, false);
    }

    /**
     * Builds a meta client for the table at {@code basePath} (using a fresh copy of the context's
     * storage configuration) and delegates to {@code deleteMetadataTablePartition} with its boolean
     * flag set to {@code false}.
     *
     * @param basePath      base path of the data table
     * @param context       engine context supplying the storage configuration
     * @param partitionPath metadata partition to delete
     */
    public static void deleteMetadataPartition(StoragePath basePath, HoodieEngineContext context, String partitionPath) {
        StorageConfiguration<?> freshStorageConf = context.getStorageConf().newInstance();
        HoodieTableMetaClient tableMetaClient = HoodieTableMetaClient.builder()
            .setBasePath(basePath)
            .setConf(freshStorageConf)
            .build();
        deleteMetadataTablePartition(tableMetaClient, context, partitionPath, false);
    }

    /**
     * Checks whether the given partition path exists under the metadata table of the table at {@code basePath}.
     *
     * @param basePath      base path of the data table
     * @param context       engine context supplying the storage configuration
     * @param partitionPath metadata partition path, resolved relative to the metadata table base path
     * @return {@code true} if the storage reports that the partition path exists
     * @throws HoodieIOException if the existence check fails; the underlying failure is attached as the cause
     */
    public static boolean metadataPartitionExists(String basePath, HoodieEngineContext context, String partitionPath) {
        String metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
        HoodieStorage storage = HoodieStorageUtils.getStorage(metadataTablePath, context.getStorageConf());
        try {
            return storage.exists(new StoragePath(metadataTablePath, partitionPath));
        }
        catch (Exception e) {
            HoodieIOException failure = new HoodieIOException(String.format("Failed to check metadata partition %s exists.", partitionPath));
            // Keep the root cause; the message-only constructor leaves the cause unset, so attach it here.
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * {@link StoragePath} convenience overload: converts the base path to its string form and
     * delegates to the string-based overload.
     *
     * @param basePath      base path of the data table
     * @param context       engine context supplying the storage configuration
     * @param partitionPath metadata partition path to check
     * @return result of the string-based overload
     */
    public static boolean metadataPartitionExists(StoragePath basePath, HoodieEngineContext context, String partitionPath) {
        String basePathAsString = basePath.toString();
        return metadataPartitionExists(basePathAsString, context, partitionPath);
    }

    /**
     * Converts commit metadata into metadata-table records, grouped by metadata partition path.
     *
     * <p>Records for the FILES partition are always produced. Records for BLOOM_FILTERS, COLUMN_STATS,
     * PARTITION_STATS and RECORD_INDEX are produced only when the corresponding partition path is
     * contained in {@code enabledPartitionTypes}.
     *
     * @return map from metadata partition path to the records destined for that partition
     * @throws IllegalStateException presumably raised by {@code ValidationUtils.checkState} when partition
     *         stats are enabled but the column stats partition is not available - confirm against ValidationUtils
     */
    public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext context, HoodieConfig hoodieConfig, HoodieCommitMetadata commitMetadata, String instantTime, HoodieTableMetaClient dataMetaClient, HoodieTableMetadata tableMetadata, HoodieMetadataConfig metadataConfig, Set<String> enabledPartitionTypes, String bloomFilterType, int bloomIndexParallelism, int writesFileIdEncoding, EngineType engineType, Option<HoodieRecord.HoodieRecordType> recordTypeOpt) {
        Map<String, HoodieData<HoodieRecord>> recordsByPartition = new HashMap<>();

        // FILES partition: always written, with a parallelism of one.
        List<HoodieRecord> filesRecords = convertMetadataToFilesPartitionRecords(commitMetadata, instantTime);
        recordsByPartition.put(MetadataPartitionType.FILES.getPartitionPath(), context.parallelize(filesRecords, 1));

        String bloomFiltersPath = MetadataPartitionType.BLOOM_FILTERS.getPartitionPath();
        if (enabledPartitionTypes.contains(bloomFiltersPath)) {
            recordsByPartition.put(bloomFiltersPath, convertMetadataToBloomFilterRecords(context, hoodieConfig, commitMetadata, instantTime, dataMetaClient, bloomFilterType, bloomIndexParallelism));
        }

        String columnStatsPath = MetadataPartitionType.COLUMN_STATS.getPartitionPath();
        if (enabledPartitionTypes.contains(columnStatsPath)) {
            recordsByPartition.put(columnStatsPath, convertMetadataToColumnStatsRecords(commitMetadata, context, dataMetaClient, metadataConfig, recordTypeOpt));
        }

        String partitionStatsPath = MetadataPartitionType.PARTITION_STATS.getPartitionPath();
        if (enabledPartitionTypes.contains(partitionStatsPath)) {
            // Partition stats require the column stats partition to be available.
            ValidationUtils.checkState(MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient), "Column stats partition must be enabled to generate partition stats. Please enable: " + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
            boolean deletingPartition = commitMetadata.getOperationType().equals(WriteOperationType.DELETE_PARTITION);
            recordsByPartition.put(partitionStatsPath, convertMetadataToPartitionStatRecords(commitMetadata, context, dataMetaClient, tableMetadata, metadataConfig, recordTypeOpt, deletingPartition));
        }

        String recordIndexPath = MetadataPartitionType.RECORD_INDEX.getPartitionPath();
        if (enabledPartitionTypes.contains(recordIndexPath)) {
            recordsByPartition.put(recordIndexPath, convertMetadataToRecordIndexRecords(context, commitMetadata, metadataConfig, dataMetaClient, writesFileIdEncoding, instantTime, engineType));
        }
        return recordsByPartition;
    }

    /**
     * Builds the FILES-partition records for a commit: one partition-list record followed by one
     * partition-files record per partition present in the commit's write stats.
     *
     * <p>Within a partition, a file reported by several write stats keeps the largest reported size.
     * CDC file sizes reported by a write stat are added to the same mapping. Write stats without a
     * path are skipped with a warning.
     *
     * @param commitMetadata commit metadata whose write stats are converted
     * @param instantTime    instant time, used for logging only
     * @return the generated records, partition-list record first
     */
    public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCommitMetadata commitMetadata, String instantTime) {
        Map<String, List<HoodieWriteStat>> writeStatsByPartition = commitMetadata.getPartitionToWriteStats();
        List<HoodieRecord> records = new ArrayList<>(writeStatsByPartition.size());
        records.add(HoodieMetadataPayload.createPartitionListRecord(getPartitionsAdded(commitMetadata)));

        long addedFileCount = 0L;
        for (Map.Entry<String, List<HoodieWriteStat>> partitionEntry : writeStatsByPartition.entrySet()) {
            String partitionStatName = partitionEntry.getKey();
            List<HoodieWriteStat> writeStats = partitionEntry.getValue();
            Map<String, Long> fileSizes = new HashMap<>(writeStats.size());
            for (HoodieWriteStat stat : writeStats) {
                String pathWithPartition = stat.getPath();
                if (pathWithPartition == null) {
                    LOG.warn("Unable to find path in write stat to update metadata table {}", stat);
                    continue;
                }
                // The same file may show up in several write stats; keep the largest size seen.
                fileSizes.merge(FSUtils.getFileName(pathWithPartition, partitionStatName), stat.getFileSizeInBytes(), Math::max);
                Map<String, Long> cdcPathAndSizes = stat.getCdcStats();
                if (cdcPathAndSizes != null && !cdcPathAndSizes.isEmpty()) {
                    for (Map.Entry<String, Long> cdcEntry : cdcPathAndSizes.entrySet()) {
                        fileSizes.put(FSUtils.getFileName(cdcEntry.getKey(), partitionStatName), cdcEntry.getValue());
                    }
                }
            }
            addedFileCount += fileSizes.size();
            records.add(HoodieMetadataPayload.createPartitionFilesRecord(partitionStatName, fileSizes, Collections.emptyList()));
        }
        LOG.info("Updating at {} from Commit/{}. #partitions_updated={}, #files_added={}", instantTime, commitMetadata.getOperationType(), records.size(), addedFileCount);
        return records;
    }

    /**
     * Maps every partition key of the commit's write stats through
     * {@code getPartitionIdentifierForFilesPartition}.
     *
     * @param commitMetadata commit metadata whose partition keys are converted
     * @return the converted identifiers, in the key set's iteration order
     */
    private static List<String> getPartitionsAdded(HoodieCommitMetadata commitMetadata) {
        List<String> identifiers = new ArrayList<>();
        for (String partitionPath : commitMetadata.getPartitionToWriteStats().keySet()) {
            identifiers.add(getPartitionIdentifierForFilesPartition(partitionPath));
        }
        return identifiers;
    }

    /**
     * Collects the union of the write partition paths reported by the given commit metadata.
     *
     * @param metadataList commit metadata instances to inspect
     * @return set of all write partition paths across the list
     */
    public static Set<String> getWritePartitionPaths(List<HoodieCommitMetadata> metadataList) {
        Set<String> partitionPaths = new HashSet<>();
        for (HoodieCommitMetadata metadata : metadataList) {
            partitionPaths.addAll(metadata.getWritePartitionPaths());
        }
        return partitionPaths;
    }

    /**
     * Builds bloom-filter metadata records for the base files written by a commit.
     *
     * <p>Delta write stats, write stats without a path and non-base files produce no record. For each
     * remaining write stat the written file is opened, its bloom filter is read, and a bloom-filter
     * metadata record is emitted. Any failure to open the file or to read the filter is logged together
     * with the exception, and that file is skipped.
     *
     * @param context               engine context used to parallelize the write stats
     * @param hoodieConfig          config passed to the file reader
     * @param commitMetadata        commit metadata whose write stats are processed
     * @param instantTime           instant time stamped on the generated records
     * @param dataMetaClient        meta client of the data table (base path and storage)
     * @param bloomFilterType       bloom filter type recorded in the generated records
     * @param bloomIndexParallelism upper bound on the parallelism (a minimum of 1 is always used)
     * @return the generated records; empty data when the commit has no write stats
     */
    public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieEngineContext context, HoodieConfig hoodieConfig, HoodieCommitMetadata commitMetadata, String instantTime, HoodieTableMetaClient dataMetaClient, String bloomFilterType, int bloomIndexParallelism) {
        List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream().flatMap(Collection::stream).collect(Collectors.toList());
        if (allWriteStats.isEmpty()) {
            return context.emptyHoodieData();
        }
        int parallelism = Math.max(Math.min(allWriteStats.size(), bloomIndexParallelism), 1);
        HoodieData<HoodieWriteStat> allWriteStatsRDD = context.parallelize(allWriteStats, parallelism);
        return allWriteStatsRDD.flatMap(hoodieWriteStat -> {
            String partition = hoodieWriteStat.getPartitionPath();
            // Delta write stats are skipped.
            if (hoodieWriteStat instanceof HoodieDeltaWriteStat) {
                return Collections.emptyListIterator();
            }
            String pathWithPartition = hoodieWriteStat.getPath();
            if (pathWithPartition == null) {
                LOG.error("Failed to find path in write stat to update metadata table {}", hoodieWriteStat);
                return Collections.emptyListIterator();
            }
            String fileName = FSUtils.getFileName(pathWithPartition, partition);
            if (!FSUtils.isBaseFile(new StoragePath(fileName))) {
                return Collections.emptyListIterator();
            }
            StoragePath writeFilePath = new StoragePath(dataMetaClient.getBasePath(), pathWithPartition);
            try (HoodieFileReader fileReader = HoodieIOFactory.getIOFactory(dataMetaClient.getStorage()).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hoodieConfig, writeFilePath)) {
                try {
                    BloomFilter fileBloomFilter = fileReader.readBloomFilter();
                    if (fileBloomFilter == null) {
                        LOG.error("Failed to read bloom filter for {}", writeFilePath);
                        return Collections.emptyListIterator();
                    }
                    ByteBuffer bloomByteBuffer = ByteBuffer.wrap(StringUtils.getUTF8Bytes(fileBloomFilter.serializeToString()));
                    HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(partition, fileName, instantTime, bloomFilterType, bloomByteBuffer, false);
                    return Collections.singletonList(record).iterator();
                }
                catch (Exception e) {
                    // Best effort: a file whose filter cannot be read is skipped, but the cause is logged.
                    LOG.error("Failed to read bloom filter for {}", writeFilePath, e);
                    return Collections.emptyListIterator();
                }
            }
            catch (IOException e) {
                // Reached when the file reader itself cannot be opened.
                LOG.error("Failed to get bloom filter for file: {}, write stat: {}", writeFilePath, hoodieWriteStat, e);
                return Collections.emptyListIterator();
            }
        });
    }

    /**
     * Converts clean metadata into metadata-table records, grouped by metadata partition path.
     *
     * <p>Records for the FILES partition are always produced. BLOOM_FILTERS and COLUMN_STATS records are
     * produced only when the corresponding type is in {@code enabledPartitionTypes}. When EXPRESSION_INDEX
     * is enabled, expression index records are added to the same map by
     * {@code convertMetadataToExpressionIndexRecords}.
     *
     * @return map from metadata partition path (or index name) to the records destined for it
     */
    public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext engineContext, HoodieCleanMetadata cleanMetadata, String instantTime, HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig, List<MetadataPartitionType> enabledPartitionTypes, int bloomIndexParallelism, Option<HoodieRecord.HoodieRecordType> recordTypeOpt) {
        Map<String, HoodieData<HoodieRecord>> recordsByPartition = new HashMap<>();

        // FILES partition: always written, with a parallelism of one.
        List<HoodieRecord> filesRecords = convertMetadataToFilesPartitionRecords(cleanMetadata, instantTime);
        recordsByPartition.put(MetadataPartitionType.FILES.getPartitionPath(), engineContext.parallelize(filesRecords, 1));

        if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) {
            HoodieData<HoodieRecord> bloomFilterRecords = convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, bloomIndexParallelism);
            recordsByPartition.put(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath(), bloomFilterRecords);
        }
        if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
            HoodieData<HoodieRecord> columnStatsRecords = convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, dataMetaClient, metadataConfig, recordTypeOpt);
            recordsByPartition.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), columnStatsRecords);
        }
        if (enabledPartitionTypes.contains(MetadataPartitionType.EXPRESSION_INDEX)) {
            // Adds one entry per matching expression index directly into recordsByPartition.
            convertMetadataToExpressionIndexRecords(engineContext, cleanMetadata, instantTime, dataMetaClient, metadataConfig, bloomIndexParallelism, recordsByPartition, recordTypeOpt);
        }
        return recordsByPartition;
    }

    /**
     * Adds clean-driven records for every expression index definition into {@code partitionToRecordsMap},
     * keyed by index name.
     *
     * <p>Definitions whose index name does not resolve to the EXPRESSION_INDEX partition type are ignored.
     * Bloom-filter typed indexes get bloom filter delete records. Column-stats typed indexes get column
     * stats records computed with the metadata config narrowed to the index's source fields.
     *
     * @throws HoodieMetadataException if index metadata is absent or holds no definitions, or if a matching
     *         definition has an index type other than bloom filters or column stats
     */
    private static void convertMetadataToExpressionIndexRecords(HoodieEngineContext engineContext, HoodieCleanMetadata cleanMetadata, String instantTime, HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig, int bloomIndexParallelism, Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap, Option<HoodieRecord.HoodieRecordType> recordTypeOpt) {
        Option<HoodieIndexMetadata> indexMetadataOpt = dataMetaClient.getIndexMetadata();
        if (!indexMetadataOpt.isPresent()) {
            throw new HoodieMetadataException("Expression index metadata not found");
        }
        Map<String, HoodieIndexDefinition> definitionsByName = indexMetadataOpt.get().getIndexDefinitions();
        if (definitionsByName.isEmpty()) {
            throw new HoodieMetadataException("Expression index metadata not found");
        }
        for (Map.Entry<String, HoodieIndexDefinition> definitionEntry : definitionsByName.entrySet()) {
            HoodieIndexDefinition definition = definitionEntry.getValue();
            boolean isExpressionIndex = MetadataPartitionType.EXPRESSION_INDEX.equals(MetadataPartitionType.fromPartitionPath(definition.getIndexName()));
            if (!isExpressionIndex) {
                continue;
            }
            String indexName = definitionEntry.getKey();
            if (definition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) {
                partitionToRecordsMap.put(indexName, convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, bloomIndexParallelism));
            } else if (definition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
                // Restrict column stats to the columns this index is defined on.
                HoodieMetadataConfig scopedConfig = HoodieMetadataConfig.newBuilder()
                    .withProperties(metadataConfig.getProps())
                    .withColumnStatsIndexForColumns(String.join(",", definition.getSourceFields()))
                    .build();
                partitionToRecordsMap.put(indexName, convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, dataMetaClient, scopedConfig, recordTypeOpt));
            } else {
                throw new HoodieMetadataException("Unsupported expression index type");
            }
        }
    }

    /**
     * Builds the FILES-partition records for a clean: one partition-files record per cleaned partition
     * carrying its deleted files, plus a trailing partition-list deletion record when at least one
     * partition is flagged as deleted.
     *
     * @param cleanMetadata clean metadata to convert
     * @param instantTime   instant time, used for logging only
     * @return the generated records
     */
    public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCleanMetadata cleanMetadata, String instantTime) {
        List<HoodieRecord> records = new LinkedList<>();
        List<String> deletedPartitions = new ArrayList<>();
        // Single-element array so the lambda below can update the running total.
        int[] deletedFileTotal = {0};
        cleanMetadata.getPartitionMetadata().forEach((partitionName, partitionMetadata) -> {
            List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
            records.add(HoodieMetadataPayload.createPartitionFilesRecord(partitionName, Collections.emptyMap(), deletedFiles));
            deletedFileTotal[0] += deletedFiles.size();
            if (partitionMetadata.getIsPartitionDeleted()) {
                deletedPartitions.add(partitionName);
            }
        });
        if (!deletedPartitions.isEmpty()) {
            records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, true));
        }
        LOG.info("Updating at {} from Clean. #partitions_updated={}, #files_deleted={}, #partitions_deleted={}", instantTime, records.size(), deletedFileTotal[0], deletedPartitions.size());
        return records;
    }

    /**
     * Builds FILES-partition records that re-add missing file information during a restore.
     *
     * <p>Each partition in {@code filesAdded} yields one record carrying its added files and any files
     * deleted from the same partition. Partitions that appear only in {@code filesDeleted} yield a
     * delete-only record. A partition-list deletion record is appended when {@code deletedPartitions}
     * is non-empty.
     *
     * @param engineContext     engine context used to parallelize the records
     * @param deletedPartitions partitions that were deleted
     * @param filesAdded        partition to (file name to size) of added files
     * @param filesDeleted      partition to deleted file names
     * @param instantTime       instant time, used for logging only
     * @return singleton map from the FILES partition path to the generated records
     */
    public static Map<String, HoodieData<HoodieRecord>> convertMissingPartitionRecords(HoodieEngineContext engineContext, List<String> deletedPartitions, Map<String, Map<String, Long>> filesAdded, Map<String, List<String>> filesDeleted, String instantTime) {
        List<HoodieRecord> records = new LinkedList<>();
        int addedFileTotal = 0;
        int deletedFileTotal = 0;

        // Partitions with additions, each paired with whatever was deleted from the same partition.
        for (Map.Entry<String, Map<String, Long>> addedEntry : filesAdded.entrySet()) {
            String partition = addedEntry.getKey();
            Map<String, Long> filesToAdd = addedEntry.getValue();
            addedFileTotal += filesToAdd.size();
            List<String> filesToDelete = filesDeleted.getOrDefault(partition, Collections.emptyList());
            deletedFileTotal += filesToDelete.size();
            records.add(HoodieMetadataPayload.createPartitionFilesRecord(partition, filesToAdd, filesToDelete));
        }

        // Partitions with deletions only; the others were already handled above.
        for (Map.Entry<String, List<String>> deletedEntry : filesDeleted.entrySet()) {
            String partition = deletedEntry.getKey();
            if (filesAdded.containsKey(partition)) {
                continue;
            }
            List<String> filesToDelete = deletedEntry.getValue();
            deletedFileTotal += filesToDelete.size();
            records.add(HoodieMetadataPayload.createPartitionFilesRecord(partition, Collections.emptyMap(), filesToDelete));
        }

        if (!deletedPartitions.isEmpty()) {
            records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, true));
        }
        LOG.info("Re-adding missing records at {} during Restore. #partitions_updated={}, #files_added={}, #files_deleted={}, #partitions_deleted={}", instantTime, records.size(), addedFileTotal, deletedFileTotal, deletedPartitions.size());
        return Collections.singletonMap(MetadataPartitionType.FILES.getPartitionPath(), engineContext.parallelize(records, 1));
    }

    /**
     * Builds bloom-filter deletion records for the base files removed by a clean.
     *
     * <p>Only deleted paths recognised as base files are considered. Each one yields a bloom-filter
     * metadata record created with an empty filter type, an empty buffer and the trailing boolean
     * flag set to {@code true}.
     *
     * @param cleanMetadata         clean metadata listing the deleted paths per partition
     * @param engineContext         engine context used to parallelize the work
     * @param instantTime           instant time stamped on the generated records
     * @param bloomIndexParallelism upper bound on the parallelism (a minimum of 1 is always used)
     * @return the generated deletion records
     */
    public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata, HoodieEngineContext engineContext, String instantTime, int bloomIndexParallelism) {
        List<Pair<String, String>> deletedBaseFiles = new ArrayList<>();
        cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
            for (String deletedPath : partitionMetadata.getDeletePathPatterns()) {
                StoragePath deletedFilePath = new StoragePath(deletedPath);
                if (FSUtils.isBaseFile(deletedFilePath)) {
                    deletedBaseFiles.add(Pair.of(partition, deletedFilePath.getName()));
                }
            }
        });
        int cappedParallelism = Math.min(deletedBaseFiles.size(), bloomIndexParallelism);
        int parallelism = Math.max(cappedParallelism, 1);
        HoodieData<Pair<String, String>> deletedBaseFilesData = engineContext.parallelize(deletedBaseFiles, parallelism);
        return deletedBaseFilesData.map(partitionAndFile -> HoodieMetadataPayload.createBloomFilterMetadataRecord(partitionAndFile.getLeft(), partitionAndFile.getRight(), instantTime, "", ByteBuffer.allocate(0), true));
    }

    /**
     * Creates column stats metadata records for the files removed by a clean.
     *
     * <p>Returns empty data when the clean metadata lists no deleted files, or when no columns are
     * configured for indexing (a warning is logged in that case).
     *
     * @param cleanMetadata  clean metadata listing deleted path patterns per partition
     * @param engineContext  context used to parallelize the work
     * @param dataMetaClient meta client of the data table
     * @param metadataConfig metadata config; supplies the column stats parallelism cap
     * @param recordTypeOpt  optional record type forwarded to the column resolution
     * @return column stats records for every (partition, deleted path) pair, requested with the deleted flag set
     */
    public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata, HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig, Option<HoodieRecord.HoodieRecordType> recordTypeOpt) {
        List<Pair<String, String>> deletedFiles = new ArrayList<>();
        cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
            for (String deletedPathPattern : partitionMetadata.getDeletePathPatterns()) {
                deletedFiles.add(Pair.of(partition, deletedPathPattern));
            }
        });
        if (deletedFiles.isEmpty()) {
            return engineContext.emptyHoodieData();
        }

        List<String> columnsToIndex = new ArrayList<>(getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig, Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)), false, recordTypeOpt).keySet());
        if (columnsToIndex.isEmpty()) {
            LOG.warn("No columns to index for column stats index.");
            return engineContext.emptyHoodieData();
        }

        int cappedParallelism = Math.min(deletedFiles.size(), metadataConfig.getColumnStatsIndexParallelism());
        int parallelism = Math.max(cappedParallelism, 1);
        return engineContext.parallelize(deletedFiles, parallelism)
                .flatMap(partitionAndFile -> getColumnStatsRecords(partitionAndFile.getLeft(), partitionAndFile.getRight(), dataMetaClient, columnsToIndex, true).iterator());
    }

    /**
     * Generates record level index (RLI) metadata records from a commit's write stats.
     *
     * <p>Write stats are grouped by file id. Within a group, base file stats are parsed directly for
     * record index entries, while log file stats are resolved through the merged log records into
     * revived keys (index updates) and deleted keys (index deletes). A group containing both kinds
     * of write stat is rejected. The per-group results are finally reduced by record key.
     *
     * @param engineContext        context used to parallelize the work
     * @param commitMetadata       commit metadata providing the write stats and operation type
     * @param metadataConfig       metadata config; supplies the RLI parallelism cap
     * @param dataTableMetaClient  meta client of the data table
     * @param writesFileIdEncoding file id encoding forwarded to the record index payloads
     * @param instantTime          instant time of the commit
     * @param engineType           engine type forwarded to the log record reader
     * @return the RLI records; empty when there are no write stats or the operation is a compaction
     * @throws HoodieIOException if any log file write stat reports inserts
     * @throws HoodieException   wrapping any failure raised while generating the records
     */
    @VisibleForTesting
    public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext, HoodieCommitMetadata commitMetadata, HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataTableMetaClient, int writesFileIdEncoding, String instantTime, EngineType engineType) {
        List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream().flatMap(Collection::stream).collect(Collectors.toList());
        if (allWriteStats.isEmpty() || commitMetadata.getOperationType() == WriteOperationType.COMPACT) {
            return engineContext.emptyHoodieData();
        }
        // Inserts that land in log files are not supported by RLI; fail fast.
        if (allWriteStats.stream().anyMatch(writeStat -> {
            String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath());
            return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0L;
        })) {
            throw new HoodieIOException("RLI cannot support logs having inserts with current offering. Would recommend disabling Record Level Index");
        }
        try {
            Map<String, List<HoodieWriteStat>> writeStatsByFileId = allWriteStats.stream().collect(Collectors.groupingBy(HoodieWriteStat::getFileId));
            int parallelism = Math.max(Math.min(writeStatsByFileId.size(), metadataConfig.getRecordIndexMaxParallelism()), 1);
            String basePath = dataTableMetaClient.getBasePath().toString();
            HoodieFileFormat baseFileFormat = dataTableMetaClient.getTableConfig().getBaseFileFormat();
            StorageConfiguration<?> storageConfiguration = dataTableMetaClient.getStorageConf();
            Option<Schema> finalWriterSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
            HoodieData<HoodieRecord> recordIndexRecords = engineContext.parallelize(new ArrayList<>(writeStatsByFileId.entrySet()), parallelism).flatMap(writeStatsByFileIdEntry -> {
                String fileId = writeStatsByFileIdEntry.getKey();
                List<HoodieWriteStat> writeStats = writeStatsByFileIdEntry.getValue();
                List<HoodieWriteStat> baseFileWriteStats = writeStats.stream()
                        .filter(writeStat -> writeStat.getPath().endsWith(baseFileFormat.getFileExtension()))
                        .collect(Collectors.toList());
                // Classify every write stat by its OWN path. Testing only the first element of the
                // group would classify the whole group by that one entry and let a mixed
                // base/log group slip past the validation below.
                List<HoodieWriteStat> logFileWriteStats = writeStats.stream()
                        .filter(writeStat -> FSUtils.isLogFile(new StoragePath(writeStat.getPath())))
                        .collect(Collectors.toList());
                ValidationUtils.checkState(baseFileWriteStats.isEmpty() || logFileWriteStats.isEmpty(), "A single fileId cannot have both base file and log file write stats in the same commit. FileId: " + fileId);
                if (!baseFileWriteStats.isEmpty()) {
                    return baseFileWriteStats.stream().flatMap(writeStat -> {
                        HoodieStorage baseFileStorage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration);
                        return CollectionUtils.toStream(BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, baseFileStorage));
                    }).iterator();
                }
                if (!logFileWriteStats.isEmpty()) {
                    String partitionPath = logFileWriteStats.get(0).getPartitionPath();
                    // Log files written by this commit.
                    List<String> currentLogFilePaths = logFileWriteStats.stream()
                            .map(writeStat -> new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath()).toString())
                            .collect(Collectors.toList());
                    // Every log file listed on the delta write stats, resolved under the partition path.
                    List<String> allLogFilePaths = logFileWriteStats.stream().flatMap(writeStat -> {
                        ValidationUtils.checkState(writeStat instanceof HoodieDeltaWriteStat, "Log file should be associated with a delta write stat");
                        StoragePath partitionStoragePath = new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPartitionPath());
                        return ((HoodieDeltaWriteStat) writeStat).getLogFiles().stream()
                                .map(logFile -> new StoragePath(partitionStoragePath, logFile).toString());
                    }).collect(Collectors.toList());
                    Pair<Set<String>, Set<String>> revivedAndDeletedKeys = getRevivedAndDeletedKeysFromMergedLogs(dataTableMetaClient, instantTime, engineType, allLogFilePaths, finalWriterSchemaOpt, currentLogFilePaths);
                    Set<String> revivedKeys = revivedAndDeletedKeys.getLeft();
                    Set<String> deletedKeys = revivedAndDeletedKeys.getRight();
                    List<HoodieRecord> allRecords = new ArrayList<>(revivedKeys.size() + deletedKeys.size());
                    for (String revivedKey : revivedKeys) {
                        allRecords.add(HoodieMetadataPayload.createRecordIndexUpdate(revivedKey, partitionPath, fileId, instantTime, writesFileIdEncoding));
                    }
                    for (String deletedKey : deletedKeys) {
                        allRecords.add(HoodieMetadataPayload.createRecordIndexDelete(deletedKey));
                    }
                    return allRecords.iterator();
                }
                LOG.warn("No base file or log file write stats found for fileId: {}", (Object) fileId);
                return Collections.emptyIterator();
            });
            // Size the reduce stage from the bytes written by stats that inserted or deleted records.
            long totalWriteBytesForRLI = allWriteStats.stream().mapToLong(writeStat -> {
                if (writeStat.getNumInserts() == 0L && writeStat.getNumDeletes() == 0L) {
                    return 0L;
                }
                return writeStat.getTotalWriteBytes();
            }).sum();
            // 100 MiB per reduce partition, rounded up, with at least one partition.
            long targetPartitionSize = 100L * 1024L * 1024L;
            int reduceParallelism = (int) Math.max(1L, (totalWriteBytesForRLI + targetPartitionSize - 1L) / targetPartitionSize);
            return reduceByKeys(recordIndexRecords, reduceParallelism);
        }
        catch (Exception e) {
            throw new HoodieException("Failed to generate RLI records for metadata table", e);
        }
    }

    /**
     * Computes the revived and deleted record keys for a file group from its merged log records.
     *
     * <p>When every entry of {@code logFilePaths} is also in {@code currentLogFilePaths}, only the
     * current log files are read: the deleted keys are the keys of their delete records and the
     * revived set is empty. Otherwise the computation is delegated to
     * {@code getRevivedAndDeletedKeys} with the remaining (previous) log files.
     *
     * @param dataTableMetaClient  meta client of the data table
     * @param instantTime          instant time passed to the log record reader
     * @param engineType           engine type passed to the log record reader
     * @param logFilePaths         all log file paths to consider
     * @param finalWriterSchemaOpt writer schema used to read records and test for deletes
     * @param currentLogFilePaths  log file paths belonging to the current commit
     * @return pair of (revived keys, deleted keys)
     */
    @VisibleForTesting
    public static Pair<Set<String>, Set<String>> getRevivedAndDeletedKeysFromMergedLogs(HoodieTableMetaClient dataTableMetaClient, String instantTime, EngineType engineType, List<String> logFilePaths, Option<Schema> finalWriterSchemaOpt, List<String> currentLogFilePaths) {
        List<String> previousLogFilePaths = logFilePaths.stream()
                .filter(logFilePath -> !currentLogFilePaths.contains(logFilePath))
                .collect(Collectors.toList());
        if (!previousLogFilePaths.isEmpty()) {
            return getRevivedAndDeletedKeys(dataTableMetaClient, instantTime, engineType, logFilePaths, finalWriterSchemaOpt, previousLogFilePaths);
        }

        // No earlier log files: nothing can be revived, only the current deletes matter.
        Map<String, HoodieRecord> currentLogRecords = getLogRecords(currentLogFilePaths, dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
        Set<String> deletedKeys = new HashSet<>();
        for (Map.Entry<String, HoodieRecord> keyAndRecord : currentLogRecords.entrySet()) {
            if (isDeleteRecord(dataTableMetaClient, finalWriterSchemaOpt, keyAndRecord.getValue())) {
                deletedKeys.add(keyAndRecord.getKey());
            }
        }
        return Pair.of(Collections.emptySet(), deletedKeys);
    }

    /**
     * Reads the merged records of all log files and of the previous-only log files, splits each
     * key set into valid and deleted keys, and hands the four sets to
     * {@code computeRevivedAndDeletedKeys}.
     *
     * @param dataTableMetaClient                meta client of the data table
     * @param instantTime                        instant time passed to the log record reader
     * @param engineType                         engine type passed to the log record reader
     * @param logFilePaths                       all log file paths
     * @param finalWriterSchemaOpt               writer schema used to read records and test for deletes
     * @param logFilePathsWithoutCurrentLogFiles log file paths excluding those of the current commit
     * @return pair of (revived keys, deleted keys)
     */
    private static Pair<Set<String>, Set<String>> getRevivedAndDeletedKeys(HoodieTableMetaClient dataTableMetaClient, String instantTime, EngineType engineType, List<String> logFilePaths, Option<Schema> finalWriterSchemaOpt, List<String> logFilePathsWithoutCurrentLogFiles) {
        Map<String, HoodieRecord> allLogRecords = getLogRecords(logFilePaths, dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
        Map<String, HoodieRecord> previousLogRecords = getLogRecords(logFilePathsWithoutCurrentLogFiles, dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);

        // In both maps, key "true" holds keys of non-delete records and "false" keys of delete records.
        Map<Boolean, Set<String>> previousKeysByValidity = previousLogRecords.entrySet().stream()
                .collect(Collectors.partitioningBy(
                        entry -> !isDeleteRecord(dataTableMetaClient, finalWriterSchemaOpt, entry.getValue()),
                        Collectors.mapping(Map.Entry::getKey, Collectors.toSet())));
        Map<Boolean, Set<String>> allKeysByValidity = allLogRecords.entrySet().stream()
                .collect(Collectors.partitioningBy(
                        entry -> !isDeleteRecord(dataTableMetaClient, finalWriterSchemaOpt, entry.getValue()),
                        Collectors.mapping(Map.Entry::getKey, Collectors.toSet())));

        return computeRevivedAndDeletedKeys(
                previousKeysByValidity.get(true),
                previousKeysByValidity.get(false),
                allKeysByValidity.get(true),
                allKeysByValidity.get(false));
    }

    /**
     * Tells whether {@code record} is a delete, evaluated against the writer schema and the table
     * config properties.
     *
     * @param dataTableMetaClient  meta client supplying the table config properties
     * @param finalWriterSchemaOpt writer schema; {@code get()} is called on it unconditionally
     * @param record               record to test
     * @return the result of {@code record.isDelete(...)}
     * @throws HoodieException wrapping an {@link IOException} raised by the check
     */
    private static boolean isDeleteRecord(HoodieTableMetaClient dataTableMetaClient, Option<Schema> finalWriterSchemaOpt, HoodieRecord record) {
        boolean deleted;
        try {
            deleted = record.isDelete(finalWriterSchemaOpt.get(), dataTableMetaClient.getTableConfig().getProps());
        }
        catch (IOException ioe) {
            throw new HoodieException("Failed to check if record is delete", ioe);
        }
        return deleted;
    }

    /**
     * Reads the given log files through a {@code HoodieMergedLogRecordScanner} and returns its
     * records keyed by record key.
     *
     * @param logFilePaths          log files to scan
     * @param datasetMetaClient     meta client of the data table (storage, base path, table config)
     * @param writerSchemaOpt       reader schema; when absent an empty map is returned without scanning
     * @param latestCommitTimestamp latest instant time handed to the scanner
     * @param engineType            engine type used to create the record merger
     * @return the scanner's records, or an empty map when no schema is available
     */
    private static Map<String, HoodieRecord> getLogRecords(List<String> logFilePaths, HoodieTableMetaClient datasetMetaClient, Option<Schema> writerSchemaOpt, String latestCommitTimestamp, EngineType engineType) {
        if (!writerSchemaOpt.isPresent()) {
            return Collections.emptyMap();
        }
        StorageConfiguration<?> storageConf = datasetMetaClient.getStorageConf();
        HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(datasetMetaClient.getBasePath().toString(), engineType, Collections.emptyList(), datasetMetaClient.getTableConfig().getRecordMergeStrategyId());
        HoodieMergedLogRecordScanner mergedLogRecordScanner = ((HoodieMergedLogRecordScanner.Builder)HoodieMergedLogRecordScanner.newBuilder()
                .withStorage(datasetMetaClient.getStorage())
                .withBasePath(datasetMetaClient.getBasePath())
                .withLogFilePaths((List)logFilePaths))
                .withReaderSchema(writerSchemaOpt.get())
                .withLatestInstantTime(latestCommitTimestamp)
                .withReverseReader(false)
                // Fallback memory budget when the key is not set: 0x40000000 bytes (1 GiB).
                .withMaxMemorySizeInBytes(storageConf.getLong(HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION.key(), 0x40000000L))
                .withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue())
                .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
                .withOptimizedLogBlocksScan(storageConf.getBoolean("hoodie.optimized.log.blocks.scan.enable", false))
                .withDiskMapType((ExternalSpillableMap.DiskMapType)storageConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), (Enum)HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
                .withBitCaskDiskMapCompressionEnabled(storageConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
                .withRecordMerger(recordMerger)
                .withTableMetaClient(datasetMetaClient)
                .withAllowInflightInstants(true)
                .build();
        return mergedLogRecordScanner.getRecords();
    }

    /**
     * Derives revived and deleted keys from the key sets of the previous and of all log files.
     *
     * <ul>
     *   <li>revived = deleted in the previous logs AND valid in all logs</li>
     *   <li>deleted = valid in the previous logs AND deleted in all logs</li>
     * </ul>
     *
     * The input sets are not modified; the results are fresh {@link HashSet}s.
     *
     * @param validKeysForPreviousLogs   keys of non-delete records in the previous logs
     * @param deletedKeysForPreviousLogs keys of delete records in the previous logs
     * @param validKeysForAllLogs        keys of non-delete records across all logs
     * @param deletedKeysForAllLogs      keys of delete records across all logs
     * @return pair of (revived keys, deleted keys)
     */
    @VisibleForTesting
    public static Pair<Set<String>, Set<String>> computeRevivedAndDeletedKeys(Set<String> validKeysForPreviousLogs, Set<String> deletedKeysForPreviousLogs, Set<String> validKeysForAllLogs, Set<String> deletedKeysForAllLogs) {
        Set<String> revived = new HashSet<>(deletedKeysForPreviousLogs);
        Set<String> deleted = new HashSet<>(validKeysForPreviousLogs);
        revived.retainAll(validKeysForAllLogs);
        deleted.retainAll(deletedKeysForAllLogs);
        return Pair.of(revived, deleted);
    }

    /**
     * Collapses RLI records that share a key into a single record.
     *
     * <p>A record whose payload is an {@code EmptyHoodieRecordPayload} is treated as a delete. When
     * exactly one of two records is a delete, the other one wins; when both are deletes the first
     * is kept; two non-delete records for the same key are an error.
     *
     * @param recordIndexRecords records to reduce
     * @param parallelism        parallelism of the reduce
     * @return one record per key
     * @throws HoodieIOException (from the reducer) when two non-delete records share a key
     */
    @VisibleForTesting
    public static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
        return recordIndexRecords.mapToPair(t -> Pair.of(t.getKey(), t)).reduceByKey((record1, record2) -> {
            boolean firstIsDelete = record1.getData() instanceof EmptyHoodieRecordPayload;
            boolean secondIsDelete = record2.getData() instanceof EmptyHoodieRecordPayload;
            if (firstIsDelete) {
                // delete + update -> update; delete + delete -> first delete
                return secondIsDelete ? record1 : record2;
            }
            if (secondIsDelete) {
                return record1;
            }
            throw new HoodieIOException("Two HoodieRecord updates to RLI is seen for same record key " + record2.getRecordKey() + ", record 1 : " + record1.getData().toString() + ", record 2 : " + record2.getData().toString());
        }, parallelism).values();
    }

    /**
     * Scans the given log files with a {@code HoodieUnMergedLogRecordScanner} and collects record keys.
     *
     * @param logFilePaths          log files to scan
     * @param datasetMetaClient     meta client of the data table
     * @param writerSchemaOpt       reader schema; when absent an empty set is returned without scanning
     * @param maxBufferSize         scanner buffer size
     * @param latestCommitTimestamp latest instant time handed to the scanner
     * @param includeValidKeys      whether to register the record callback that collects record keys
     * @param includeDeletedKeys    whether to register the deletion callback that collects deleted keys
     * @return the collected keys
     * @throws IOException declared by the method signature
     */
    @VisibleForTesting
    public static Set<String> getRecordKeys(List<String> logFilePaths, HoodieTableMetaClient datasetMetaClient, Option<Schema> writerSchemaOpt, int maxBufferSize, String latestCommitTimestamp, boolean includeValidKeys, boolean includeDeletedKeys) throws IOException {
        if (!writerSchemaOpt.isPresent()) {
            return Collections.emptySet();
        }
        Set<String> collectedKeys = new HashSet<>();
        HoodieUnMergedLogRecordScanner.Builder scannerBuilder = ((HoodieUnMergedLogRecordScanner.Builder)HoodieUnMergedLogRecordScanner.newBuilder()
                .withStorage(datasetMetaClient.getStorage())
                .withBasePath(datasetMetaClient.getBasePath())
                .withLogFilePaths((List)logFilePaths))
                .withBufferSize(maxBufferSize)
                .withLatestInstantTime(latestCommitTimestamp)
                .withReaderSchema(writerSchemaOpt.get())
                .withTableMetaClient(datasetMetaClient);
        if (includeValidKeys) {
            scannerBuilder.withLogRecordScannerCallback(record -> collectedKeys.add(record.getRecordKey()));
        }
        if (includeDeletedKeys) {
            scannerBuilder.withRecordDeletionCallback(deletedKey -> collectedKeys.add(deletedKey.getRecordKey()));
        }
        // The callbacks fill collectedKeys while the scan runs.
        scannerBuilder.build().scan();
        return collectedKeys;
    }

    /**
     * Converts rollback metadata into FILES-partition metadata records.
     *
     * @param engineContext       context used to parallelize the records
     * @param dataTableMetaClient meta client of the data table
     * @param rollbackMetadata    rollback metadata to convert
     * @param instantTime         instant time of the rollback
     * @return singleton map keyed by the FILES partition path; the value is empty data when no
     *         records were produced, otherwise the records with one partition per record
     */
    public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext engineContext, HoodieTableMetaClient dataTableMetaClient, HoodieRollbackMetadata rollbackMetadata, String instantTime) {
        List<HoodieRecord> filesPartitionRecords = convertMetadataToRollbackRecords(rollbackMetadata, instantTime, dataTableMetaClient);
        HoodieData<HoodieRecord> rollbackRecords;
        if (filesPartitionRecords.isEmpty()) {
            rollbackRecords = engineContext.emptyHoodieData();
        } else {
            rollbackRecords = engineContext.parallelize(filesPartitionRecords, filesPartitionRecords.size());
        }
        return Collections.singletonMap(MetadataPartitionType.FILES.getPartitionPath(), rollbackRecords);
    }

    /**
     * Collects the files appended by a rollback and turns them into FILES-partition records.
     * No deleted files are passed on; the operation label is {@code "Rollback"}.
     *
     * @param rollbackMetadata    rollback metadata to process
     * @param instantTime         instant time of the rollback
     * @param dataTableMetaClient not used by this method
     * @return the FILES-partition records
     */
    private static List<HoodieRecord> convertMetadataToRollbackRecords(HoodieRollbackMetadata rollbackMetadata, String instantTime, HoodieTableMetaClient dataTableMetaClient) {
        Map<String, Map<String, Long>> appendedFilesByPartition = new HashMap<>();
        processRollbackMetadata(rollbackMetadata, appendedFilesByPartition);
        Map<String, List<String>> noDeletedFiles = Collections.emptyMap();
        return convertFilesToFilesPartitionRecords(noDeletedFiles, appendedFilesByPartition, instantTime, "Rollback");
    }

    /**
     * Accumulates, per partition identifier, the log files recorded in the rollback metadata.
     *
     * <p>Partitions without rollback log files are skipped entirely. For the others, both the
     * rollback log files and the log files from the failed commit are merged into
     * {@code partitionToAppendedFiles} by file name; when a file name is seen more than once the
     * larger size is kept.
     *
     * @param rollbackMetadata         rollback metadata to read
     * @param partitionToAppendedFiles output map (partition identifier -> file name -> size), mutated in place
     */
    private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata, Map<String, Map<String, Long>> partitionToAppendedFiles) {
        rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
            boolean hasRollbackLogFiles = pm.getRollbackLogFiles() != null && !pm.getRollbackLogFiles().isEmpty();
            String partitionId = getPartitionIdentifierForFilesPartition(pm.getPartitionPath());
            if (!hasRollbackLogFiles) {
                return;
            }
            BiFunction<Long, Long, Long> keepLargerSize = Math::max;
            if (!partitionToAppendedFiles.containsKey(partitionId)) {
                partitionToAppendedFiles.put(partitionId, new HashMap<>());
            }
            Map<String, Long> appendedFiles = partitionToAppendedFiles.get(partitionId);
            pm.getRollbackLogFiles().forEach((path, size) ->
                    appendedFiles.merge(new StoragePath(path).getName(), size, keepLargerSize));
            pm.getLogFilesFromFailedCommit().forEach((path, size) ->
                    appendedFiles.merge(new StoragePath(path).getName(), size, keepLargerSize));
        });
    }

    /**
     * Creates one FILES-partition record per partition from the deleted and appended file maps.
     *
     * <p>Partitions with deletions are emitted first; if such a partition also has appended files,
     * its entry is REMOVED from {@code partitionToAppendedFiles} and folded into the same record.
     * The remaining appended-only partitions are emitted afterwards, keyed by their partition
     * identifier.
     *
     * @param partitionToDeletedFiles  partition -> deleted file names
     * @param partitionToAppendedFiles partition -> (file name -> size); mutated as described above
     * @param instantTime              instant time; only used in the log message
     * @param operation                operation label; only used in the log message
     * @return the generated records
     * @throws IllegalStateException presumably, via {@code ValidationUtils.checkState}, when a file is
     *         both appended and deleted in an appended-only partition (exception type not visible here)
     */
    protected static List<HoodieRecord> convertFilesToFilesPartitionRecords(Map<String, List<String>> partitionToDeletedFiles, Map<String, Map<String, Long>> partitionToAppendedFiles, String instantTime, String operation) {
        List<HoodieRecord> records = new ArrayList<>(partitionToDeletedFiles.size() + partitionToAppendedFiles.size());
        int deletedFileCount = 0;
        int appendedFileCount = 0;

        for (Map.Entry<String, List<String>> deletedEntry : partitionToDeletedFiles.entrySet()) {
            String partitionName = deletedEntry.getKey();
            List<String> deletedFiles = deletedEntry.getValue();
            deletedFileCount += deletedFiles.size();
            Map<String, Long> filesAdded = Collections.emptyMap();
            if (partitionToAppendedFiles.containsKey(partitionName)) {
                // Consume the appended files here so the second loop does not emit them again.
                filesAdded = partitionToAppendedFiles.remove(partitionName);
            }
            records.add(HoodieMetadataPayload.createPartitionFilesRecord(partitionName, filesAdded, deletedFiles));
        }

        for (Map.Entry<String, Map<String, Long>> appendedEntry : partitionToAppendedFiles.entrySet()) {
            String partition = getPartitionIdentifierForFilesPartition(appendedEntry.getKey());
            Map<String, Long> appendedFileMap = appendedEntry.getValue();
            appendedFileCount += appendedFileMap.size();
            List<String> deletedInSamePartition = partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList());
            // removeAll returns true only if some appended file was also listed as deleted.
            boolean overlapRemoved = appendedFileMap.keySet().removeAll(deletedInSamePartition);
            ValidationUtils.checkState(!overlapRemoved, "Rollback file cannot both be appended and deleted");
            records.add(HoodieMetadataPayload.createPartitionFilesRecord(partition, appendedFileMap, Collections.emptyList()));
        }

        LOG.info("Found at {} from {}. #partitions_updated={}, #files_deleted={}, #files_appended={}", new Object[]{instantTime, operation, records.size(), deletedFileCount, appendedFileCount});
        return records;
    }

    /**
     * Returns the partition identifier used for the column stats index.
     * Delegates to {@code getPartitionIdentifier}.
     *
     * @param partitionName relative partition path
     * @return the partition identifier
     */
    public static String getColumnStatsIndexPartitionIdentifier(String partitionName) {
        String identifier = getPartitionIdentifier(partitionName);
        return identifier;
    }

    /**
     * Returns the partition identifier used for the bloom filter index.
     * Delegates to {@code getPartitionIdentifier}.
     *
     * @param partitionName relative partition path
     * @return the partition identifier
     */
    public static String getBloomFilterIndexPartitionIdentifier(String partitionName) {
        String identifier = getPartitionIdentifier(partitionName);
        return identifier;
    }

    /**
     * Returns the partition identifier used in the FILES partition.
     * Delegates to {@code getPartitionIdentifier}.
     *
     * @param relativePartitionPath relative partition path
     * @return the partition identifier
     */
    public static String getPartitionIdentifierForFilesPartition(String relativePartitionPath) {
        String identifier = getPartitionIdentifier(relativePartitionPath);
        return identifier;
    }

    /**
     * Maps a relative partition path to its identifier: the empty path becomes {@code "."},
     * every other value is returned unchanged.
     *
     * @param relativePartitionPath relative partition path
     * @return {@code "."} for the empty string, otherwise the argument itself
     */
    public static String getPartitionIdentifier(@Nonnull String relativePartitionPath) {
        if ("".equals(relativePartitionPath)) {
            return ".";
        }
        return relativePartitionPath;
    }

    /**
     * Creates bloom filter metadata records for deleted and appended files.
     *
     * <p>Files that {@code FSUtils.isBaseFile} rejects are skipped with a warning. For an appended
     * file the bloom filter is read from {@code <basePath>/<partition>/<file>}; if none can be read
     * the file is skipped with an error log. Deleted files get a record with an empty filter buffer.
     *
     * @param engineContext            context used to parallelize the work
     * @param partitionToDeletedFiles  partition -> deleted file names
     * @param partitionToAppendedFiles partition -> (file name -> size)
     * @param instantTime              instant time stamped on each record
     * @param dataMetaClient           meta client of the data table
     * @param bloomIndexParallelism    upper bound on parallelism (the effective value is at least 1)
     * @param bloomFilterType          bloom filter type stamped on each record
     * @return the bloom filter records
     */
    public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext, Map<String, List<String>> partitionToDeletedFiles, Map<String, Map<String, Long>> partitionToAppendedFiles, String instantTime, HoodieTableMetaClient dataMetaClient, int bloomIndexParallelism, String bloomFilterType) {
        List<Tuple3<String, String, Boolean>> fileTriplets = fetchPartitionFileInfoTriplets(partitionToDeletedFiles, partitionToAppendedFiles);
        int cappedParallelism = Math.min(fileTriplets.size(), bloomIndexParallelism);
        int parallelism = Math.max(cappedParallelism, 1);
        return engineContext.parallelize(fileTriplets, parallelism).flatMap(triplet -> {
            final String partitionName = triplet.f0;
            final String filename = triplet.f1;
            final boolean isDeleted = triplet.f2;
            if (!FSUtils.isBaseFile(new StoragePath(filename))) {
                LOG.warn("Ignoring file {} as it is not a base file", (Object) filename);
                return Stream.<HoodieRecord>empty().iterator();
            }

            ByteBuffer bloomFilterBuffer;
            if (isDeleted) {
                // Deletes carry no filter content.
                bloomFilterBuffer = ByteBuffer.allocate(0);
            } else {
                StoragePath addedFilePath = new StoragePath(dataMetaClient.getBasePath(), partitionName + "/" + filename);
                bloomFilterBuffer = readBloomFilter(dataMetaClient.getStorage(), addedFilePath);
                if (bloomFilterBuffer == null) {
                    LOG.error("Failed to read bloom filter from {}", (Object) addedFilePath);
                    return Stream.<HoodieRecord>empty().iterator();
                }
            }
            return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(partitionName, filename, instantTime, bloomFilterType, bloomFilterBuffer, triplet.f2)).iterator();
        });
    }

    /**
     * Creates column stats metadata records for deleted and appended files.
     *
     * @param engineContext               context used to parallelize the work
     * @param partitionToDeletedFiles     partition -> deleted file names
     * @param partitionToAppendedFiles    partition -> (file name -> size)
     * @param dataMetaClient              meta client of the data table
     * @param metadataConfig              not used by this method
     * @param columnStatsIndexParallelism upper bound on parallelism (the effective value is at least 1)
     * @param maxReaderBufferSize         reader buffer size forwarded to the stats extraction
     * @param columnsToIndex              columns to compute stats for
     * @return the column stats records; empty data when both input maps are empty
     */
    public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEngineContext engineContext, Map<String, List<String>> partitionToDeletedFiles, Map<String, Map<String, Long>> partitionToAppendedFiles, HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig, int columnStatsIndexParallelism, int maxReaderBufferSize, List<String> columnsToIndex) {
        boolean nothingChanged = partitionToAppendedFiles.isEmpty() && partitionToDeletedFiles.isEmpty();
        if (nothingChanged) {
            return engineContext.emptyHoodieData();
        }
        LOG.info("Indexing {} columns for column stats index", (Object)columnsToIndex.size());
        List<Tuple3<String, String, Boolean>> fileTriplets = fetchPartitionFileInfoTriplets(partitionToDeletedFiles, partitionToAppendedFiles);
        int cappedParallelism = Math.min(fileTriplets.size(), columnStatsIndexParallelism);
        int parallelism = Math.max(cappedParallelism, 1);
        return engineContext.parallelize(fileTriplets, parallelism)
                .flatMap(triplet -> getColumnStatsRecords(triplet.f0, triplet.f1, dataMetaClient, columnsToIndex, triplet.f2, maxReaderBufferSize).iterator());
    }

    /**
     * Reads the bloom filter of a file and returns it serialized as UTF-8 bytes.
     *
     * @param storage2 storage used to obtain the reader configuration and the file reader
     * @param filePath path of the file to read
     * @return the serialized bloom filter, or {@code null} when the file reader returns none
     * @throws IOException if opening, reading or closing the file reader fails
     */
    private static ByteBuffer readBloomFilter(HoodieStorage storage2, StoragePath filePath) throws IOException {
        HoodieConfig readerConfig = ConfigUtils.getReaderConfigs(storage2.getConf());
        try (HoodieFileReader fileReader = HoodieIOFactory.getIOFactory(storage2).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(readerConfig, filePath)) {
            BloomFilter fileBloomFilter = fileReader.readBloomFilter();
            return fileBloomFilter == null
                    ? null
                    : ByteBuffer.wrap(StringUtils.getUTF8Bytes(fileBloomFilter.serializeToString()));
        }
    }

    /**
     * Flattens the deleted and appended file maps into (partition, file name, isDeleted) triplets.
     * Deleted files come first (flag {@code true}), followed by appended files (flag {@code false}).
     *
     * @param partitionToDeletedFiles  partition -> deleted file names
     * @param partitionToAppendedFiles partition -> (file name -> size); only the file names are used
     * @return the list of triplets
     */
    private static List<Tuple3<String, String, Boolean>> fetchPartitionFileInfoTriplets(Map<String, List<String>> partitionToDeletedFiles, Map<String, Map<String, Long>> partitionToAppendedFiles) {
        int deletedTotal = partitionToDeletedFiles.values().stream().mapToInt(List::size).sum();
        int appendedTotal = partitionToAppendedFiles.values().stream().mapToInt(Map::size).sum();
        List<Tuple3<String, String, Boolean>> triplets = new ArrayList<>(deletedTotal + appendedTotal);

        for (Map.Entry<String, List<String>> deletedEntry : partitionToDeletedFiles.entrySet()) {
            for (String deletedFile : deletedEntry.getValue()) {
                triplets.add(Tuple3.of(deletedEntry.getKey(), deletedFile, true));
            }
        }
        for (Map.Entry<String, Map<String, Long>> appendedEntry : partitionToAppendedFiles.entrySet()) {
            for (String addedFile : appendedEntry.getValue().keySet()) {
                triplets.add(Tuple3.of(appendedEntry.getKey(), addedFile, false));
            }
        }
        return triplets;
    }

    /**
     * Maps a record key to a file group index in {@code [0, numFileGroups)}.
     *
     * <p>The key is hashed with a hand-rolled polynomial hash (multiplier 31 over the key's chars,
     * with int overflow), and the absolute value of the hash is reduced modulo
     * {@code numFileGroups}. The outer {@code Math.abs} covers the one hash value whose absolute
     * value is still negative ({@code Integer.MIN_VALUE}).
     *
     * @param recordKey     record key to hash
     * @param numFileGroups number of file groups; zero causes an {@link ArithmeticException}
     * @return the file group index
     */
    public static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGroups) {
        int hash = 0;
        for (char c : recordKey.toCharArray()) {
            hash = hash * 31 + c;
        }
        int remainder = Math.abs(hash) % numFileGroups;
        return Math.abs(remainder);
    }

    /**
     * Loads the latest merged file slices of a metadata table partition using the given view.
     *
     * @param metaClient meta client of the metadata table
     * @param fsView     file system view to query
     * @param partition  partition to load
     * @return the file slices returned by {@code getPartitionFileSlices} with merging enabled
     */
    public static List<FileSlice> getPartitionLatestMergedFileSlices(HoodieTableMetaClient metaClient, HoodieTableFileSystemView fsView, String partition) {
        LOG.info("Loading latest merged file slices for metadata table partition {}", (Object)partition);
        Option<HoodieTableFileSystemView> providedView = Option.of(fsView);
        return getPartitionFileSlices(metaClient, providedView, partition, true);
    }

    /**
     * Loads the latest (unmerged) file slices of a metadata table partition.
     *
     * @param metaClient meta client of the metadata table
     * @param fsView     optional file system view, passed through to {@code getPartitionFileSlices}
     * @param partition  partition to load
     * @return the file slices returned by {@code getPartitionFileSlices} with merging disabled
     */
    public static List<FileSlice> getPartitionLatestFileSlices(HoodieTableMetaClient metaClient, Option<HoodieTableFileSystemView> fsView, String partition) {
        LOG.info("Loading latest file slices for metadata table partition {}", (Object)partition);
        List<FileSlice> latestSlices = getPartitionFileSlices(metaClient, fsView, partition, false);
        return latestSlices;
    }

    /**
     * Creates a file-listing based file system view for the metadata table.
     *
     * <p>When the active timeline is empty, a timeline holding a single synthetic completed
     * {@code deltacommit} instant (with a newly created instant time) is used instead.
     *
     * @param metaClient meta client of the metadata table
     * @return a new file system view backed by a local engine context
     */
    public static HoodieTableFileSystemView getFileSystemViewForMetadataTable(HoodieTableMetaClient metaClient) {
        HoodieTimeline viewTimeline = metaClient.getActiveTimeline();
        TimelineFactory timelineFactory = metaClient.getTimelineLayout().getTimelineFactory();
        if (viewTimeline.empty()) {
            String syntheticInstantTime = metaClient.createNewInstantTime(false);
            HoodieInstant syntheticInstant = metaClient.createNewInstant(HoodieInstant.State.COMPLETED, "deltacommit", syntheticInstantTime);
            viewTimeline = timelineFactory.createDefaultTimeline(Stream.of(syntheticInstant), metaClient.getActiveTimeline());
        }
        HoodieLocalEngineContext localContext = new HoodieLocalEngineContext(metaClient.getStorageConf());
        return HoodieTableFileSystemView.fileListingBasedFileSystemView(localContext, metaClient, viewTimeline);
    }

    /**
     * Loads the latest file slices of a partition, sorted by file id.
     *
     * <p>If {@code fileSystemView} is empty, a temporary view is created through
     * {@code getFileSystemViewForMetadataTable} and closed before returning; a caller-supplied view
     * is never closed here.
     *
     * @param metaClient      meta client of the metadata table
     * @param fileSystemView  optional view to reuse
     * @param partition       partition to load
     * @param mergeFileSlices when {@code true}, returns the latest merged file slices up to the
     *                        last completed-or-compaction instant (or an empty list when the
     *                        timeline has no completed instant); otherwise the latest file slices
     * @return the file slices ordered by file id
     */
    private static List<FileSlice> getPartitionFileSlices(HoodieTableMetaClient metaClient, Option<HoodieTableFileSystemView> fileSystemView, String partition, boolean mergeFileSlices) {
        HoodieTableFileSystemView fsView = null;
        try {
            fsView = fileSystemView.orElseGet(() -> getFileSystemViewForMetadataTable(metaClient));
            Stream<FileSlice> fileSliceStream;
            if (mergeFileSlices) {
                if (!metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent()) {
                    return Collections.emptyList();
                }
                fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(partition, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant().get().requestedTime());
            } else {
                fileSliceStream = fsView.getLatestFileSlices(partition);
            }
            return fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(Collectors.toList());
        }
        finally {
            // fsView is still null if creating the temporary view threw; closing it then would raise
            // a NullPointerException from this finally block and hide the original failure.
            if (!fileSystemView.isPresent() && fsView != null) {
                fsView.close();
            }
        }
    }

    /**
     * Loads the latest file slices of a partition, including inflight ones, sorted by file id.
     *
     * <p>If {@code fileSystemView} is empty, a temporary view is created through
     * {@code getFileSystemViewForMetadataTable} and closed before returning; a caller-supplied view
     * is never closed here.
     *
     * @param metaClient     meta client of the metadata table
     * @param fileSystemView optional view to reuse
     * @param partition      partition to load
     * @return the file slices ordered by file id
     */
    public static List<FileSlice> getPartitionLatestFileSlicesIncludingInflight(HoodieTableMetaClient metaClient, Option<HoodieTableFileSystemView> fileSystemView, String partition) {
        HoodieTableFileSystemView view = null;
        try {
            view = fileSystemView.orElseGet(() -> getFileSystemViewForMetadataTable(metaClient));
            return view.getLatestFileSlicesIncludingInflight(partition)
                    .sorted(Comparator.comparing(FileSlice::getFileId))
                    .collect(Collectors.toList());
        }
        finally {
            // Only close a view that was created here.
            boolean ownsView = !fileSystemView.isPresent();
            if (ownsView && view != null) {
                view.close();
            }
        }
    }

    /**
     * Creates column stats metadata records from the write stats of a commit.
     *
     * @param commitMetadata commit metadata providing the write stats
     * @param engineContext  context used to parallelize the work
     * @param dataMetaClient meta client of the data table
     * @param metadataConfig metadata config; supplies the column stats parallelism cap
     * @param recordTypeOpt  optional record type forwarded to the column resolution
     * @return the column stats records; empty data when there are no write stats or no columns to index
     * @throws HoodieException wrapping any failure raised while generating the records
     */
    public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata, HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig, Option<HoodieRecord.HoodieRecordType> recordTypeOpt) {
        List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
        if (allWriteStats.isEmpty()) {
            return engineContext.emptyHoodieData();
        }
        try {
            Map<String, Schema> columnsToIndexSchemaMap = getColumnsToIndex(commitMetadata, dataMetaClient, metadataConfig, recordTypeOpt);
            if (columnsToIndexSchemaMap.isEmpty()) {
                return engineContext.emptyHoodieData();
            }
            List<String> columnsToIndex = new ArrayList<>(columnsToIndexSchemaMap.keySet());
            int cappedParallelism = Math.min(allWriteStats.size(), metadataConfig.getColumnStatsIndexParallelism());
            int parallelism = Math.max(cappedParallelism, 1);
            return engineContext.parallelize(allWriteStats, parallelism)
                    .flatMap(writeStat -> translateWriteStatToColumnStats(writeStat, dataMetaClient, columnsToIndex).iterator());
        }
        catch (Exception e) {
            throw new HoodieException("Failed to generate column stats records for metadata table", e);
        }
    }

    /**
     * Resolves the columns to index for a commit.
     *
     * <p>The schema is parsed from the commit's {@code "schema"} metadata entry when that entry
     * is non-empty (with meta fields added if the table populates them); otherwise the table
     * create schema from the table config is used.
     *
     * @param commitMetadata commit carrying the optional writer schema
     * @param dataMetaClient meta client of the data table
     * @param metadataConfig metadata config forwarded to the column selection
     * @param recordTypeOpt  optional record type forwarded to the column selection
     * @return map from column name to schema, as produced by the 5-argument overload
     */
    public static Map<String, Schema> getColumnsToIndex(HoodieCommitMetadata commitMetadata, HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig, Option<HoodieRecord.HoodieRecordType> recordTypeOpt) {
        Option<Schema> schemaFromCommit = Option.ofNullable(commitMetadata.getMetadata("schema"))
                .flatMap(schemaJson -> StringUtils.isNullOrEmpty(schemaJson) ? Option.empty() : Option.of(new Schema.Parser().parse(schemaJson)));
        HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
        Option<Schema> effectiveSchema;
        if (schemaFromCommit.isEmpty()) {
            effectiveSchema = tableConfig.getTableCreateSchema();
        } else {
            effectiveSchema = schemaFromCommit.map(parsed -> tableConfig.populateMetaFields() ? HoodieAvroUtils.addMetadataFields(parsed) : parsed);
        }
        return HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(effectiveSchema), false, recordTypeOpt);
    }

    /**
     * Convenience overload that delegates to the 5-argument variant with
     * {@code isTableInitializing} set to {@code false}.
     */
    @VisibleForTesting
    public static Map<String, Schema> getColumnsToIndex(HoodieTableConfig tableConfig, HoodieMetadataConfig metadataConfig, Lazy<Option<Schema>> tableSchemaLazyOpt, Option<HoodieRecord.HoodieRecordType> recordType) {
        return HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, tableSchemaLazyOpt, false, recordType);
    }

    /**
     * Convenience overload that delegates to the 5-argument variant with an empty record type.
     */
    @VisibleForTesting
    public static Map<String, Schema> getColumnsToIndex(HoodieTableConfig tableConfig, HoodieMetadataConfig metadataConfig, Lazy<Option<Schema>> tableSchemaLazyOpt, boolean isTableInitializing) {
        return HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, tableSchemaLazyOpt, isTableInitializing, Option.empty());
    }

    /**
     * Computes the columns to index and their schemas.
     *
     * <p>When the table populates meta fields, the entries of
     * {@code META_COLS_TO_ALWAYS_INDEX_SCHEMA_MAP} come first in the result, followed by the
     * configured/derived columns; otherwise only the latter are returned.
     *
     * @param tableConfig         table config, consulted for {@code populateMetaFields()}
     * @param metadataConfig      metadata config forwarded to the column selection
     * @param tableSchemaLazyOpt  lazily resolved, optional table schema
     * @param isTableInitializing forwarded to the column selection
     * @param recordType          optional record type forwarded to the column selection
     * @return map from column name to schema
     */
    @VisibleForTesting
    public static Map<String, Schema> getColumnsToIndex(HoodieTableConfig tableConfig, HoodieMetadataConfig metadataConfig, Lazy<Option<Schema>> tableSchemaLazyOpt, boolean isTableInitializing, Option<HoodieRecord.HoodieRecordType> recordType) {
        Map<String, Schema> selectedColumns = HoodieTableMetadataUtil.getColumnsToIndexWithoutRequiredMetaFields(metadataConfig, tableSchemaLazyOpt, isTableInitializing, recordType);
        if (tableConfig.populateMetaFields()) {
            // Always-indexed meta columns go first; selected columns are added after them.
            LinkedHashMap<String, Schema> withMetaColumns = new LinkedHashMap<>(META_COLS_TO_ALWAYS_INDEX_SCHEMA_MAP);
            withMetaColumns.putAll(selectedColumns);
            return withMetaColumns;
        }
        return selectedColumns;
    }

    /**
     * Selects the columns to index, not counting the always-indexed meta columns.
     *
     * <ul>
     *   <li>No columns configured: the first N supported fields of the table schema are used
     *       (N = {@code maxColumnsToIndexForColStats()}), or an empty map if no schema is present.</li>
     *   <li>Columns configured and the table is initializing: every configured column is
     *       returned with a {@code null} schema, without resolving the table schema.</li>
     *   <li>Columns configured otherwise: the table schema must be present; configured columns
     *       that are in {@code META_COL_SET_TO_INDEX} or have an unsupported type are dropped.</li>
     * </ul>
     *
     * @return insertion-ordered map from column name to schema (values may be {@code null}
     *         in the initializing case)
     */
    private static Map<String, Schema> getColumnsToIndexWithoutRequiredMetaFields(HoodieMetadataConfig metadataConfig, Lazy<Option<Schema>> tableSchemaLazyOpt, boolean isTableInitializing, Option<HoodieRecord.HoodieRecordType> recordType) {
        List<String> configuredColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex();
        if (configuredColumns.isEmpty()) {
            if (!tableSchemaLazyOpt.get().isPresent()) {
                return Collections.emptyMap();
            }
            Map<String, Schema> firstSupported = new LinkedHashMap<>();
            HoodieTableMetadataUtil.getFirstNSupportedFields(tableSchemaLazyOpt.get().get(), metadataConfig.maxColumnsToIndexForColStats(), recordType)
                    .forEach(nameAndSchema -> firstSupported.put(nameAndSchema.getKey(), nameAndSchema.getValue()));
            return firstSupported;
        }
        if (isTableInitializing) {
            // Schema is not consulted here; each configured column maps to null.
            Map<String, Schema> placeholders = new LinkedHashMap<>();
            for (String columnName : configuredColumns) {
                placeholders.put(columnName, null);
            }
            return placeholders;
        }
        ValidationUtils.checkArgument(tableSchemaLazyOpt.get().isPresent(), "Table schema not found for the table while computing col stats");
        Schema tableSchema = tableSchemaLazyOpt.get().get();
        Map<String, Schema> selected = new LinkedHashMap<>();
        for (String columnName : configuredColumns) {
            if (META_COL_SET_TO_INDEX.contains(columnName)) {
                continue;
            }
            Schema columnSchema = HoodieAvroUtils.getSchemaForField(tableSchema, columnName).getRight().schema();
            if (HoodieTableMetadataUtil.isColumnTypeSupported(columnSchema, recordType)) {
                selected.put(columnName, columnSchema);
            }
        }
        return selected;
    }

    /**
     * Streams (name, schema) pairs for the schema's fields whose type is supported, then
     * hands them to {@code getFirstNFields} to apply the limit {@code n}.
     */
    private static Stream<Pair<String, Schema>> getFirstNSupportedFields(Schema tableSchema, int n, Option<HoodieRecord.HoodieRecordType> recordType) {
        Stream<Pair<String, Schema>> supportedFields = tableSchema.getFields().stream()
                .filter(candidate -> HoodieTableMetadataUtil.isColumnTypeSupported(candidate.schema(), recordType))
                .map(candidate -> Pair.of(candidate.name(), candidate.schema()));
        return HoodieTableMetadataUtil.getFirstNFields(supportedFields, n);
    }

    /**
     * Drops pairs whose name is in {@code HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION}
     * and keeps at most {@code n} of the remaining ones.
     */
    private static Stream<Pair<String, Schema>> getFirstNFields(Stream<Pair<String, Schema>> fieldSchemaPairStream, int n) {
        Stream<Pair<String, Schema>> withoutMetaColumns = fieldSchemaPairStream
                .filter(nameAndSchema -> !HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(nameAndSchema.getKey()));
        return withoutMetaColumns.limit(n);
    }

    /**
     * Converts one write stat into column-stats records.
     *
     * <p>If the stat is a {@code HoodieDeltaWriteStat} that already carries column stats, those
     * are used directly; otherwise the stats are obtained via {@code getColumnStatsRecords}
     * for the written file.
     */
    private static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat, HoodieTableMetaClient datasetMetaClient, List<String> columnsToIndex) {
        if (writeStat instanceof HoodieDeltaWriteStat) {
            HoodieDeltaWriteStat deltaStat = (HoodieDeltaWriteStat)writeStat;
            if (deltaStat.getColumnStats().isPresent()) {
                Map<String, HoodieColumnRangeMetadata<Comparable>> rangesByColumn = deltaStat.getColumnStats().get();
                Collection<HoodieColumnRangeMetadata<Comparable>> ranges = rangesByColumn.values();
                return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), ranges, false);
            }
        }
        String fileName = FSUtils.getFileNameFromPath(writeStat.getPath());
        return HoodieTableMetadataUtil.getColumnStatsRecords(writeStat.getPartitionPath(), fileName, datasetMetaClient, columnsToIndex, false);
    }

    /**
     * Convenience overload that delegates to the 6-argument variant with a
     * {@code maxBufferSize} of {@code -1}.
     */
    private static Stream<HoodieRecord> getColumnStatsRecords(String partitionPath, String fileName, HoodieTableMetaClient datasetMetaClient, List<String> columnsToIndex, boolean isDeleted) {
        return HoodieTableMetadataUtil.getColumnStatsRecords(partitionPath, fileName, datasetMetaClient, columnsToIndex, isDeleted, -1);
    }

    /**
     * Creates column-stats records for one file.
     *
     * <p>For a deleted file, a stub range is created per indexed column and the records are
     * flagged as deleted; otherwise the ranges are read via {@code readColumnRangeMetadataFrom}.
     *
     * @param maxBufferSize forwarded to {@code readColumnRangeMetadataFrom}; unused for deleted files
     */
    private static Stream<HoodieRecord> getColumnStatsRecords(String partitionPath, String fileName, HoodieTableMetaClient datasetMetaClient, List<String> columnsToIndex, boolean isDeleted, int maxBufferSize) {
        final List<HoodieColumnRangeMetadata<Comparable>> ranges;
        if (!isDeleted) {
            ranges = HoodieTableMetadataUtil.readColumnRangeMetadataFrom(partitionPath, fileName, datasetMetaClient, columnsToIndex, maxBufferSize);
        } else {
            ranges = columnsToIndex.stream()
                    .map(column -> HoodieColumnRangeMetadata.stub(fileName, column))
                    .collect(Collectors.toList());
        }
        return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, ranges, isDeleted);
    }

    /**
     * Reads column range metadata for a single file of the data table.
     *
     * <p>Files ending with the Parquet extension are read through the Parquet file-format
     * utilities; log files are scanned via {@code getLogFileColumnRangeMetadata}; any other
     * file yields an empty list.
     *
     * <p>This is best effort: any exception is logged, together with its stack trace, and an
     * empty list is returned instead of propagating the failure.
     *
     * @param partitionPath     partition of the file; {@code ""} and {@code "."} mean the file
     *                          sits directly under the base path
     * @param fileName          name of the file
     * @param datasetMetaClient meta client of the data table
     * @param columnsToIndex    columns for which ranges are requested
     * @param maxBufferSize     buffer size forwarded to the log scanner
     * @return the column ranges, or an empty list if unsupported or on failure
     */
    private static List<HoodieColumnRangeMetadata<Comparable>> readColumnRangeMetadataFrom(String partitionPath, String fileName, HoodieTableMetaClient datasetMetaClient, List<String> columnsToIndex, int maxBufferSize) {
        String partitionPathFileName = partitionPath.equals("") || partitionPath.equals(".") ? fileName : partitionPath + "/" + fileName;
        try {
            StoragePath fullFilePath = new StoragePath(datasetMetaClient.getBasePath(), partitionPathFileName);
            if (partitionPathFileName.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
                return HoodieIOFactory.getIOFactory(datasetMetaClient.getStorage()).getFileFormatUtils(HoodieFileFormat.PARQUET).readColumnStatsFromMetadata(datasetMetaClient.getStorage(), fullFilePath, columnsToIndex);
            }
            if (FSUtils.isLogFile(fileName)) {
                Option<Schema> writerSchemaOpt = HoodieTableMetadataUtil.tryResolveSchemaForTable(datasetMetaClient);
                LOG.warn("Reading log file: {}, to build column range metadata.", (Object)partitionPathFileName);
                return HoodieTableMetadataUtil.getLogFileColumnRangeMetadata(fullFilePath.toString(), datasetMetaClient, columnsToIndex, writerSchemaOpt, maxBufferSize);
            }
            LOG.warn("Column range index not supported for: {}", (Object)partitionPathFileName);
            return Collections.emptyList();
        }
        catch (Exception e) {
            // Deliberately non-fatal, but keep the cause: the trailing throwable is logged
            // with its stack trace instead of being dropped.
            LOG.error("Failed to fetch column range metadata for: {}", (Object)partitionPathFileName, (Object)e);
            return Collections.emptyList();
        }
    }

    /**
     * Scans a single log file and collects column range metadata from its records.
     *
     * @param filePath          full path of the log file
     * @param datasetMetaClient meta client of the data table
     * @param columnsToIndex    columns for which ranges are collected
     * @param writerSchemaOpt   schema used to read the log file; when absent nothing is read
     * @param maxBufferSize     buffer size passed to the log scanner
     * @return one range entry per collected column, or an empty list when the schema is absent
     *         or the scan produced no records
     * @throws IOException declared by the signature
     */
    @VisibleForTesting
    public static List<HoodieColumnRangeMetadata<Comparable>> getLogFileColumnRangeMetadata(String filePath, HoodieTableMetaClient datasetMetaClient, List<String> columnsToIndex, Option<Schema> writerSchemaOpt, int maxBufferSize) throws IOException {
        if (!writerSchemaOpt.isPresent()) {
            return Collections.emptyList();
        }
        Schema writerSchema = writerSchemaOpt.get();
        List<Pair<String, Schema.Field>> indexedFields = columnsToIndex.stream()
                .map(column -> HoodieAvroUtils.getSchemaForField(writerSchema, column))
                .collect(Collectors.toList());
        // The scanner callback appends every scanned record to this list.
        ArrayList<HoodieRecord> scannedRecords = new ArrayList<>();
        HoodieUnMergedLogRecordScanner logScanner = ((HoodieUnMergedLogRecordScanner.Builder)HoodieUnMergedLogRecordScanner.newBuilder()
                .withStorage(datasetMetaClient.getStorage())
                .withBasePath(datasetMetaClient.getBasePath())
                .withLogFilePaths((List)Collections.singletonList(filePath)))
                .withBufferSize(maxBufferSize)
                .withLatestInstantTime(datasetMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime())
                .withReaderSchema(writerSchema)
                .withTableMetaClient(datasetMetaClient)
                .withLogRecordScannerCallback(scannedRecords::add)
                .build();
        logScanner.scan();
        if (scannedRecords.isEmpty()) {
            return Collections.emptyList();
        }
        Map<String, HoodieColumnRangeMetadata<Comparable>> rangesByColumn = HoodieTableMetadataUtil.collectColumnRangeMetadata(scannedRecords, indexedFields, FSUtils.getFileNameFromPath(filePath), writerSchema);
        return new ArrayList<>(rangesByColumn.values());
    }

    /**
     * Adjusts a decimal value to the scale of the given Avro decimal type without rounding,
     * then verifies it fits the type's precision.
     *
     * @param value   value to adjust
     * @param decimal target decimal logical type
     * @return {@code value} itself when the scales already match, otherwise the rescaled value
     * @throws AvroTypeException if rescaling would require rounding, or if the resulting
     *                           precision exceeds the type's precision
     */
    public static BigDecimal tryUpcastDecimal(BigDecimal value, LogicalTypes.Decimal decimal) {
        final int targetScale = decimal.getScale();
        final int originalScale = value.scale();
        final boolean rescaled = originalScale != targetScale;
        BigDecimal adjusted = value;
        if (rescaled) {
            try {
                // UNNECESSARY makes setScale throw rather than silently round.
                adjusted = value.setScale(targetScale, RoundingMode.UNNECESSARY);
            }
            catch (ArithmeticException aex) {
                throw new AvroTypeException("Cannot encode decimal with scale " + originalScale + " as scale " + targetScale + " without rounding");
            }
        }
        final int maxPrecision = decimal.getPrecision();
        final int actualPrecision = adjusted.precision();
        if (actualPrecision <= maxPrecision) {
            return adjusted;
        }
        String message = "Cannot encode decimal with precision " + actualPrecision + " as max precision " + maxPrecision;
        if (rescaled) {
            message = message + ". This is after safely adjusting scale from " + originalScale + " to required " + targetScale;
        }
        throw new AvroTypeException(message);
    }

    /**
     * Resolves the table's Avro schema through a {@code TableSchemaResolver}.
     *
     * @param dataTableMetaClient meta client of the data table
     * @return the resolved schema, or empty when the commits timeline has no completed instants
     * @throws HoodieException if schema resolution fails
     */
    public static Option<Schema> tryResolveSchemaForTable(HoodieTableMetaClient dataTableMetaClient) {
        boolean noCompletedCommits = dataTableMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants() == 0;
        if (noCompletedCommits) {
            return Option.empty();
        }
        try {
            Schema resolvedSchema = new TableSchemaResolver(dataTableMetaClient).getTableAvroSchema();
            return Option.of(resolvedSchema);
        }
        catch (Exception e) {
            throw new HoodieException("Failed to get latest columns for " + dataTableMetaClient.getBasePath(), e);
        }
    }

    /**
     * Coerces a value to a {@link Comparable} according to the given Avro schema.
     *
     * <p>Unions are resolved to their non-null branch first. Values of decimal, date, time
     * and timestamp logical types, and booleans, are returned as-is (cast to
     * {@code Comparable}); plain numeric types are converted via the {@code castTo*} helpers;
     * strings via {@code toString()}.
     *
     * @param schema schema describing the value
     * @param val    value to coerce; may be {@code null}
     * @return the comparable value, or {@code null} for a {@code null} input and for
     *         ENUM, MAP, NULL, RECORD and ARRAY schemas
     * @throws IllegalStateException for a schema type not handled by the switch
     */
    public static Comparable<?> coerceToComparable(Schema schema, Object val) {
        if (val == null) {
            return null;
        }
        final Schema.Type type = schema.getType();
        final LogicalType logical = schema.getLogicalType();
        switch (type) {
            case UNION:
                return HoodieTableMetadataUtil.coerceToComparable(AvroSchemaUtils.resolveNullableSchema(schema), val);
            case FIXED:
            case BYTES:
                if (logical instanceof LogicalTypes.Decimal) {
                    return (Comparable)val;
                }
                return (ByteBuffer)val;
            case INT:
                if (logical == LogicalTypes.date() || logical == LogicalTypes.timeMillis()) {
                    return (Comparable)val;
                }
                return HoodieTableMetadataUtil.castToInteger(val);
            case LONG:
                if (logical == LogicalTypes.timeMicros() || logical == LogicalTypes.timestampMicros() || logical == LogicalTypes.timestampMillis()) {
                    return (Comparable)val;
                }
                return HoodieTableMetadataUtil.castToLong(val);
            case STRING:
                return val.toString();
            case FLOAT:
                return HoodieTableMetadataUtil.castToFloat(val);
            case DOUBLE:
                return HoodieTableMetadataUtil.castToDouble(val);
            case BOOLEAN:
                return (Comparable)val;
            case ENUM:
            case MAP:
            case NULL:
            case RECORD:
            case ARRAY:
                return null;
            default:
                throw new IllegalStateException("Unexpected type: " + type);
        }
    }

    /**
     * Converts a value to {@link Integer}.
     *
     * <p>Long, Float and Double are narrowed with {@code intValue()}; Boolean maps to 1/0;
     * anything else is parsed from its {@code toString()}.
     *
     * @param val value to convert; may be {@code null}
     * @return the converted value, or {@code null} for a {@code null} input
     * @throws NumberFormatException if the string form of an unrecognized type is not an integer
     */
    private static Integer castToInteger(Object val) {
        Integer converted;
        if (val == null) {
            converted = null;
        } else if (val instanceof Integer) {
            converted = (Integer)val;
        } else if (val instanceof Long) {
            converted = ((Long)val).intValue();
        } else if (val instanceof Float) {
            converted = ((Float)val).intValue();
        } else if (val instanceof Double) {
            converted = ((Double)val).intValue();
        } else if (val instanceof Boolean) {
            converted = ((Boolean)val).booleanValue() ? 1 : 0;
        } else {
            converted = Integer.parseInt(val.toString());
        }
        return converted;
    }

    /**
     * Converts a value to {@link Long}.
     *
     * <p>Integer, Float and Double are converted with {@code longValue()}; Boolean maps to
     * 1/0; anything else is parsed from its {@code toString()}.
     *
     * @param val value to convert; may be {@code null}
     * @return the converted value, or {@code null} for a {@code null} input
     * @throws NumberFormatException if the string form of an unrecognized type is not a long
     */
    private static Long castToLong(Object val) {
        Long converted;
        if (val == null) {
            converted = null;
        } else if (val instanceof Long) {
            converted = (Long)val;
        } else if (val instanceof Integer) {
            converted = ((Integer)val).longValue();
        } else if (val instanceof Float) {
            converted = ((Float)val).longValue();
        } else if (val instanceof Double) {
            converted = ((Double)val).longValue();
        } else if (val instanceof Boolean) {
            converted = ((Boolean)val).booleanValue() ? 1L : 0L;
        } else {
            converted = Long.parseLong(val.toString());
        }
        return converted;
    }

    /**
     * Converts a value to {@link Float}.
     *
     * <p>Integer, Long and Double are converted with {@code floatValue()}; Boolean maps to
     * 1.0/0.0; anything else is parsed from its {@code toString()}.
     *
     * @param val value to convert; may be {@code null}
     * @return the converted value, or {@code null} for a {@code null} input
     * @throws NumberFormatException if the string form of an unrecognized type is not a float
     */
    private static Float castToFloat(Object val) {
        Float converted;
        if (val == null) {
            converted = null;
        } else if (val instanceof Float) {
            converted = (Float)val;
        } else if (val instanceof Integer) {
            converted = ((Integer)val).floatValue();
        } else if (val instanceof Long) {
            converted = ((Long)val).floatValue();
        } else if (val instanceof Double) {
            converted = ((Double)val).floatValue();
        } else if (val instanceof Boolean) {
            converted = ((Boolean)val).booleanValue() ? 1.0f : 0.0f;
        } else {
            converted = Float.parseFloat(val.toString());
        }
        return converted;
    }

    /**
     * Converts a value to {@link Double}.
     *
     * <p>Integer, Long and Float are converted with {@code doubleValue()}; Boolean maps to
     * 1.0/0.0; anything else is parsed from its {@code toString()}.
     *
     * @param val value to convert; may be {@code null}
     * @return the converted value, or {@code null} for a {@code null} input
     * @throws NumberFormatException if the string form of an unrecognized type is not a double
     */
    private static Double castToDouble(Object val) {
        Double converted;
        if (val == null) {
            converted = null;
        } else if (val instanceof Double) {
            converted = (Double)val;
        } else if (val instanceof Integer) {
            converted = ((Integer)val).doubleValue();
        } else if (val instanceof Long) {
            converted = ((Long)val).doubleValue();
        } else if (val instanceof Float) {
            converted = ((Float)val).doubleValue();
        } else if (val instanceof Boolean) {
            converted = ((Boolean)val).booleanValue() ? 1.0 : 0.0;
        } else {
            converted = Double.parseDouble(val.toString());
        }
        return converted;
    }

    /**
     * Tells whether a column of the given schema is accepted by the checks below.
     *
     * <p>After resolving a nullable union, the column is rejected when:
     * <ul>
     *   <li>it is a decimal with {@code precision + (15 - scale) > 30} or {@code scale > 15};</li>
     *   <li>its type is RECORD, ARRAY, MAP or ENUM;</li>
     *   <li>its type is BYTES or FIXED and the record type is not AVRO (or is absent).</li>
     * </ul>
     *
     * @param schema     column schema, possibly a nullable union
     * @param recordType optional record type
     * @return {@code true} if none of the rejection rules applies
     */
    public static boolean isColumnTypeSupported(Schema schema, Option<HoodieRecord.HoodieRecordType> recordType) {
        Schema resolved = AvroSchemaUtils.resolveNullableSchema(schema);
        LogicalType logical = resolved.getLogicalType();
        if (logical instanceof LogicalTypes.Decimal) {
            LogicalTypes.Decimal decimal = (LogicalTypes.Decimal)logical;
            boolean tooWide = decimal.getPrecision() + (15 - decimal.getScale()) > 30;
            if (tooWide || decimal.getScale() > 15) {
                return false;
            }
        }
        Schema.Type type = resolved.getType();
        boolean rejectedForAnyRecordType = type == Schema.Type.RECORD || type == Schema.Type.ARRAY || type == Schema.Type.MAP || type == Schema.Type.ENUM;
        if (rejectedForAnyRecordType) {
            return false;
        }
        boolean avroRecords = recordType.isPresent() && recordType.get() == HoodieRecord.HoodieRecordType.AVRO;
        if (avroRecords) {
            return true;
        }
        // BYTES and FIXED pass only for AVRO records.
        return type != Schema.Type.BYTES && type != Schema.Type.FIXED;
    }

    /**
     * Returns a new {@link HashSet} copy of the inflight metadata partitions reported by the table config.
     */
    public static Set<String> getInflightMetadataPartitions(HoodieTableConfig tableConfig) {
        return new HashSet<String>(tableConfig.getMetadataPartitionsInflight());
    }

    /**
     * Returns the union of the table config's inflight metadata partitions and its
     * metadata partitions, as a new mutable set.
     */
    public static Set<String> getInflightAndCompletedMetadataPartitions(HoodieTableConfig tableConfig) {
        // Start from a copy of the inflight partitions so the config's own collection is not modified.
        Set<String> combined = new HashSet<>(tableConfig.getMetadataPartitionsInflight());
        combined.addAll(tableConfig.getMetadataPartitions());
        return combined;
    }

    /**
     * Collects the instant times considered valid, in this order:
     * <ol>
     *   <li>completed instants of the data table timeline;</li>
     *   <li>completed {@code deltacommit} instants of the metadata timeline whose time is not
     *       pending on the data table;</li>
     *   <li>commits rolled back by completed rollback/restore instants of the data table that
     *       are later than the earliest time collected so far;</li>
     *   <li>completed {@code restore}/{@code rollback} instants of the metadata timeline;</li>
     *   <li>completed metadata delta commits whose time starts with {@code 00000000000000}.</li>
     * </ol>
     *
     * @param dataMetaClient     meta client of the data table
     * @param metadataMetaClient meta client of the metadata table
     * @return set of valid instant times
     */
    public static Set<String> getValidInstantTimestamps(HoodieTableMetaClient dataMetaClient, HoodieTableMetaClient metadataMetaClient) {
        HoodieActiveTimeline dataTimeline = dataMetaClient.getActiveTimeline();
        Set<String> pendingOnDataTable = dataTimeline.filterInflightsAndRequested().getInstantsAsStream()
                .map(HoodieInstant::requestedTime)
                .collect(Collectors.toSet());
        Set<String> validTimes = dataTimeline.filterCompletedInstants().getInstantsAsStream()
                .map(HoodieInstant::requestedTime)
                .collect(Collectors.toSet());

        metadataMetaClient.getActiveTimeline()
                .filter(candidate -> candidate.isCompleted() && HoodieTableMetadataUtil.isValidInstant(pendingOnDataTable, candidate))
                .getInstantsAsStream()
                .map(HoodieInstant::requestedTime)
                .forEach(validTimes::add);

        // Must be computed after the metadata delta commits were added above.
        String earliestValidTime = validTimes.isEmpty() ? "00000000000000" : Collections.min(validTimes);
        dataTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstantsAsStream()
                .filter(rollbackOrRestore -> InstantComparison.compareTimestamps(rollbackOrRestore.requestedTime(), InstantComparison.GREATER_THAN, earliestValidTime))
                .forEach(rollbackOrRestore -> validTimes.addAll(HoodieTableMetadataUtil.getRollbackedCommits(rollbackOrRestore, dataTimeline, dataMetaClient.getInstantGenerator())));

        metadataMetaClient.getActiveTimeline().getRollbackAndRestoreTimeline().filterCompletedInstants()
                .filter(candidate -> candidate.getAction().equals("restore") || candidate.getAction().equals("rollback"))
                .getInstants()
                .forEach(candidate -> validTimes.add(candidate.requestedTime()));

        metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
                .filter(candidate -> candidate.requestedTime().startsWith("00000000000000"))
                .getInstants()
                .forEach(candidate -> validTimes.add(candidate.requestedTime()));
        return validTimes;
    }

    /**
     * An instant is valid when its action is {@code deltacommit} and its requested time is
     * not among the given pending instant times.
     */
    private static boolean isValidInstant(Set<String> datasetPendingInstants, HoodieInstant instant) {
        if (!instant.getAction().equals("deltacommit")) {
            return false;
        }
        boolean pendingOnDataset = datasetPendingInstants.contains(instant.requestedTime());
        return !pendingOnDataset;
    }

    /**
     * Returns whether the given timeline contains the given instant time.
     */
    public static boolean isIndexingCommit(HoodieTimeline dataIndexTimeline, String instant) {
        return dataIndexTimeline.containsInstant(instant);
    }

    /**
     * Returns the commit times rolled back by a rollback or restore instant.
     *
     * <p>For a {@code rollback} instant the rollback metadata is read; if that read fails with
     * an {@link IOException}, the requested rollback plan is read instead and its single
     * instant-to-rollback is returned. For a {@code restore} instant all commits listed in the
     * restore metadata are returned. Any other action yields an empty list.
     *
     * @param instant  rollback or restore instant
     * @param timeline timeline used to read the instant's metadata
     * @param factory  generator used to create the requested rollback instant for the fallback
     * @return rolled back commit times
     * @throws HoodieMetadataException if the plan fallback or the restore metadata cannot be read
     */
    private static List<String> getRollbackedCommits(HoodieInstant instant, HoodieActiveTimeline timeline, InstantGenerator factory) {
        final String action = instant.getAction();
        try {
            if (action.equals("rollback")) {
                try {
                    return timeline.readRollbackMetadata(instant).getCommitsRollback();
                }
                catch (IOException e) {
                    // Fall back to the requested plan when the completed metadata cannot be read.
                    HoodieInstant requestedRollback = factory.createNewInstant(HoodieInstant.State.REQUESTED, "rollback", instant.requestedTime());
                    HoodieRollbackPlan plan = timeline.readRollbackPlan(requestedRollback);
                    List<String> fromPlan = Collections.singletonList(plan.getInstantToRollback().getCommitTime());
                    LOG.warn("Had to fetch rollback info from requested instant since completed file is empty {}", (Object)instant);
                    return fromPlan;
                }
            }
            List<String> restoredCommits = new LinkedList<>();
            if (action.equals("restore")) {
                HoodieRestoreMetadata restoreMetadata = timeline.readRestoreMetadata(instant);
                restoreMetadata.getHoodieRestoreMetadata().values()
                        .forEach(rollbacks -> rollbacks.forEach(rollback -> restoredCommits.addAll(rollback.getCommitsRollback())));
            }
            return restoredCommits;
        }
        catch (IOException e) {
            throw new HoodieMetadataException("Error retrieving rollback commits for instant " + instant, e);
        }
    }

    /**
     * Removes the metadata table of a data table, optionally backing it up first.
     *
     * <p>The metadata partitions are cleared from the table config before anything else. With
     * {@code backup} set, the directory is renamed to {@code .metadata_<new instant time>}
     * next to it; if the rename does not succeed, the directory is deleted instead.
     *
     * @param dataMetaClient meta client of the data table
     * @param context        engine context (not used by this method)
     * @param backup         whether to try renaming the directory instead of deleting it
     * @return the backup path if the rename succeeded, otherwise {@code null}
     * @throws HoodieMetadataException if the existence check or the deletion fails
     */
    public static String deleteMetadataTable(HoodieTableMetaClient dataMetaClient, HoodieEngineContext context, boolean backup) {
        StoragePath mdtBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath());
        HoodieStorage storage = dataMetaClient.getStorage();
        dataMetaClient.getTableConfig().clearMetadataPartitions(dataMetaClient);

        boolean mdtExists;
        try {
            mdtExists = storage.exists(mdtBasePath);
        }
        catch (FileNotFoundException e) {
            mdtExists = false;
        }
        catch (IOException e) {
            throw new HoodieMetadataException("Failed to check metadata table existence", e);
        }
        if (!mdtExists) {
            return null;
        }

        if (backup) {
            StoragePath backupPath = new StoragePath(mdtBasePath.getParent(), ".metadata_" + dataMetaClient.createNewInstantTime(false));
            LOG.info("Backing up metadata directory to {} before deletion", (Object)backupPath);
            try {
                boolean renamed = storage.rename(mdtBasePath, backupPath);
                if (renamed) {
                    return backupPath.toString();
                }
            }
            catch (Exception e) {
                // Not fatal: execution continues with the deletion below.
                LOG.error("Failed to backup metadata table using rename", (Throwable)e);
            }
        }

        LOG.info("Deleting metadata table from {}", (Object)mdtBasePath);
        try {
            storage.deleteDirectory(mdtBasePath);
        }
        catch (Exception e) {
            throw new HoodieMetadataException("Failed to delete metadata table from path " + mdtBasePath, e);
        }
        return null;
    }

    /**
     * Removes a single metadata table partition, optionally backing it up first.
     *
     * <p>Deleting the FILES partition is delegated to {@code deleteMetadataTable}. For any
     * other partition, the partition is first marked as not available in the table config.
     * With {@code backup} set, the directory is renamed to
     * {@code .metadata_<partition>_<new instant time>} beside the metadata table directory.
     * If the rename does not succeed (returns {@code false} or throws), the partition
     * directory is deleted instead, mirroring {@code deleteMetadataTable}, so that no stale
     * directory is left behind for a partition the config already reports as unavailable.
     *
     * @param dataMetaClient meta client of the data table
     * @param context        engine context, only forwarded to {@code deleteMetadataTable}
     * @param partitionPath  metadata partition path to delete
     * @param backup         whether to try renaming the directory instead of deleting it
     * @return the backup path if the rename succeeded, otherwise {@code null}
     * @throws HoodieMetadataException if the existence check or the deletion fails
     */
    public static String deleteMetadataTablePartition(HoodieTableMetaClient dataMetaClient, HoodieEngineContext context, String partitionPath, boolean backup) {
        if (partitionPath.equals(MetadataPartitionType.FILES.getPartitionPath())) {
            return HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, context, backup);
        }
        StoragePath metadataTablePartitionPath = new StoragePath(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath()), partitionPath);
        HoodieStorage storage2 = dataMetaClient.getStorage();
        dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionPath, false);
        try {
            if (!storage2.exists(metadataTablePartitionPath)) {
                return null;
            }
        }
        catch (FileNotFoundException e) {
            LOG.debug("Metadata table partition {} not found at path {}", (Object)partitionPath, (Object)metadataTablePartitionPath);
            return null;
        }
        catch (Exception e) {
            throw new HoodieMetadataException(String.format("Failed to check existence of MDT partition %s at path %s: ", partitionPath, metadataTablePartitionPath), e);
        }
        if (backup) {
            StoragePath metadataPartitionBackupPath = new StoragePath(metadataTablePartitionPath.getParent().getParent(), String.format(".metadata_%s_%s", partitionPath, dataMetaClient.createNewInstantTime(false)));
            LOG.info("Backing up MDT partition {} to {} before deletion", (Object)partitionPath, (Object)metadataPartitionBackupPath);
            try {
                if (storage2.rename(metadataTablePartitionPath, metadataPartitionBackupPath)) {
                    return metadataPartitionBackupPath.toString();
                }
            }
            catch (Exception e) {
                // Not fatal: fall through and delete the partition directory instead.
                LOG.error(String.format("Failed to backup MDT partition %s using rename", partitionPath), (Throwable)e);
            }
        }
        // Reached when no backup was requested or when the backup rename did not succeed.
        LOG.info("Deleting metadata table partition from {}", (Object)metadataTablePartitionPath);
        try {
            storage2.deleteDirectory(metadataTablePartitionPath);
        }
        catch (Exception e) {
            throw new HoodieMetadataException("Failed to delete metadata table partition from path " + metadataTablePartitionPath, e);
        }
        return null;
    }

    /**
     * Builds the file id of a metadata file group as {@code <prefix><index, 4 digits>-0}.
     *
     * <p>For expression and secondary index partitions the prefix is the partition name with
     * underscores replaced by dashes plus a trailing dash; for every other partition type it
     * is the type's file id prefix.
     *
     * @param partitionType metadata partition type
     * @param index         file group index, zero-padded to four digits
     * @param partitionName partition name, only used for expression/secondary index partitions
     * @return the file id
     */
    public static String getFileIDForFileGroup(MetadataPartitionType partitionType, int index, String partitionName) {
        boolean prefixFromPartitionName = partitionType == MetadataPartitionType.EXPRESSION_INDEX || partitionType == MetadataPartitionType.SECONDARY_INDEX;
        String prefix = prefixFromPartitionName
                ? partitionName.replace('_', '-') + "-"
                : partitionType.getFileIdPrefix();
        return String.format("%s%04d-%d", prefix, index, 0);
    }

    /**
     * Parses the file group index out of a file id: the digits between the last dash and the
     * end of the id, after a trailing {@code -0} (if any) has been ignored.
     *
     * @param fileId file id such as the ones built by {@code getFileIDForFileGroup}
     * @return the parsed index
     * @throws NumberFormatException if that segment is not an integer
     */
    public static int getFileGroupIndexFromFileId(String fileId) {
        int indexEnd = HoodieTableMetadataUtil.getFileIdLengthWithoutFileIndex(fileId);
        // Search backwards from the character just before indexEnd for the separating dash.
        int separator = fileId.lastIndexOf("-", indexEnd - 1);
        String indexText = fileId.substring(separator + 1, indexEnd);
        return Integer.parseInt(indexText);
    }

    /**
     * Returns the file id truncated to the length computed by
     * {@code getFileIdLengthWithoutFileIndex}, i.e. without a trailing {@code -0} if present.
     */
    public static String getFileGroupPrefix(String fileId) {
        return fileId.substring(0, HoodieTableMetadataUtil.getFileIdLengthWithoutFileIndex(fileId));
    }

    /**
     * Returns the length of the file id, not counting a trailing {@code -0} suffix if present.
     */
    private static int getFileIdLengthWithoutFileIndex(String fileId) {
        int fullLength = fileId.length();
        if (fileId.endsWith("-0")) {
            return fullLength - 2;
        }
        return fullLength;
    }

    /**
     * Estimates how many file groups a metadata partition should be created with.
     *
     * <p>If {@code minFileGroupCount == maxFileGroupCount} and that value is non-zero, it is
     * used as-is. Otherwise the estimate is
     * {@code ceil(recordCount * growthFactor) / (maxFileGroupSizeBytes / max(averageRecordSize, 1))},
     * clamped to {@code [minFileGroupCount, maxFileGroupCount]} and never below 1 when it
     * lies strictly between the bounds.
     *
     * <p>The number of records per file group is clamped to at least 1, so a record larger
     * than the maximum file group size (or a zero size) no longer causes a division by zero.
     *
     * @param partitionType         partition type, used for logging only
     * @param recordCount           expected number of records
     * @param averageRecordSize     average record size in bytes; values below 1 are treated as 1
     * @param minFileGroupCount     lower bound for the result
     * @param maxFileGroupCount     upper bound for the result
     * @param growthFactor          multiplier applied to {@code recordCount}
     * @param maxFileGroupSizeBytes maximum size of a file group in bytes
     * @return the estimated file group count
     */
    public static int estimateFileGroupCount(MetadataPartitionType partitionType, long recordCount, int averageRecordSize, int minFileGroupCount, int maxFileGroupCount, float growthFactor, int maxFileGroupSizeBytes) {
        int fileGroupCount;
        if (minFileGroupCount == maxFileGroupCount && minFileGroupCount != 0) {
            // A fixed, non-zero count was requested; no estimation needed.
            fileGroupCount = minFileGroupCount;
        } else {
            long expectedNumRecords = (long)Math.ceil((float)recordCount * growthFactor);
            // Clamp to 1: the integer division yields 0 when one record exceeds the max size.
            long maxRecordsPerFileGroup = Math.max((long)maxFileGroupSizeBytes / Math.max((long)averageRecordSize, 1L), 1L);
            long estimatedFileGroupCount = expectedNumRecords / maxRecordsPerFileGroup;
            if (estimatedFileGroupCount >= (long)maxFileGroupCount) {
                fileGroupCount = maxFileGroupCount;
            } else if (estimatedFileGroupCount <= (long)minFileGroupCount) {
                fileGroupCount = minFileGroupCount;
            } else {
                fileGroupCount = Math.max(1, (int)estimatedFileGroupCount);
            }
        }
        LOG.info("Estimated file group count for MDT partition {} is {} [recordCount={}, avgRecordSize={}, minFileGroupCount={}, maxFileGroupCount={}, growthFactor={}, maxFileGroupSizeBytes={}]", new Object[]{partitionType.name(), fileGroupCount, recordCount, averageRecordSize, minFileGroupCount, maxFileGroupCount, Float.valueOf(growthFactor), maxFileGroupSizeBytes});
        return fileGroupCount;
    }

    /**
     * Decides whether write statuses need to be tracked.
     *
     * <p>Returns {@code true} when any partition type from
     * {@code MetadataPartitionType.getMetadataPartitionsNeedingWriteStatusTracking()} is
     * available in the table config or is listed among the inflight metadata partitions;
     * otherwise the result is {@code config.isRecordIndexEnabled()}.
     *
     * @param config     metadata config
     * @param metaClient meta client whose table config is inspected
     */
    public static boolean getMetadataPartitionsNeedingWriteStatusTracking(HoodieMetadataConfig config, HoodieTableMetaClient metaClient) {
        HoodieTableConfig tableConfig = metaClient.getTableConfig();
        boolean anyAvailable = MetadataPartitionType.getMetadataPartitionsNeedingWriteStatusTracking().stream()
                .anyMatch(type -> tableConfig.isMetadataPartitionAvailable(type));
        if (anyAvailable) {
            return true;
        }
        Set<String> inflightPartitions = tableConfig.getMetadataPartitionsInflight();
        boolean anyInflight = MetadataPartitionType.getMetadataPartitionsNeedingWriteStatusTracking().stream()
                .anyMatch(type -> inflightPartitions.contains(type.getPartitionPath()));
        return anyInflight || config.isRecordIndexEnabled();
    }

    /**
     * Unpacks the fields of a {@code HoodieRecordIndexInfo} and delegates to the
     * 7-argument overload.
     */
    public static HoodieRecordGlobalLocation getLocationFromRecordIndexInfo(HoodieRecordIndexInfo recordIndexInfo) {
        return HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(recordIndexInfo.getPartitionName(), recordIndexInfo.getFileIdEncoding(), recordIndexInfo.getFileIdHighBits(), recordIndexInfo.getFileIdLowBits(), recordIndexInfo.getFileIndex(), recordIndexInfo.getFileId(), recordIndexInfo.getInstantTime());
    }

    /**
     * Builds a record location from the individual record index fields.
     *
     * <p>With {@code fileIdEncoding == 0} the file id is the UUID formed from the high/low
     * bits, suffixed with {@code -<fileIndex>} unless {@code fileIndex} is {@code -1}; with
     * any other encoding {@code originalFileId} is used unchanged.
     *
     * @param partition      partition of the record
     * @param fileIdEncoding 0 for the UUID-based encoding, anything else for the raw file id
     * @param fileIdHighBits most significant bits of the UUID
     * @param fileIdLowBits  least significant bits of the UUID
     * @param fileIndex      file index suffix, or {@code -1} for none
     * @param originalFileId raw file id used when the encoding is not 0
     * @param instantTime    value passed to {@code new Date(long)} and then formatted by
     *                       {@code HoodieInstantTimeGenerator.formatDate}
     * @return the record's global location
     */
    public static HoodieRecordGlobalLocation getLocationFromRecordIndexInfo(String partition, int fileIdEncoding, long fileIdHighBits, long fileIdLowBits, int fileIndex, String originalFileId, Long instantTime) {
        final String resolvedFileId;
        if (fileIdEncoding != 0) {
            resolvedFileId = originalFileId;
        } else {
            String uuidText = new UUID(fileIdHighBits, fileIdLowBits).toString();
            resolvedFileId = fileIndex == -1 ? uuidText : uuidText + "-" + fileIndex;
        }
        String formattedInstant = HoodieInstantTimeGenerator.formatDate(new Date(instantTime));
        return new HoodieRecordGlobalLocation(partition, formattedInstant, resolvedFileId);
    }

    /**
     * Reads the record keys of the given base files and turns each key into a record index
     * record (a delete record when {@code forDelete} is set, an update record otherwise).
     *
     * @param engineContext             engine context used to parallelize the work
     * @param config                    config handed to the file reader
     * @param partitionBaseFilePairs    (partition, base file) pairs to read
     * @param forDelete                 whether delete records should be produced
     * @param recordIndexMaxParallelism upper bound for the parallelism
     * @param basePath                  base path of the table
     * @param configuration             storage configuration used to obtain the storage
     * @param activeModule              module name reported in the job status
     * @return the generated records; empty data when no pairs are given
     */
    @Deprecated
    public static HoodieData<HoodieRecord> readRecordKeysFromBaseFiles(HoodieEngineContext engineContext, HoodieConfig config, List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs, boolean forDelete, int recordIndexMaxParallelism, StoragePath basePath, StorageConfiguration<?> configuration, String activeModule) {
        if (partitionBaseFilePairs.isEmpty()) {
            return engineContext.emptyHoodieData();
        }
        String jobStatus = "Record Index: reading record keys from " + partitionBaseFilePairs.size() + " base files";
        engineContext.setJobStatus(activeModule, jobStatus);
        int parallelism = Math.min(partitionBaseFilePairs.size(), recordIndexMaxParallelism);
        return engineContext.parallelize(partitionBaseFilePairs, parallelism).flatMap(pair -> {
            String partition = (String)pair.getKey();
            HoodieBaseFile baseFile = (HoodieBaseFile)pair.getValue();
            StoragePath dataFilePath = HoodieTableMetadataUtil.filePath(basePath, partition, baseFile.getFileName());
            String fileId = baseFile.getFileId();
            String instantTime = baseFile.getCommitTime();
            HoodieFileReader reader = HoodieIOFactory.getIOFactory(HoodieStorageUtils.getStorage(basePath, configuration))
                    .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
                    .getFileReader(config, dataFilePath);
            return HoodieTableMetadataUtil.getHoodieRecordIterator(reader.getRecordKeyIterator(), forDelete, partition, fileId, instantTime);
        });
    }

    /**
     * Reads the record keys of the given file slices and turns each key into a record index
     * record (a delete record when {@code forDelete} is set, an update record otherwise).
     *
     * <p>A slice without a base file is read through a {@code HoodieMergedLogRecordScanner}
     * over its sorted log files; the keys of the scanned records are emitted with the slice's
     * file id and base instant time. A slice with a base file is read from that base file only.
     *
     * @param engineContext             engine context used to parallelize the work
     * @param partitionFileSlicePairs   (partition, file slice) pairs to read
     * @param forDelete                 whether delete records should be produced
     * @param recordIndexMaxParallelism upper bound for the parallelism
     * @param activeModule              module name reported in the job status
     * @param metaClient                meta client supplying base path, storage and timeline
     * @param engineType                engine type passed to the record merger factory
     * @return the generated records; empty data when no pairs are given
     */
    public static HoodieData<HoodieRecord> readRecordKeysFromFileSlices(HoodieEngineContext engineContext, List<Pair<String, FileSlice>> partitionFileSlicePairs, boolean forDelete, int recordIndexMaxParallelism, String activeModule, HoodieTableMetaClient metaClient, EngineType engineType) {
        if (partitionFileSlicePairs.isEmpty()) {
            return engineContext.emptyHoodieData();
        }
        engineContext.setJobStatus(activeModule, "Record Index: reading record keys from " + partitionFileSlicePairs.size() + " file slices");
        // Never use more tasks than there are file slices.
        int parallelism = Math.min(partitionFileSlicePairs.size(), recordIndexMaxParallelism);
        StoragePath basePath = metaClient.getBasePath();
        StorageConfiguration<?> storageConf = metaClient.getStorageConf();
        return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap(partitionAndBaseFile -> {
            String partition = (String)partitionAndBaseFile.getKey();
            FileSlice fileSlice = (FileSlice)partitionAndBaseFile.getValue();
            if (!fileSlice.getBaseFile().isPresent()) {
                // Log-only slice: merge the log files and take the keys of the merged records.
                List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(l -> l.getPath().toString()).collect(Collectors.toList());
                // The scanner reads only the record key schema, up to the last completed instant
                // (empty string when the timeline has none); remaining options come from storageConf.
                HoodieMergedLogRecordScanner mergedLogRecordScanner = ((HoodieMergedLogRecordScanner.Builder)HoodieMergedLogRecordScanner.newBuilder().withStorage(metaClient.getStorage()).withBasePath(basePath).withLogFilePaths(logFilePaths)).withReaderSchema(HoodieAvroUtils.getRecordKeySchema()).withLatestInstantTime(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse("")).withReverseReader(false).withMaxMemorySizeInBytes(storageConf.getLong(HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION.key(), 0x40000000L)).withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath()).withPartition(fileSlice.getPartitionPath()).withOptimizedLogBlocksScan(storageConf.getBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(), false)).withDiskMapType((ExternalSpillableMap.DiskMapType)storageConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), (Enum)HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())).withBitCaskDiskMapCompressionEnabled(storageConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())).withRecordMerger(HoodieRecordUtils.createRecordMerger(metaClient.getBasePath().toString(), engineType, Collections.emptyList(), metaClient.getTableConfig().getRecordMergeStrategyId())).withTableMetaClient(metaClient).build();
                ClosableIterator<String> recordKeyIterator = ClosableIterator.wrap(mergedLogRecordScanner.getRecords().keySet().iterator());
                return HoodieTableMetadataUtil.getHoodieRecordIterator(recordKeyIterator, forDelete, partition, fileSlice.getFileId(), fileSlice.getBaseInstantTime());
            }
            // Slice with a base file: keys are read from the base file; log files are not consulted here.
            HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
            String filename = baseFile.getFileName();
            StoragePath dataFilePath = HoodieTableMetadataUtil.filePath(basePath, partition, filename);
            String fileId = baseFile.getFileId();
            String instantTime = baseFile.getCommitTime();
            HoodieConfig hoodieConfig = ConfigUtils.getReaderConfigs(storageConf);
            HoodieFileReader reader = HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hoodieConfig, dataFilePath);
            return HoodieTableMetadataUtil.getHoodieRecordIterator(reader.getRecordKeyIterator(), forDelete, partition, fileId, instantTime);
        });
    }

    /**
     * Builds the schema projection needed for an expression index: the table's partition
     * fields followed by the index's source fields, with the Hudi metadata fields added.
     *
     * @param indexDefinition index definition supplying the source fields
     * @param metaClient      meta client used to resolve the table schema and partition fields
     * @return the projected schema including metadata fields
     * @throws Exception if the table schema cannot be resolved
     */
    public static Schema getProjectedSchemaForExpressionIndex(HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient) throws Exception {
        Schema tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
        List<String> partitionFields = metaClient.getTableConfig().getPartitionFields()
                .map(Arrays::asList)
                .orElse(Collections.emptyList());
        List<String> sourceFields = indexDefinition.getSourceFields();
        // Partition fields come first, then the indexed source fields.
        List<String> projectedFields = new ArrayList<>(partitionFields.size() + sourceFields.size());
        projectedFields.addAll(partitionFields);
        projectedFields.addAll(sourceFields);
        Schema projectedSchema = HoodieAvroUtils.getSchemaForFields(tableSchema, projectedFields);
        return HoodieAvroUtils.addMetadataFields(projectedSchema);
    }

    /**
     * Checks the data types of the fields an index is to be built on.
     *
     * <p>Returns {@code true} as soon as one source field resolves to a schema whose type is
     * none of {@code RECORD}, {@code ARRAY} or {@code MAP}; returns {@code false} when every
     * field is of one of those types or when {@code sourceFields} is empty.
     *
     * @param sourceFields names of the fields to index
     * @param tableSchema  schema the field names are resolved against
     * @return whether at least one source field has a non-complex type
     */
    public static boolean validateDataTypeForSecondaryOrExpressionIndex(List<String> sourceFields, Schema tableSchema) {
        for (String fieldToIndex : sourceFields) {
            Schema fieldSchema = HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema(tableSchema, fieldToIndex);
            boolean isComplexType = fieldSchema.getType() == Schema.Type.RECORD
                    || fieldSchema.getType() == Schema.Type.ARRAY
                    || fieldSchema.getType() == Schema.Type.MAP;
            if (!isComplexType) {
                return true;
            }
        }
        return false;
    }

    /**
     * Resolves the path of a file inside a table partition.
     *
     * @param basePath  base path of the table
     * @param partition relative partition path; an empty string places the file directly under {@code basePath}
     * @param filename  name of the file
     * @return {@code basePath/partition/filename}, or {@code basePath/filename} for an empty partition
     */
    public static StoragePath filePath(StoragePath basePath, String partition, String filename) {
        String relativePath = partition.isEmpty() ? filename : partition + "/" + filename;
        return new StoragePath(basePath, relativePath);
    }

    /**
     * Adapts an iterator of record keys into an iterator of record index records.
     *
     * <p>Each key becomes a record index delete record when {@code forDelete} is set, and a
     * record index update record for the given partition, file id and instant time otherwise.
     * Closing the returned iterator closes {@code recordKeyIterator}.
     */
    private static ClosableIterator<HoodieRecord> getHoodieRecordIterator(final ClosableIterator<String> recordKeyIterator, final boolean forDelete, final String partition, final String fileId, final String instantTime) {
        return new ClosableIterator<HoodieRecord>(){

            @Override
            public boolean hasNext() {
                return recordKeyIterator.hasNext();
            }

            @Override
            public HoodieRecord next() {
                String recordKey = recordKeyIterator.next();
                if (forDelete) {
                    return HoodieMetadataPayload.createRecordIndexDelete(recordKey);
                }
                return HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partition, fileId, instantTime, 0);
            }

            @Override
            public void close() {
                recordKeyIterator.close();
            }
        };
    }

    /**
     * Flattens per-file column range metadata into a single stream and delegates to the
     * stream-based overload, with no index partition specified.
     */
    private static Stream<HoodieRecord> collectAndProcessColumnMetadata(List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata, String partitionPath, boolean isTightBound, Map<String, Schema> colsToIndexSchemaMap) {
        Stream<HoodieColumnRangeMetadata<Comparable>> flattenedRanges = fileColumnMetadata.stream().flatMap(Collection::stream);
        return HoodieTableMetadataUtil.collectAndProcessColumnMetadata(partitionPath, isTightBound, Option.empty(), flattenedRanges, colsToIndexSchemaMap);
    }

    /**
     * Copies the column range metadata of the iterable into a list and delegates to the
     * stream-based overload.
     */
    private static Stream<HoodieRecord> collectAndProcessColumnMetadata(Iterable<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadataIterable, String partitionPath, boolean isTightBound, Option<String> indexPartitionOpt, Map<String, Schema> colsToIndexSchemaMap) {
        List<HoodieColumnRangeMetadata<Comparable>> bufferedRanges = new ArrayList<>();
        for (HoodieColumnRangeMetadata<Comparable> rangeMetadata : fileColumnMetadataIterable) {
            bufferedRanges.add(rangeMetadata);
        }
        return HoodieTableMetadataUtil.collectAndProcessColumnMetadata(partitionPath, isTightBound, indexPartitionOpt, bufferedRanges.stream(), colsToIndexSchemaMap);
    }

    /**
     * Groups the given column range metadata by column name, reduces every group to a single
     * range for the partition via {@code FileFormatUtils.getColumnRangeInPartition}, and creates
     * partition stats records from the resulting ranges.
     *
     * @param partitionPath        partition the records are created for
     * @param isTightBound         forwarded to {@code createPartitionStatsRecords}
     * @param indexPartitionOpt    forwarded to {@code createPartitionStatsRecords}
     * @param fileColumnMetadata   column range metadata to aggregate; the stream is consumed
     * @param colsToIndexSchemaMap column-name-to-schema map forwarded to the range aggregation
     * @return stream of partition stats records
     */
    private static Stream<HoodieRecord> collectAndProcessColumnMetadata(String partitionPath, boolean isTightBound, Option<String> indexPartitionOpt, Stream<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadata, Map<String, Schema> colsToIndexSchemaMap) {
        // One list of ranges per column name.
        Map columnMetadataMap = fileColumnMetadata.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName, Collectors.toList()));
        Stream<HoodieColumnRangeMetadata> partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream().map(entry -> FileFormatUtils.getColumnRangeInPartition(partitionPath, (List)entry.getValue(), colsToIndexSchemaMap));
        // NOTE(review): the literal false is presumably the "is deleted" flag - confirm against createPartitionStatsRecords.
        return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, partitionStatsRangeMetadata.collect(Collectors.toList()), false, isTightBound, indexPartitionOpt);
    }

    /**
     * Creates partition stats records for an expression index from column range metadata
     * keyed by partition name.
     *
     * <p>The pairs are grouped by key; within each group every range is re-created with the
     * partition name as its first argument and merged into one running range, from which the
     * partition stats records of that partition are created.
     *
     * @param fileColumnMetadata column range metadata keyed by partition name
     * @param isTightBound       forwarded to {@code createPartitionStatsRecords}
     * @param indexPartitionOpt  forwarded to {@code createPartitionStatsRecords}
     * @return the partition stats records of all partitions
     */
    public static HoodieData<HoodieRecord> collectAndProcessExprIndexPartitionStatRecords(HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>> fileColumnMetadata, boolean isTightBound, Option<String> indexPartitionOpt) {
        HoodiePairData<String, Iterable<HoodieColumnRangeMetadata<Comparable>>> columnMetadataMap = fileColumnMetadata.groupByKey();
        return columnMetadataMap.map(entry -> {
            String partitionName = (String)entry.getKey();
            Iterable iterable = (Iterable)entry.getValue();
            // Single-element array acts as a mutable holder that the lambda below can update.
            HoodieColumnRangeMetadata[] finalMetadata = new HoodieColumnRangeMetadata[]{null};
            iterable.forEach(e -> {
                // Rebuild the range with the partition name in place of the original first argument.
                HoodieColumnRangeMetadata rangeMetadata = HoodieColumnRangeMetadata.create(partitionName, e.getColumnName(), e.getMinValue(), e.getMaxValue(), e.getNullCount(), e.getValueCount(), e.getTotalSize(), e.getTotalUncompressedSize());
                // The first merge receives null as the left operand.
                finalMetadata[0] = HoodieColumnRangeMetadata.merge(finalMetadata[0], rangeMetadata);
            });
            return HoodieMetadataPayload.createPartitionStatsRecords(partitionName, Collections.singletonList(finalMetadata[0]), false, isTightBound, indexPartitionOpt).collect(Collectors.toList());
        }).flatMap(List::iterator);
    }

    /**
     * Creates partition stats records for the given file slices by reading the column ranges
     * of every base and log file of each partition.
     *
     * <p>Returns empty data when {@code partitionInfoList} is empty or when no columns are to
     * be indexed. The writer schema is taken from {@code writerSchemaOpt} when present and is
     * otherwise resolved lazily from the table.
     *
     * @param engineContext       engine context used to parallelize the work
     * @param partitionInfoList   (partition, file slice) pairs to process
     * @param metadataConfig      metadata config supplying parallelism and reader buffer size
     * @param dataTableMetaClient meta client of the data table
     * @param writerSchemaOpt     optional writer schema
     * @param recordTypeOpt       optional record type forwarded to {@code getColumnsToIndex}
     * @return the partition stats records
     */
    public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext, List<Pair<String, FileSlice>> partitionInfoList, HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataTableMetaClient, Option<Schema> writerSchemaOpt, Option<HoodieRecord.HoodieRecordType> recordTypeOpt) {
        if (partitionInfoList.isEmpty()) {
            return engineContext.emptyHoodieData();
        }
        Lazy<Option<Schema>> lazyWriterSchemaOpt = writerSchemaOpt.isPresent() ? Lazy.eagerly(writerSchemaOpt) : Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient));
        // The fourth argument reports whether the write timeline has no completed instants.
        Map<String, Schema> columnsToIndexSchemaMap = HoodieTableMetadataUtil.getColumnsToIndex(dataTableMetaClient.getTableConfig(), metadataConfig, lazyWriterSchemaOpt, dataTableMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().empty(), recordTypeOpt);
        if (columnsToIndexSchemaMap.isEmpty()) {
            LOG.warn("No columns to index for partition stats index");
            return engineContext.emptyHoodieData();
        }
        LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndexSchemaMap);
        // Collapse the slices into one (partition, set of file names) pair per partition.
        List partitionToFileNames = partitionInfoList.stream().collect(Collectors.groupingBy(Pair::getLeft, Collectors.mapping(pair -> HoodieTableMetadataUtil.extractFileNames((FileSlice)pair.getRight()), Collectors.toList()))).entrySet().stream().map(entry -> Pair.of(entry.getKey(), ((List)entry.getValue()).stream().flatMap(Collection::stream).collect(Collectors.toSet()))).collect(Collectors.toList());
        // At least one task, at most the configured parallelism or the partition count.
        int parallelism = Math.max(Math.min(partitionToFileNames.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
        return engineContext.parallelize(partitionToFileNames, parallelism).flatMap(partitionInfo -> {
            String partitionPath = (String)partitionInfo.getKey();
            List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = ((Set)partitionInfo.getValue()).stream().map(fileName -> HoodieTableMetadataUtil.getFileStatsRangeMetadata(partitionPath, fileName, dataTableMetaClient, new ArrayList<String>(columnsToIndexSchemaMap.keySet()), false, metadataConfig.getMaxReaderBufferSize())).collect(Collectors.toList());
            // Records built here are always marked as tight bound (isTightBound = true).
            return HoodieTableMetadataUtil.collectAndProcessColumnMetadata(fileColumnMetadata, partitionPath, true, columnsToIndexSchemaMap).iterator();
        });
    }

    /**
     * Collects the names of all files of a file slice: the base file, if present, and every
     * log file.
     *
     * @param fileSlice file slice to inspect
     * @return a mutable set with the file names of the slice
     */
    private static Set<String> extractFileNames(FileSlice fileSlice) {
        Set<String> sliceFileNames = new HashSet<>();
        Option<HoodieBaseFile> maybeBaseFile = fileSlice.getBaseFile();
        if (maybeBaseFile.isPresent()) {
            sliceFileNames.add(maybeBaseFile.get().getFileName());
        }
        fileSlice.getLogFiles().forEach(logFile -> sliceFileNames.add(logFile.getFileName()));
        return sliceFileNames;
    }

    /**
     * Returns the column range metadata of a file for the given columns.
     *
     * <p>For a deleted file one stub range per column is returned; otherwise the ranges are
     * read via {@code readColumnRangeMetadataFrom}.
     */
    private static List<HoodieColumnRangeMetadata<Comparable>> getFileStatsRangeMetadata(String partitionPath, String fileName, HoodieTableMetaClient datasetMetaClient, List<String> columnsToIndex, boolean isDeleted, int maxBufferSize) {
        if (!isDeleted) {
            return HoodieTableMetadataUtil.readColumnRangeMetadataFrom(partitionPath, fileName, datasetMetaClient, columnsToIndex, maxBufferSize);
        }
        List<HoodieColumnRangeMetadata<Comparable>> stubRanges = new ArrayList<>();
        for (String columnName : columnsToIndex) {
            stubRanges.add(HoodieColumnRangeMetadata.stub(fileName, columnName));
        }
        return stubRanges;
    }

    /**
     * Turns per-partition lists of column range metadata into partition stats records.
     *
     * <p>The lists are flattened, regrouped by partition name, and each group is aggregated
     * through {@code collectAndProcessColumnMetadata}. The tight-bound flag is taken from
     * {@code isShouldScanColStatsForTightBound(dataMetaClient)}.
     *
     * @param columnRangeMetadataPartitionPair column ranges keyed by partition name
     * @param dataMetaClient                   meta client of the data table
     * @param colsToIndexSchemaMap             column-name-to-schema map of the indexed columns
     * @return the partition stats records
     * @throws HoodieException if building the pipeline fails
     */
    private static HoodieData<HoodieRecord> convertMetadataToPartitionStatsRecords(HoodiePairData<String, List<HoodieColumnRangeMetadata<Comparable>>> columnRangeMetadataPartitionPair, HoodieTableMetaClient dataMetaClient, Map<String, Schema> colsToIndexSchemaMap) {
        try {
            return columnRangeMetadataPartitionPair.flatMapValues(List::iterator).groupByKey().map(pair -> {
                String partitionName = (String)pair.getLeft();
                return HoodieTableMetadataUtil.collectAndProcessColumnMetadata((Iterable)pair.getRight(), partitionName, HoodieTableMetadataUtil.isShouldScanColStatsForTightBound(dataMetaClient), Option.empty(), colsToIndexSchemaMap);
            }).flatMap(recordStream -> recordStream.iterator());
        }
        catch (Exception e) {
            throw new HoodieException("Failed to generate column stats records for metadata table", e);
        }
    }

    /**
     * Creates partition stats records for a commit.
     *
     * <p>Returns empty data when the commit carries no writer schema under the {@code "schema"}
     * metadata key, when there are no columns to index, or when the commit has nothing to
     * process. For a delete-partition commit, records built from stub ranges are emitted for
     * every replaced partition and indexed column. Otherwise the column ranges of the written
     * files are collected per partition and, if the column stats partition is available,
     * supplemented with the stored column stats of the partition's latest file slices.
     *
     * @param commitMetadata    commit metadata to convert; must be a {@code HoodieReplaceCommitMetadata}
     *                          when {@code isDeletePartition} is set
     * @param engineContext     engine context used to parallelize the work
     * @param dataMetaClient    meta client of the data table
     * @param tableMetadata     table metadata reader; must be non-null when column stats are scanned
     * @param metadataConfig    metadata config supplying the parallelism
     * @param recordTypeOpt     optional record type forwarded to {@code getColumnsToIndex}
     * @param isDeletePartition whether the commit deletes partitions
     * @return the partition stats records
     * @throws HoodieException wrapping any exception raised while building the records
     */
    public static HoodieData<HoodieRecord> convertMetadataToPartitionStatRecords(HoodieCommitMetadata commitMetadata, HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient, HoodieTableMetadata tableMetadata, HoodieMetadataConfig metadataConfig, Option<HoodieRecord.HoodieRecordType> recordTypeOpt, boolean isDeletePartition) {
        try {
            // Parse the writer schema from the commit metadata; a null or empty string yields an empty option.
            Option<Schema> writerSchema = Option.ofNullable(commitMetadata.getMetadata("schema")).flatMap(writerSchemaStr -> StringUtils.isNullOrEmpty(writerSchemaStr) ? Option.empty() : Option.of(new Schema.Parser().parse(writerSchemaStr)));
            HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
            // Add the metadata fields only when the table populates them.
            Option<Schema> tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? HoodieAvroUtils.addMetadataFields(schema) : schema);
            if (tableSchema.isEmpty()) {
                return engineContext.emptyHoodieData();
            }
            Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
            Map<String, Schema> columnsToIndexSchemaMap = HoodieTableMetadataUtil.getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig, writerSchemaOpt, false, recordTypeOpt);
            if (columnsToIndexSchemaMap.isEmpty()) {
                return engineContext.emptyHoodieData();
            }
            if (isDeletePartition) {
                HoodieReplaceCommitMetadata replaceCommitMetadata = (HoodieReplaceCommitMetadata)commitMetadata;
                Map<String, List<String>> partitionToReplaceFileIds = replaceCommitMetadata.getPartitionToReplaceFileIds();
                ArrayList<String> partitionsToDelete = new ArrayList<String>(partitionToReplaceFileIds.keySet());
                if (partitionToReplaceFileIds.isEmpty()) {
                    return engineContext.emptyHoodieData();
                }
                // One task per replaced partition; each emits records built from a stub range per indexed column.
                return engineContext.parallelize(partitionsToDelete, partitionsToDelete.size()).flatMap(partition -> {
                    // NOTE(review): the two literal true arguments are presumably (isDeleted, isTightBound) - confirm against createPartitionStatsRecords.
                    Stream columnRangeMetadata = columnsToIndexSchemaMap.keySet().stream().flatMap(column -> HoodieMetadataPayload.createPartitionStatsRecords(partition, Collections.singletonList(HoodieColumnRangeMetadata.stub("", column)), true, true, Option.empty()));
                    return columnRangeMetadata.iterator();
                });
            }
            List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream().flatMap(Collection::stream).collect(Collectors.toList());
            if (allWriteStats.isEmpty()) {
                return engineContext.emptyHoodieData();
            }
            ArrayList<String> colsToIndex = new ArrayList<String>(columnsToIndexSchemaMap.keySet());
            LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndexSchemaMap.keySet());
            // Regroup the write stats so that each list holds the stats of exactly one partition.
            ArrayList<List<HoodieWriteStat>> partitionedWriteStats = new ArrayList<List<HoodieWriteStat>>(allWriteStats.stream().collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath)).values());
            // At least one task, at most the configured parallelism or the partition count.
            int parallelism = Math.max(Math.min(partitionedWriteStats.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
            boolean shouldScanColStatsForTightBound = HoodieTableMetadataUtil.isShouldScanColStatsForTightBound(dataMetaClient);
            HoodiePairData<String, List<HoodieColumnRangeMetadata<Comparable>>> columnRangeMetadata = engineContext.parallelize(partitionedWriteStats, parallelism).mapToPair(partitionedWriteStat -> {
                // Every list is non-empty because it comes from groupingBy, so get(0) is safe.
                String partitionName = ((HoodieWriteStat)partitionedWriteStat.get(0)).getPartitionPath();
                List fileColumnMetadata = partitionedWriteStat.stream().flatMap(writeStat -> HoodieTableMetadataUtil.translateWriteStatToFileStats(writeStat, dataMetaClient, colsToIndex, tableSchema).stream()).collect(Collectors.toList());
                if (shouldScanColStatsForTightBound) {
                    ValidationUtils.checkState(tableMetadata != null, "tableMetadata should not be null when scanning metadata table");
                    // Names of the base and log files in the partition's latest file slices, inflight ones included.
                    Set fileNames = HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.empty(), partitionName).stream().flatMap(fileSlice -> Stream.concat(Stream.of(fileSlice.getBaseFile().map(BaseFile::getFileName).orElse(null)), fileSlice.getLogFiles().map(HoodieLogFile::getFileName))).filter(Objects::nonNull).collect(Collectors.toSet());
                    // Stored column stats for the indexed columns of this partition, restricted to those files.
                    List<HoodieColumnRangeMetadata> partitionColumnMetadata = tableMetadata.getRecordsByKeyPrefixes(HoodieTableMetadataUtil.generateKeyPrefixes(colsToIndex, partitionName), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false).map(record -> ((HoodieMetadataPayload)record.getData()).getColumnStatMetadata()).filter(Option::isPresent).map(colStatsOpt -> (HoodieMetadataColumnStats)((Object)((Object)((Object)colStatsOpt.get())))).filter(stats -> fileNames.contains(stats.getFileName())).map(HoodieColumnRangeMetadata::fromColumnStats).collectAsList();
                    if (!partitionColumnMetadata.isEmpty()) {
                        fileColumnMetadata.addAll(partitionColumnMetadata);
                    }
                }
                return Pair.of(partitionName, fileColumnMetadata);
            });
            return HoodieTableMetadataUtil.convertMetadataToPartitionStatsRecords(columnRangeMetadata, dataMetaClient, columnsToIndexSchemaMap);
        }
        catch (Exception e) {
            throw new HoodieException("Failed to generate column stats records for metadata table", e);
        }
    }

    /**
     * Tells whether stored column stats should be scanned when computing tight bounds,
     * which is the case exactly when the column stats metadata partition is available.
     *
     * @param dataMetaClient meta client of the data table
     * @return {@code true} if the column stats partition is available
     */
    public static boolean isShouldScanColStatsForTightBound(HoodieTableMetaClient dataMetaClient) {
        boolean columnStatsAvailable = MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient);
        return columnStatsAvailable;
    }

    /**
     * Looks up the definition of an index by name.
     *
     * @param indexName  name of the index
     * @param metaClient meta client holding the index metadata
     * @return the definition registered under {@code indexName}, or {@code null} when the
     *         index metadata has no entry for that name
     * @throws HoodieIndexException if the table has no index metadata at all
     */
    public static HoodieIndexDefinition getHoodieIndexDefinition(String indexName, HoodieTableMetaClient metaClient) {
        Option<HoodieIndexMetadata> indexMetadataOpt = metaClient.getIndexMetadata();
        if (!indexMetadataOpt.isPresent()) {
            throw new HoodieIndexException("Expression Index definition is not present");
        }
        return indexMetadataOpt.get().getIndexDefinitions().get(indexName);
    }

    /**
     * Generates one key prefix per column: the Base64-encoded column index id followed by the
     * Base64-encoded partition index id.
     *
     * @param columnsToIndex columns to generate prefixes for
     * @param partitionName  partition the prefixes refer to
     * @return the key prefixes, in the order of {@code columnsToIndex}
     */
    public static List<String> generateKeyPrefixes(List<String> columnsToIndex, String partitionName) {
        PartitionIndexID partitionId = new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionName));
        return columnsToIndex.stream()
                .map(columnName -> new ColumnIndexID(columnName).asBase64EncodedString().concat(partitionId.asBase64EncodedString()))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Produces the column range metadata of the file described by a write stat.
     *
     * <p>A {@code HoodieDeltaWriteStat} that carries column stats supplies them directly;
     * otherwise the ranges are read from the written file. {@code writerSchemaOpt} is not used.
     */
    private static List<HoodieColumnRangeMetadata<Comparable>> translateWriteStatToFileStats(HoodieWriteStat writeStat, HoodieTableMetaClient datasetMetaClient, List<String> columnsToIndex, Option<Schema> writerSchemaOpt) {
        if (writeStat instanceof HoodieDeltaWriteStat) {
            HoodieDeltaWriteStat deltaWriteStat = (HoodieDeltaWriteStat)writeStat;
            if (deltaWriteStat.getColumnStats().isPresent()) {
                Map<String, HoodieColumnRangeMetadata<Comparable>> rangesByColumn = deltaWriteStat.getColumnStats().get();
                return new ArrayList<>(rangesByColumn.values());
            }
        }
        String fileName = FSUtils.getFileNameFromPath(writeStat.getPath());
        // No stats on the write stat: read them from the file (-1 is passed as the buffer size).
        return HoodieTableMetadataUtil.getFileStatsRangeMetadata(writeStat.getPartitionPath(), fileName, datasetMetaClient, columnsToIndex, false, -1);
    }

    /**
     * Builds the partition stats index key of a column in a partition: the Base64-encoded
     * column index id followed by the Base64-encoded partition index id.
     *
     * @param partitionPath partition the key refers to
     * @param columnName    column the key refers to
     * @return the index key
     */
    public static String getPartitionStatsIndexKey(String partitionPath, String columnName) {
        PartitionIndexID partitionId = new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionPath));
        ColumnIndexID columnId = new ColumnIndexID(columnName);
        String encodedColumn = columnId.asBase64EncodedString();
        String encodedPartition = partitionId.asBase64EncodedString();
        return encodedColumn.concat(encodedPartition);
    }

    /**
     * Builds a partition stats index key that additionally carries a partition path prefix.
     * The key is the Base64-encoded column index id, followed by the encoded id of the prefix,
     * followed by the encoded id of the partition.
     *
     * @param partitionPathPrefix prefix whose id precedes the partition id
     * @param partitionPath       partition the key refers to
     * @param columnName          column the key refers to
     * @return the index key
     */
    public static String getPartitionStatsIndexKey(String partitionPathPrefix, String partitionPath, String columnName) {
        PartitionIndexID prefixId = new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionPathPrefix));
        PartitionIndexID partitionId = new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionPath));
        ColumnIndexID columnId = new ColumnIndexID(columnName);
        String encodedPrefix = prefixId.asBase64EncodedString();
        String encodedPartitions = encodedPrefix.concat(partitionId.asBase64EncodedString());
        return columnId.asBase64EncodedString().concat(encodedPartitions);
    }

    /**
     * Merges two column stats records of the same column.
     *
     * <p>The newer record wins outright when either record is marked deleted or when the newer
     * record is a tight bound. Otherwise the result combines both: minimum and maximum over the
     * non-null unwrapped values, summed value count, null count, total size and total
     * uncompressed size; file name, column name and the deleted flag come from the newer record.
     *
     * @param prevColumnStats previously stored stats
     * @param newColumnStats  incoming stats
     * @return the merged stats
     * @throws IllegalArgumentException presumably, via {@code ValidationUtils.checkArgument}, when
     *         the column names differ - confirm against ValidationUtils
     */
    public static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataColumnStats prevColumnStats, HoodieMetadataColumnStats newColumnStats) {
        ValidationUtils.checkArgument(Objects.equals(prevColumnStats.getColumnName(), newColumnStats.getColumnName()));
        if (newColumnStats.getIsDeleted() || prevColumnStats.getIsDeleted()) {
            return newColumnStats;
        }
        if (newColumnStats.getIsTightBound()) {
            return newColumnStats;
        }
        // Null bounds are ignored; the result is null only when both sides are null.
        Comparable minValue = Stream.of(HoodieAvroUtils.unwrapAvroValueWrapper(prevColumnStats.getMinValue()), HoodieAvroUtils.unwrapAvroValueWrapper(newColumnStats.getMinValue())).filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null);
        Comparable maxValue = Stream.of(HoodieAvroUtils.unwrapAvroValueWrapper(prevColumnStats.getMaxValue()), HoodieAvroUtils.unwrapAvroValueWrapper(newColumnStats.getMaxValue())).filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null);
        HoodieMetadataColumnStats.Builder columnStatsBuilder = HoodieMetadataColumnStats.newBuilder(HoodieMetadataPayload.METADATA_COLUMN_STATS_BUILDER_STUB.get()).setFileName(newColumnStats.getFileName()).setColumnName(newColumnStats.getColumnName()).setMinValue(HoodieAvroUtils.wrapValueIntoAvro(minValue)).setMaxValue(HoodieAvroUtils.wrapValueIntoAvro(maxValue)).setValueCount(prevColumnStats.getValueCount() + newColumnStats.getValueCount()).setNullCount(prevColumnStats.getNullCount() + newColumnStats.getNullCount()).setTotalSize(prevColumnStats.getTotalSize() + newColumnStats.getTotalSize()).setTotalUncompressedSize(prevColumnStats.getTotalUncompressedSize() + newColumnStats.getTotalUncompressedSize()).setIsDeleted(newColumnStats.getIsDeleted());
        // The tight-bound flag is copied only when the newer record has that field.
        if (newColumnStats.hasField("isTightBound")) {
            columnStatsBuilder.setIsTightBound(newColumnStats.getIsTightBound());
        }
        return columnStatsBuilder.build();
    }

    /**
     * Combines the file listings of two metadata payloads into a new map.
     *
     * <p>The older listing is copied first; entries of the newer listing are then merged in.
     * For a file present in both: a newer non-deleted entry yields a non-deleted entry with
     * the larger of the two sizes; a newer deleted entry removes an older non-deleted entry;
     * a repeated delete logs a warning and keeps the newer entry.
     *
     * @param older payload holding the earlier listing
     * @param newer payload holding the later listing; validated before merging
     * @return the combined listing
     */
    public static Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload older, HoodieMetadataPayload newer) {
        Map<String, HoodieMetadataFileInfo> mergedListing = new HashMap<>();
        if (older.filesystemMetadata != null) {
            mergedListing.putAll(older.filesystemMetadata);
        }
        if (newer.filesystemMetadata == null) {
            return mergedListing;
        }
        HoodieTableMetadataUtil.validatePayload(newer.type, newer.filesystemMetadata);
        for (Map.Entry<String, HoodieMetadataFileInfo> newerEntry : newer.filesystemMetadata.entrySet()) {
            String fileName = newerEntry.getKey();
            mergedListing.merge(fileName, newerEntry.getValue(), (existingInfo, incomingInfo) -> {
                if (!incomingInfo.getIsDeleted()) {
                    return new HoodieMetadataFileInfo(Math.max(incomingInfo.getSize(), existingInfo.getSize()), false);
                }
                if (!existingInfo.getIsDeleted()) {
                    // Returning null makes Map.merge drop the entry.
                    return null;
                }
                LOG.warn("A file is repeatedly deleted in the files partition of the metadata table: {}", fileName);
                return incomingInfo;
            });
        }
        return mergedListing;
    }

    /**
     * Validates a file listing: for payloads whose type is the record type of the FILES
     * partition, every entry must either be marked deleted or have a size greater than zero.
     * Payloads of any other type are not checked.
     */
    private static void validatePayload(int type2, Map<String, HoodieMetadataFileInfo> filesystemMetadata) {
        if (type2 != MetadataPartitionType.FILES.getRecordType()) {
            return;
        }
        for (HoodieMetadataFileInfo fileInfo : filesystemMetadata.values()) {
            boolean deletedOrNonEmpty = fileInfo.getIsDeleted() || fileInfo.getSize() > 0L;
            ValidationUtils.checkState(deletedOrNonEmpty, "Existing files should have size > 0");
        }
    }

    /**
     * Determines the expression index partitions that still have to be initialized,
     * registering a new index definition from the metadata config when one is required.
     *
     * @param partitionType  metadata partition type of the index
     * @param metadataConfig metadata config supplying column, name, type and options of the index
     * @param dataMetaClient meta client of the data table
     * @return names of the index partitions to initialize
     */
    public static Set<String> getExpressionIndexPartitionsToInit(MetadataPartitionType partitionType, HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataMetaClient) {
        Supplier<Boolean> newDefinitionRequired = () -> MetadataPartitionType.isNewExpressionIndexDefinitionRequired(metadataConfig, dataMetaClient);
        Supplier<String> indexedColumn = metadataConfig::getExpressionIndexColumn;
        Supplier<String> indexName = metadataConfig::getExpressionIndexName;
        return HoodieTableMetadataUtil.getIndexPartitionsToInit(partitionType, metadataConfig, dataMetaClient, newDefinitionRequired, indexedColumn, indexName, PARTITION_NAME_EXPRESSION_INDEX_PREFIX, metadataConfig.getExpressionIndexType());
    }

    /**
     * Determines the secondary index partitions that still have to be initialized,
     * registering a new index definition from the metadata config when one is required.
     *
     * @param partitionType  metadata partition type of the index
     * @param metadataConfig metadata config supplying column and name of the index
     * @param dataMetaClient meta client of the data table
     * @return names of the index partitions to initialize
     */
    public static Set<String> getSecondaryIndexPartitionsToInit(MetadataPartitionType partitionType, HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataMetaClient) {
        Supplier<Boolean> newDefinitionRequired = () -> MetadataPartitionType.isNewSecondaryIndexDefinitionRequired(metadataConfig, dataMetaClient);
        Supplier<String> indexedColumn = metadataConfig::getSecondaryIndexColumn;
        Supplier<String> indexName = metadataConfig::getSecondaryIndexName;
        return HoodieTableMetadataUtil.getIndexPartitionsToInit(partitionType, metadataConfig, dataMetaClient, newDefinitionRequired, indexedColumn, indexName, PARTITION_NAME_SECONDARY_INDEX_PREFIX, PARTITION_NAME_SECONDARY_INDEX);
    }

    /**
     * Shared logic for finding the index partitions to initialize.
     *
     * <p>If the existing index definitions already yield partitions to initialize, or no new
     * index is required, those partitions are returned. Otherwise a new index definition is
     * built from the supplied column, name and type (plus the expression index options and
     * function when the prefix is the expression index prefix), registered on the meta
     * client, and the partitions are computed again.
     */
    private static Set<String> getIndexPartitionsToInit(MetadataPartitionType partitionType, HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataMetaClient, Supplier<Boolean> isNewIndexRequired, Supplier<String> getIndexedColumn, Supplier<String> getIndexName, String partitionNamePrefix, String indexType) {
        Set<String> pendingPartitions = HoodieTableMetadataUtil.getIndexPartitionsToInitBasedOnIndexDefinition(partitionType, dataMetaClient);
        if (!pendingPartitions.isEmpty() || !isNewIndexRequired.get().booleanValue()) {
            return pendingPartitions;
        }
        String indexedColumn = getIndexedColumn.get();
        String indexName = HoodieTableMetadataUtil.getSecondaryOrExpressionIndexName(getIndexName, partitionNamePrefix, indexedColumn);
        HoodieIndexDefinition.Builder definitionBuilder = HoodieIndexDefinition.newBuilder()
                .withIndexName(indexName)
                .withIndexType(indexType)
                .withSourceFields(Collections.singletonList(indexedColumn));
        if (partitionNamePrefix.equals(PARTITION_NAME_EXPRESSION_INDEX_PREFIX)) {
            definitionBuilder.withIndexOptions(metadataConfig.getExpressionIndexOptions());
            // The index function falls back to "identity" when no "expr" option is configured.
            definitionBuilder.withIndexFunction(metadataConfig.getExpressionIndexOptions().getOrDefault("expr", "identity"));
        }
        dataMetaClient.buildIndexDefinition(definitionBuilder.build());
        return HoodieTableMetadataUtil.getIndexPartitionsToInitBasedOnIndexDefinition(partitionType, dataMetaClient);
    }

    /**
     * Derives the name of a secondary or expression index.
     *
     * <p>The configured name is used when it is set; otherwise the indexed column serves as
     * the name when it is non-empty. A non-empty name that does not yet start with
     * {@code partitionNamePrefix} gets the prefix prepended.
     *
     * @param getConfiguredIndexName supplier of the configured index name
     * @param partitionNamePrefix    prefix every index name must start with
     * @param indexedColumn          column the index is built on
     * @return the resulting index name; the configured value unchanged (possibly null or
     *         empty) when neither a name nor a column is available
     */
    public static String getSecondaryOrExpressionIndexName(Supplier<String> getConfiguredIndexName, String partitionNamePrefix, String indexedColumn) {
        String configuredName = getConfiguredIndexName.get();
        boolean fallBackToColumn = StringUtils.isNullOrEmpty(configuredName) && StringUtils.nonEmpty(indexedColumn);
        String candidate = fallBackToColumn ? partitionNamePrefix + indexedColumn : configuredName;
        if (!StringUtils.nonEmpty(candidate) || candidate.startsWith(partitionNamePrefix)) {
            return candidate;
        }
        return partitionNamePrefix + candidate;
    }

    /**
     * Lists the index partitions of the given type that are defined but not yet part of the
     * table's metadata partitions.
     *
     * <p>An index belongs to the type when its name starts with the type's partition path.
     * Returns an empty set when the table has no index metadata.
     */
    private static Set<String> getIndexPartitionsToInitBasedOnIndexDefinition(MetadataPartitionType partitionType, HoodieTableMetaClient dataMetaClient) {
        if (dataMetaClient.getIndexMetadata().isEmpty()) {
            return Collections.emptySet();
        }
        Set<String> pendingPartitions = new HashSet<>();
        for (HoodieIndexDefinition definition : dataMetaClient.getIndexMetadata().get().getIndexDefinitions().values()) {
            String indexName = definition.getIndexName();
            if (indexName.startsWith(partitionType.getPartitionPath())) {
                pendingPartitions.add(indexName);
            }
        }
        // Partitions already recorded in the table config need no initialization.
        pendingPartitions.removeAll(dataMetaClient.getTableConfig().getMetadataPartitions());
        return pendingPartitions;
    }

    /**
     * Summary of one directory listing of a table.
     *
     * <p>For a directory treated as a Hudi partition the sizes of its eligible data files are
     * recorded; for any other directory its sub-directories (except {@code .hoodie}) are
     * recorded so that they can be listed in turn.
     */
    public static class DirectoryInfo
    implements Serializable {
        private final String relativePath;
        private final HashMap<String, Long> filenameToSizeMap;
        private final List<StoragePath> subDirectories = new ArrayList<StoragePath>();
        private boolean isHoodiePartition = false;

        /**
         * Creates the summary with Hudi partition validation enabled.
         */
        public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos, String maxInstantTime, Set<String> pendingDataInstants) {
            this(relativePath, pathInfos, maxInstantTime, pendingDataInstants, true);
        }

        /**
         * Creates the summary from a directory listing.
         *
         * @param relativePath             path of the directory relative to the table
         * @param pathInfos                entries found in the directory
         * @param maxInstantTime           data files with a later commit time are ignored
         * @param pendingDataInstants      commit times whose data files are ignored
         * @param validateHoodiePartitions when {@code true}, the directory counts as a Hudi
         *                                 partition only if an entry name starts with
         *                                 {@code .hoodie_partition_metadata}; when {@code false}
         *                                 it always counts as one
         */
        public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos, String maxInstantTime, Set<String> pendingDataInstants, boolean validateHoodiePartitions) {
            this.relativePath = relativePath;
            this.filenameToSizeMap = new HashMap<>(pathInfos.size());
            this.isHoodiePartition = !validateHoodiePartitions || pathInfos.stream().anyMatch(info -> info.getPath().getName().startsWith(".hoodie_partition_metadata"));
            for (StoragePathInfo pathInfo : pathInfos) {
                StoragePath path = pathInfo.getPath();
                if (!this.isHoodiePartition) {
                    // Not a partition: only remember sub-directories, skipping the .hoodie folder.
                    if (pathInfo.isDirectory() && !path.getName().equals(".hoodie")) {
                        this.subDirectories.add(path);
                    }
                    continue;
                }
                if (!FSUtils.isDataFile(path)) {
                    continue;
                }
                String dataFileCommitTime = FSUtils.getCommitTime(path.getName());
                if (pendingDataInstants.contains(dataFileCommitTime)) {
                    continue;
                }
                if (InstantComparison.compareTimestamps(dataFileCommitTime, InstantComparison.LESSER_THAN_OR_EQUALS, maxInstantTime)) {
                    this.filenameToSizeMap.put(path.getName(), pathInfo.getLength());
                }
            }
        }

        /** Returns the path of the directory relative to the table. */
        String getRelativePath() {
            return this.relativePath;
        }

        /** Returns the number of data files recorded for the directory. */
        int getTotalFiles() {
            return this.filenameToSizeMap.size();
        }

        /** Returns whether the directory was treated as a Hudi partition. */
        boolean isHoodiePartition() {
            return this.isHoodiePartition;
        }

        /** Returns the sub-directories recorded for a directory that is not a partition. */
        List<StoragePath> getSubDirectories() {
            return this.subDirectories;
        }

        /** Returns the recorded data files, mapped from file name to length. */
        Map<String, Long> getFileNameToSizeMap() {
            return this.filenameToSizeMap;
        }
    }
}

