/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import io.hops.hudi.org.apache.avro.Schema;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.io.HoodieMergedReadHandle;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.Lazy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableMetadataWriter {
    // Class-wide SLF4J logger.
    static final Logger LOG = LoggerFactory.getLogger(HoodieBackedTableMetadataWriter.class);
    // NOTE(review): not referenced in the visible part of this file - confirm where it is used.
    private static final String RECORD_KEY_FIELD_NAME = "key";
    // NOTE(review): the literal 48 is passed to estimateFileGroupCount by the record-index and
    // secondary-index initializers; presumably that is this constant inlined by the compiler - confirm.
    private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48;
    // Not assigned anywhere in the visible part of this file.
    protected transient BaseHoodieWriteClient<?, I, ?, ?> writeClient;
    // Derived from the data write config in the constructor, only when the metadata table is enabled.
    protected HoodieWriteConfig metadataWriteConfig;
    // Write config of the data table, as handed to the constructor.
    protected HoodieWriteConfig dataWriteConfig;
    // Metadata table reader; (re)created by initMetadataReader().
    protected HoodieBackedTableMetadata metadata;
    // Meta client of the metadata table; set alongside the reader or while checking/initializing the table.
    protected HoodieTableMetaClient metadataMetaClient;
    // Meta client of the data table, built in the constructor from the data write config base path.
    protected HoodieTableMetaClient dataMetaClient;
    // Starts as Option.empty() in the constructor; NOTE(review): presumably populated by initRegistry() - confirm.
    protected Option<HoodieMetadataMetrics> metrics;
    protected StorageConfiguration<?> storageConf;
    protected final transient HoodieEngineContext engineContext;
    // Partition types enabled for this writer; mutated during initialization when a type is found inapplicable.
    protected final List<MetadataPartitionType> enabledPartitionTypes;
    // Result of initializeIfNeeded() as recorded by the constructor.
    boolean initialized = false;
    // File-system view cached by getMetadataView().
    private HoodieTableFileSystemView metadataView;

    /**
     * Creates the writer, builds the data table meta client, resolves the enabled
     * metadata partition types and, when the metadata table is enabled in the write
     * config, attempts to initialize it.
     *
     * <p>An {@link IOException} raised during initialization is logged and leaves
     * {@code initialized} as {@code false}; it is not rethrown.
     *
     * @param storageConf storage configuration; a fresh instance is used for the meta client
     * @param writeConfig write config of the data table
     * @param failedWritesCleaningPolicy policy forwarded when deriving the metadata write config
     * @param engineContext engine context used for all distributed work
     * @param inflightInstantTimestamp forwarded to {@link #initializeIfNeeded}
     */
    protected HoodieBackedTableMetadataWriter(StorageConfiguration<?> storageConf, HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, HoodieEngineContext engineContext, Option<String> inflightInstantTimestamp) {
        this.dataWriteConfig = writeConfig;
        this.engineContext = engineContext;
        this.storageConf = storageConf;
        this.metrics = Option.empty();
        this.dataMetaClient = HoodieTableMetaClient.builder()
            .setConf(storageConf.newInstance())
            .setBasePath(writeConfig.getBasePath())
            .setTimeGeneratorConfig(writeConfig.getTimeGeneratorConfig())
            .build();
        this.enabledPartitionTypes = this.getEnabledPartitions(writeConfig.getMetadataConfig(), this.dataMetaClient);

        if (writeConfig.isMetadataTableEnabled()) {
            this.metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig, failedWritesCleaningPolicy);
            try {
                this.initRegistry();
                this.initialized = this.initializeIfNeeded(this.dataMetaClient, inflightInstantTimestamp);
            } catch (IOException e) {
                // Deliberately best-effort: the writer stays un-initialized instead of failing construction.
                LOG.error("Failed to initialize metadata table", e);
            }
        }

        // A successfully initialized writer must have an open reader.
        boolean readerOpenWhenInitialized = !this.initialized || this.metadata != null;
        ValidationUtils.checkArgument(readerOpenWhenInitialized, "MDT Reader should have been opened post initialization");
    }

    /**
     * Resolves the metadata partition types enabled for the given configuration by
     * delegating to {@link MetadataPartitionType#getEnabledPartitions}.
     *
     * @param metadataConfig metadata configuration of the data table
     * @param metaClient meta client of the data table
     * @return the enabled partition types
     */
    List<MetadataPartitionType> getEnabledPartitions(HoodieMetadataConfig metadataConfig, HoodieTableMetaClient metaClient) {
        List<MetadataPartitionType> enabled = MetadataPartitionType.getEnabledPartitions(metadataConfig, metaClient);
        return enabled;
    }

    /**
     * Supplies the {@link HoodieTable} for the given write config and meta client.
     * Within this file it is called with the data write config and data meta client
     * when initializing the record index.
     *
     * @param var1 write config to build the table for
     * @param var2 meta client to build the table for
     * @return the table instance
     */
    abstract HoodieTable getTable(HoodieWriteConfig var1, HoodieTableMetaClient var2);

    /**
     * Re-opens the metadata reader when the reader, the metadata meta client, or the
     * reader's file-system view is missing; otherwise does nothing.
     */
    private void mayBeReinitMetadataReader() {
        boolean readerUsable = this.metadata != null
            && this.metadataMetaClient != null
            && this.metadata.getMetadataFileSystemView() != null;
        if (!readerUsable) {
            this.initMetadataReader();
        }
    }

    /**
     * Closes the current metadata reader (if any) and opens a new one, refreshing
     * {@code metadataMetaClient} from it.
     *
     * @throws HoodieException if the new reader cannot be created
     */
    private void initMetadataReader() {
        HoodieBackedTableMetadata current = this.metadata;
        if (current != null) {
            current.close();
        }
        try {
            this.metadata = new HoodieBackedTableMetadata(
                this.engineContext,
                this.dataMetaClient.getStorage(),
                this.dataWriteConfig.getMetadataConfig(),
                this.dataWriteConfig.getBasePath(),
                true);
            this.metadataMetaClient = this.metadata.getMetadataMetaClient();
        } catch (Exception cause) {
            throw new HoodieException("Could not open MDT for reads", cause);
        }
    }

    /**
     * Returns the cached file-system view, rebuilding it when none is cached or when
     * the cached one no longer equals the reader's current view.
     *
     * @return the cached or newly built view
     * @throws IllegalStateException if the metadata reader has not been opened, or if
     *         the data meta client is missing when a new view has to be built
     */
    private HoodieTableFileSystemView getMetadataView() {
        // Validate before the reader is dereferenced below; previously a cached view combined
        // with a null reader surfaced as a NullPointerException instead of this message.
        ValidationUtils.checkState(this.metadata != null, "Metadata table not initialized");
        if (this.metadataView == null || !this.metadataView.equals(this.metadata.getMetadataFileSystemView())) {
            ValidationUtils.checkState(this.dataMetaClient != null, "Data table meta client not initialized");
            this.metadataView = new HoodieTableFileSystemView(this.metadata, this.dataMetaClient, this.dataMetaClient.getActiveTimeline());
        }
        return this.metadataView;
    }

    /**
     * Hook invoked by the constructor, before {@code initializeIfNeeded}, when the
     * metadata table is enabled.
     * NOTE(review): presumably sets up the metrics registry / {@code metrics} field - confirm in subclasses.
     */
    protected abstract void initRegistry();

    /**
     * Returns the write config of the metadata table. It is only assigned when the
     * metadata table is enabled, so it may be {@code null}.
     *
     * @return the metadata table write config
     */
    public HoodieWriteConfig getWriteConfig() {
        return metadataWriteConfig;
    }

    /**
     * Returns the current metadata table reader, or {@code null} if none has been opened.
     *
     * @return the metadata reader
     */
    public HoodieBackedTableMetadata getTableMetadata() {
        return metadata;
    }

    /**
     * Returns the live list of enabled metadata partition types (not a copy).
     *
     * @return the enabled partition types
     */
    public List<MetadataPartitionType> getEnabledPartitionTypes() {
        return enabledPartitionTypes;
    }

    /**
     * Determines which metadata partitions still need initializing and initializes them.
     *
     * <p>FILES is queued when the metadata table does not exist. When async indexing is
     * disabled, every enabled non-FILES partition that the table config does not list as
     * initialized is queued too. With nothing queued, only the reader is opened.
     *
     * @param dataMetaClient meta client of the data table
     * @param inflightInstantTimestamp forwarded to the filesystem-based initialization
     * @return {@code true} when the table is ready, {@code false} when initialization
     *         was declined or failed with an {@link IOException}
     * @throws IOException declared by the signature; IOExceptions raised inside are caught and logged
     */
    protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) throws IOException {
        HoodieTimer initTimer = HoodieTimer.start();
        ArrayList<MetadataPartitionType> pendingPartitions = new ArrayList<MetadataPartitionType>(MetadataPartitionType.getValidValues().length);
        try {
            if (!this.metadataTableExists(dataMetaClient)) {
                pendingPartitions.add(MetadataPartitionType.FILES);
            }
            if (!this.dataWriteConfig.isMetadataAsyncIndex()) {
                Set<String> alreadyInitialized = dataMetaClient.getTableConfig().getMetadataPartitions();
                LOG.info("Async metadata indexing disabled and following partitions already initialized: {}", alreadyInitialized);
                for (MetadataPartitionType candidate : this.enabledPartitionTypes) {
                    boolean needsInit = !alreadyInitialized.contains(candidate.getPartitionPath())
                        && !MetadataPartitionType.FILES.equals(candidate);
                    if (needsInit) {
                        pendingPartitions.add(candidate);
                    }
                }
            }
            if (pendingPartitions.isEmpty()) {
                // Nothing to bootstrap: just make sure a reader is open.
                this.initMetadataReader();
                return true;
            }
            String initializationTime = dataMetaClient.getActiveTimeline()
                .filterCompletedInstants()
                .lastInstant()
                .map(HoodieInstant::requestedTime)
                .orElse("00000000000000");
            boolean bootstrapped = this.initializeFromFilesystem(initializationTime, pendingPartitions, inflightInstantTimestamp);
            if (bootstrapped) {
                this.metrics.ifPresent(m -> m.updateMetrics("initialize", initTimer.endTimer()));
                return true;
            }
            LOG.error("Failed to initialize MDT from filesystem");
            return false;
        } catch (IOException e) {
            LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
            return false;
        }
    }

    /**
     * Checks whether a usable metadata table exists, deleting it when it has to be
     * re-bootstrapped.
     *
     * <p>Side effects: assigns {@code metadataMetaClient}; re-creates the meta client
     * when the existing table has populate-meta-fields enabled; deletes the metadata
     * table when {@link #isBootstrapNeeded} says so.
     *
     * @param dataMetaClient meta client of the data table
     * @return {@code true} only if the table is available and needs no re-bootstrap
     * @throws IOException propagated from meta client initialization
     */
    private boolean metadataTableExists(HoodieTableMetaClient dataMetaClient) throws IOException {
        if (!dataMetaClient.getTableConfig().isMetadataTableAvailable()) {
            return false;
        }
        try {
            this.metadataMetaClient = HoodieTableMetaClient.builder()
                .setConf(this.storageConf.newInstance())
                .setBasePath(this.metadataWriteConfig.getBasePath())
                .setTimeGeneratorConfig(this.dataWriteConfig.getTimeGeneratorConfig())
                .build();
            if (this.metadataMetaClient.getTableConfig().populateMetaFields()) {
                LOG.info("Re-initiating metadata table properties since populate meta fields have changed");
                this.metadataMetaClient = this.initializeMetaClient();
            }
        } catch (TableNotFoundException notFound) {
            // Table config claims availability but the table is not on storage.
            return false;
        }

        Option<HoodieInstant> lastCompleted = this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
        if (!this.isBootstrapNeeded(lastCompleted)) {
            return true;
        }

        this.metrics.ifPresent(m -> m.incrementMetric("rebootstrap_count", 1L));
        LOG.info("Deleting Metadata Table directory so that it can be re-initialized");
        HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, this.engineContext, false);
        return false;
    }

    /**
     * Decides whether the metadata table must be re-bootstrapped: it must when the
     * metadata timeline has no completed instant.
     *
     * @param latestMetadataInstant latest completed instant of the metadata table, if any
     * @return {@code true} when no instant is present
     */
    private boolean isBootstrapNeeded(Option<HoodieInstant> latestMetadataInstant) {
        if (latestMetadataInstant.isPresent()) {
            return false;
        }
        LOG.warn("Metadata Table will need to be re-initialized as no instants were found");
        return true;
    }

    /**
     * Gate consulted by {@code initializeFromFilesystem} before any work is done; a
     * {@code false} result aborts that initialization. This implementation always
     * allows it and ignores both arguments.
     *
     * @param pendingDataInstants pending instants of the data table
     * @param inflightInstantTimestamp in-flight instant passed in by the caller, if any
     * @return always {@code true}
     */
    boolean shouldInitializeFromFilesystem(Set<String> pendingDataInstants, Option<String> inflightInstantTimestamp) {
        return true;
    }

    /**
     * Initializes the requested metadata table partitions one after another, each under
     * its own instant time.
     *
     * <p>Steps: (1) ask {@code shouldInitializeFromFilesystem} whether to proceed;
     * (2) if FILES is not yet available, move it to the front of the list and create the
     * metadata meta client, otherwise open the reader; (3) drop partitions the table config
     * already reports as available; (4) list data partitions, from the metadata table when
     * FILES is available, else from the filesystem when auto-initialize is on, else empty;
     * (5) drop PARTITION_STATS for non-partitioned tables; (6) per remaining partition:
     * generate records, create file groups, bulk commit, mark the partition available in the
     * table config and re-open the reader.
     *
     * @param initializationTime instant time the initialization is based on
     * @param partitionsToInit partition types to initialize; mutated by this method
     * @param inflightInstantTimestamp forwarded to {@code shouldInitializeFromFilesystem}
     * @return {@code false} if the gate declined, {@code true} otherwise
     * @throws IOException declared by the signature
     * @throws HoodieMetadataException if record generation for any partition fails
     */
    private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit, Option<String> inflightInstantTimestamp) throws IOException {
        Set<String> pendingDataInstants = this.getPendingDataInstants(this.dataMetaClient);
        if (!this.shouldInitializeFromFilesystem(pendingDataInstants, inflightInstantTimestamp)) {
            return false;
        }
        boolean filesPartitionAvailable = this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
        if (!filesPartitionAvailable) {
            // Remove-then-insert guarantees FILES is present exactly once and is initialized first.
            partitionsToInit.remove((Object)MetadataPartitionType.FILES);
            partitionsToInit.add(0, MetadataPartitionType.FILES);
            this.metadataMetaClient = this.initializeMetaClient();
        } else {
            this.initMetadataReader();
            if (this.metadataMetaClient == null) {
                this.metadataMetaClient = HoodieTableMetaClient.builder().setConf(this.storageConf.newInstance()).setBasePath(this.metadataWriteConfig.getBasePath()).setTimeGeneratorConfig(this.dataWriteConfig.getTimeGeneratorConfig()).build();
            }
        }
        // Skip anything the table config already marks as available.
        partitionsToInit.removeIf(metadataPartition -> this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable((MetadataPartitionType)((Object)metadataPartition)));
        // Source of the partition listing depends on FILES availability and the auto-initialize setting.
        List<HoodieTableMetadataUtil.DirectoryInfo> partitionInfoList = filesPartitionAvailable ? this.listAllPartitionsFromMDT(initializationTime, pendingDataInstants) : (this.dataWriteConfig.getMetadataConfig().shouldAutoInitialize() ? this.listAllPartitionsFromFilesystem(initializationTime, pendingDataInstants) : Collections.emptyList());
        // Partition identifier -> (file name -> size), used by the bloom filter and column stats initializers.
        Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream().map(p -> {
            String partitionName = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath());
            return Pair.of(partitionName, p.getFileNameToSizeMap());
        }).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
        // Partition stats make no sense for a non-partitioned table: drop the type from both lists.
        Iterator<MetadataPartitionType> iterator2 = partitionsToInit.iterator();
        while (iterator2.hasNext()) {
            MetadataPartitionType partitionType = iterator2.next();
            if (partitionType != MetadataPartitionType.PARTITION_STATS || this.dataMetaClient.getTableConfig().isTablePartitioned()) continue;
            LOG.warn("Partition stats index cannot be enabled for a non-partitioned table. Removing from initialization list. Please disable {}", (Object)HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key());
            iterator2.remove();
            this.enabledPartitionTypes.remove((Object)partitionType);
        }
        // "continue block12" below skips the current partition type without failing the whole run.
        block12: for (MetadataPartitionType partitionType : partitionsToInit) {
            String partitionName;
            Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
            String instantTimeForPartition = this.generateUniqueInstantTime(initializationTime);
            String partitionTypeName = partitionType.name();
            LOG.info("Initializing MDT partition {} at instant {}", (Object)partitionTypeName, (Object)instantTimeForPartition);
            // Only populated for COLUMN_STATS; handed to updateColumnsToIndexWithColStats after the commit.
            ArrayList<String> columnsToIndex = new ArrayList();
            try {
                switch (partitionType) {
                    case FILES: {
                        fileGroupCountAndRecordsPair = this.initializeFilesPartition(partitionInfoList);
                        partitionName = MetadataPartitionType.FILES.getPartitionPath();
                        break;
                    }
                    case BLOOM_FILTERS: {
                        fileGroupCountAndRecordsPair = this.initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
                        partitionName = MetadataPartitionType.BLOOM_FILTERS.getPartitionPath();
                        break;
                    }
                    case COLUMN_STATS: {
                        Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> colStatsColumnsAndRecord = this.initializeColumnStatsPartition(partitionToFilesMap);
                        columnsToIndex = colStatsColumnsAndRecord.getKey();
                        fileGroupCountAndRecordsPair = colStatsColumnsAndRecord.getValue();
                        partitionName = MetadataPartitionType.COLUMN_STATS.getPartitionPath();
                        break;
                    }
                    case RECORD_INDEX: {
                        fileGroupCountAndRecordsPair = this.initializeRecordIndexPartition();
                        partitionName = MetadataPartitionType.RECORD_INDEX.getPartitionPath();
                        break;
                    }
                    case EXPRESSION_INDEX: {
                        Set<String> expressionIndexPartitionsToInit = HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit(partitionType, this.dataWriteConfig.getMetadataConfig(), this.dataMetaClient);
                        // Exactly one index is bootstrapped per run: zero is skipped silently, more than one with a warning.
                        if (expressionIndexPartitionsToInit.size() != 1) {
                            if (expressionIndexPartitionsToInit.size() <= 1) continue block12;
                            LOG.warn("Skipping expression index initialization as only one expression index bootstrap at a time is supported for now. Provided: {}", expressionIndexPartitionsToInit);
                            continue block12;
                        }
                        partitionName = expressionIndexPartitionsToInit.iterator().next();
                        fileGroupCountAndRecordsPair = this.initializeExpressionIndexPartition(partitionName, instantTimeForPartition);
                        break;
                    }
                    case PARTITION_STATS: {
                        if (!this.dataWriteConfig.isMetadataColumnStatsIndexEnabled()) {
                            LOG.warn("Skipping partition stats initialization as column stats index is not enabled. Please enable {}", (Object)HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
                            continue block12;
                        }
                        fileGroupCountAndRecordsPair = this.initializePartitionStatsIndex();
                        partitionName = MetadataPartitionType.PARTITION_STATS.getPartitionPath();
                        break;
                    }
                    case SECONDARY_INDEX: {
                        Set<String> secondaryIndexPartitionsToInit = HoodieTableMetadataUtil.getSecondaryIndexPartitionsToInit(partitionType, this.dataWriteConfig.getMetadataConfig(), this.dataMetaClient);
                        // Same single-index rule as for expression indexes.
                        if (secondaryIndexPartitionsToInit.size() != 1) {
                            if (secondaryIndexPartitionsToInit.size() <= 1) continue block12;
                            LOG.warn("Skipping secondary index initialization as only one secondary index bootstrap at a time is supported for now. Provided: {}", secondaryIndexPartitionsToInit);
                            continue block12;
                        }
                        partitionName = secondaryIndexPartitionsToInit.iterator().next();
                        fileGroupCountAndRecordsPair = this.initializeSecondaryIndexPartition(partitionName);
                        break;
                    }
                    default: {
                        throw new HoodieMetadataException(String.format("Unsupported MDT partition type: %s", new Object[]{partitionType}));
                    }
                }
            }
            catch (Exception e) {
                // Record the failure as a metric, then abort the whole initialization.
                String metricKey = partitionType.getPartitionPath() + "_" + "bootstrap_error";
                this.metrics.ifPresent(m -> m.setMetric(metricKey, 1L));
                String errMsg = String.format("Bootstrap on %s partition failed for %s", partitionType.getPartitionPath(), this.metadataMetaClient.getBasePath());
                LOG.error(errMsg, (Throwable)e);
                throw new HoodieMetadataException(errMsg, e);
            }
            if (LOG.isInfoEnabled()) {
                LOG.info("Initializing {} index with {} mappings", (Object)partitionTypeName, (Object)fileGroupCountAndRecordsPair.getKey());
            }
            HoodieTimer partitionInitTimer = HoodieTimer.start();
            int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
            ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionTypeName + " should be > 0");
            this.initializeFileGroups(this.dataMetaClient, partitionType, instantTimeForPartition, fileGroupCount, partitionName);
            HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
            this.bulkCommit(instantTimeForPartition, partitionName, records, fileGroupCount);
            if (partitionType == MetadataPartitionType.COLUMN_STATS) {
                this.updateColumnsToIndexWithColStats(columnsToIndex);
            }
            // Mark the partition available only after its commit, then re-open the reader so later
            // partitions in this loop read through a fresh reader.
            this.dataMetaClient.getTableConfig().setMetadataPartitionState(this.dataMetaClient, partitionName, true);
            this.initMetadataReader();
            long totalInitTime = partitionInitTimer.endTimer();
            LOG.info("Initializing {} index in metadata table took {} in ms", (Object)partitionTypeName, (Object)totalInitTime);
        }
        return true;
    }

    /**
     * Hook invoked after the COLUMN_STATS partition has been bulk-committed during
     * initialization.
     *
     * @param var1 the columns that were indexed by the column stats initialization
     */
    protected abstract void updateColumnsToIndexWithColStats(List<String> var1);

    /**
     * Picks the instant time under which a metadata partition is initialized.
     *
     * <p>If {@code initializationTime} is an indexing commit on the data timeline it is
     * returned unchanged. Otherwise the method probes "00000000000000" plus 0, 1, 2, ...
     * milliseconds and returns the first value not yet on the metadata table's commits timeline.
     *
     * @param initializationTime instant time the initialization is based on
     * @return an instant time unused on the metadata commits timeline, or the input itself
     */
    String generateUniqueInstantTime(String initializationTime) {
        HoodieTimeline indexingTimeline = this.dataMetaClient.getActiveTimeline()
            .filter(instant -> instant.getAction().equals("indexing"));
        if (HoodieTableMetadataUtil.isIndexingCommit(indexingTimeline, initializationTime)) {
            return initializationTime;
        }
        for (int offset = 0; ; ++offset) {
            String candidate = HoodieInstantTimeGenerator.instantTimePlusMillis("00000000000000", offset);
            boolean taken = this.metadataMetaClient.getCommitsTimeline().containsInstant(candidate);
            if (!taken) {
                return candidate;
            }
        }
    }

    /**
     * Builds the records for the partition stats index from the latest file slices of
     * every data partition.
     *
     * @return the configured file group count paired with the generated records
     * @throws IOException if the file slices cannot be listed
     */
    private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex() throws IOException {
        HoodieData<HoodieRecord> statsRecords = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(
            this.engineContext,
            this.getPartitionFileSlicePairs(),
            this.dataWriteConfig.getMetadataConfig(),
            this.dataMetaClient,
            Option.empty(),
            Option.of(this.dataWriteConfig.getRecordMerger().getRecordType()));
        return Pair.of(this.dataWriteConfig.getMetadataConfig().getPartitionStatsIndexFileGroupCount(), statsRecords);
    }

    /**
     * Builds the records for the column stats index.
     *
     * <p>Returns empty record data when there are no files to index or when no columns
     * resolve as indexable; in the first case the column list is empty as well.
     *
     * @param partitionToFilesMap partition identifier to (file name to size) mapping
     * @return the indexed columns paired with (file group count, records)
     */
    private Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
        int fileGroupCount = this.dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
        if (partitionToFilesMap.isEmpty()) {
            List<String> noColumns = Collections.emptyList();
            HoodieData<HoodieRecord> noRecords = this.engineContext.emptyHoodieData();
            return Pair.of(noColumns, Pair.of(fileGroupCount, noRecords));
        }

        // The schema is resolved lazily so it is only looked up if column resolution needs it.
        Lazy<Option<Schema>> lazySchema = Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(this.dataMetaClient));
        List<String> indexedColumns = new ArrayList<String>(
            HoodieTableMetadataUtil.getColumnsToIndex(
                this.dataMetaClient.getTableConfig(),
                this.dataWriteConfig.getMetadataConfig(),
                lazySchema,
                true,
                Option.of(this.dataWriteConfig.getRecordMerger().getRecordType())).keySet());
        if (indexedColumns.isEmpty()) {
            HoodieData<HoodieRecord> noRecords = this.engineContext.emptyHoodieData();
            return Pair.of(indexedColumns, Pair.of(fileGroupCount, noRecords));
        }

        LOG.info("Indexing {} columns for column stats index", indexedColumns.size());
        HoodieData<HoodieRecord> statsRecords = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
            this.engineContext,
            Collections.emptyMap(),
            partitionToFilesMap,
            this.dataMetaClient,
            this.dataWriteConfig.getMetadataConfig(),
            this.dataWriteConfig.getColumnStatsIndexParallelism(),
            this.dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize(),
            indexedColumns);
        return Pair.of(indexedColumns, Pair.of(fileGroupCount, statsRecords));
    }

    /**
     * Builds the records for the bloom filter index from the given files.
     *
     * @param createInstantTime instant time passed through to the record conversion
     * @param partitionToFilesMap partition identifier to (file name to size) mapping
     * @return the configured file group count paired with the generated records
     */
    private Pair<Integer, HoodieData<HoodieRecord>> initializeBloomFiltersPartition(String createInstantTime, Map<String, Map<String, Long>> partitionToFilesMap) {
        HoodieData<HoodieRecord> bloomRecords = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
            this.engineContext,
            Collections.emptyMap(),
            partitionToFilesMap,
            createInstantTime,
            this.dataMetaClient,
            this.dataWriteConfig.getBloomIndexParallelism(),
            this.dataWriteConfig.getBloomFilterType());
        return Pair.of(this.dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount(), bloomRecords);
    }

    /**
     * Produces the records for an expression index. Arguments are described as the
     * expression index initializer in this class supplies them.
     *
     * @param var1 (partition, (file path, file size)) entries for the files to read
     * @param var2 definition of the index being built
     * @param var3 meta client of the data table
     * @param var4 parallelism to use
     * @param var5 reader schema projected for the index
     * @param var6 storage configuration
     * @param var7 instant time of the initialization
     * @return the generated index records
     */
    protected abstract HoodieData<HoodieRecord> getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> var1, HoodieIndexDefinition var2, HoodieTableMetaClient var3, int var4, Schema var5, StorageConfiguration<?> var6, String var7);

    /**
     * Returns the engine type of this writer; within this file it is passed to the
     * secondary index record generation.
     *
     * @return the engine type
     */
    protected abstract EngineType getEngineType();

    /**
     * Builds the records for one expression index from the base file and log files of
     * the latest file slice of every data partition.
     *
     * <p>Each file is listed exactly once. The previous implementation iterated the log
     * files inside another iteration over the same log files, so a slice with n log
     * files contributed n*n entries.
     *
     * @param indexName name of the expression index to build
     * @param instantTime instant time of the initialization
     * @return the configured file group count paired with the generated records
     * @throws IllegalStateException if no definition exists for {@code indexName}
     * @throws Exception propagated from file slice listing or record generation
     */
    private Pair<Integer, HoodieData<HoodieRecord>> initializeExpressionIndexPartition(String indexName, String instantTime) throws Exception {
        HoodieIndexDefinition indexDefinition = this.getIndexDefinition(indexName);
        ValidationUtils.checkState(indexDefinition != null, "Expression Index definition is not present for index " + indexName);
        List<Pair<String, FileSlice>> partitionFileSlicePairs = this.getPartitionFileSlicePairs();
        List<Pair<String, Pair<String, Long>>> partitionFilePathSizeTriplet = new ArrayList<>();
        for (Pair<String, FileSlice> entry : partitionFileSlicePairs) {
            String partition = entry.getKey();
            FileSlice fileSlice = entry.getValue();
            if (fileSlice.getBaseFile().isPresent()) {
                HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
                partitionFilePathSizeTriplet.add(Pair.of(partition, Pair.of(baseFile.getPath(), baseFile.getFileLen())));
            }
            // Single pass over the log files: one entry per log file.
            fileSlice.getLogFiles().forEach(logFile ->
                partitionFilePathSizeTriplet.add(Pair.of(partition, Pair.of(logFile.getPath().toString(), logFile.getFileSize()))));
        }
        int fileGroupCount = this.dataWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
        int parallelism = Math.min(partitionFilePathSizeTriplet.size(), this.dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
        Schema readerSchema = HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex(indexDefinition, this.dataMetaClient);
        return Pair.of(fileGroupCount, this.getExpressionIndexRecords(partitionFilePathSizeTriplet, indexDefinition, this.dataMetaClient, parallelism, readerSchema, this.storageConf, instantTime));
    }

    /**
     * Looks up the definition of the named index for the data table.
     *
     * @param indexName name of the index
     * @return the index definition; callers in this class treat {@code null} as "not defined"
     */
    HoodieIndexDefinition getIndexDefinition(String indexName) {
        HoodieIndexDefinition definition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, this.dataMetaClient);
        return definition;
    }

    /**
     * Builds the records for one secondary index from the latest file slices of every
     * data partition and estimates the file group count from the record count.
     *
     * <p>The estimate reuses the record index sizing settings (min/max file group count,
     * growth factor, max file group size) of the data write config.
     *
     * @param indexName name of the secondary index to build
     * @return the estimated file group count paired with the generated records
     * @throws IllegalStateException if no definition exists for {@code indexName}
     * @throws IOException if the file slices cannot be listed
     */
    private Pair<Integer, HoodieData<HoodieRecord>> initializeSecondaryIndexPartition(String indexName) throws IOException {
        HoodieIndexDefinition definition = this.getIndexDefinition(indexName);
        ValidationUtils.checkState(definition != null, "Secondary Index definition is not present for index " + indexName);

        List<Pair<String, FileSlice>> fileSlices = this.getPartitionFileSlicePairs();
        int parallelism = Math.min(fileSlices.size(), this.dataWriteConfig.getMetadataConfig().getSecondaryIndexParallelism());
        HoodieData<HoodieRecord> secondaryRecords = SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices(
            this.engineContext,
            fileSlices,
            parallelism,
            this.getClass().getSimpleName(),
            this.dataMetaClient,
            this.getEngineType(),
            definition);

        long recordCount = secondaryRecords.count();
        int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(
            MetadataPartitionType.RECORD_INDEX,
            recordCount,
            48,
            this.dataWriteConfig.getRecordIndexMinFileGroupCount(),
            this.dataWriteConfig.getRecordIndexMaxFileGroupCount(),
            this.dataWriteConfig.getRecordIndexGrowthFactor(),
            this.dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
        return Pair.of(fileGroupCount, secondaryRecords);
    }

    /**
     * Collects, for every data partition known to the metadata reader, the latest merged
     * file slices on or before the latest completed-or-compaction instant of the data
     * timeline ("00000000000000" when there is none).
     *
     * @return (partition, file slice) pairs
     * @throws IOException if the partition paths cannot be read
     */
    private List<Pair<String, FileSlice>> getPartitionFileSlicePairs() throws IOException {
        String latestInstant = this.dataMetaClient.getActiveTimeline()
            .filterCompletedAndCompactionInstants()
            .lastInstant()
            .map(HoodieInstant::requestedTime)
            .orElse("00000000000000");
        // NOTE(review): the view returned by getMetadataView() is the cached field instance and is
        // closed when this try block exits - confirm the view is safe to reuse after close().
        try (HoodieTableFileSystemView fsView = this.getMetadataView()) {
            List<String> partitionPaths = this.metadata.getAllPartitionPaths();
            fsView.loadAllPartitions();
            ArrayList<Pair<String, FileSlice>> collected = new ArrayList<Pair<String, FileSlice>>();
            for (String partitionPath : partitionPaths) {
                fsView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestInstant)
                    .forEach(slice -> collected.add(Pair.of(partitionPath, slice)));
            }
            return collected;
        }
    }

    /**
     * Builds the records for the record index and estimates its file group count.
     *
     * <p>For copy-on-write tables record keys are read from the latest base files. For
     * any other table type they are read from the latest merged file slices on or before
     * the latest completed-or-compaction instant. The records are persisted before being
     * counted, and the count feeds the file group estimate.
     *
     * @return the estimated file group count paired with the generated records
     * @throws IOException if the partition paths cannot be read
     */
    private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition() throws IOException {
        HoodieTableFileSystemView view = this.getMetadataView();
        HoodieTable table = this.getTable(this.dataWriteConfig, this.dataMetaClient);
        List<String> partitionPaths = this.metadata.getAllPartitionPaths();
        view.loadAllPartitions();

        HoodieData<HoodieRecord> recordKeys;
        boolean copyOnWrite = this.dataMetaClient.getTableConfig().getTableType() == HoodieTableType.COPY_ON_WRITE;
        if (!copyOnWrite) {
            ArrayList<Pair<String, FileSlice>> fileSlices = new ArrayList<Pair<String, FileSlice>>();
            String latestCommit = this.dataMetaClient.getActiveTimeline()
                .filterCompletedAndCompactionInstants()
                .lastInstant()
                .map(instant -> instant.requestedTime())
                .orElse("00000000000000");
            for (String partitionPath : partitionPaths) {
                view.getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommit)
                    .forEach(slice -> fileSlices.add(Pair.of(partitionPath, slice)));
            }
            LOG.info("Initializing record index from " + fileSlices.size() + " file slices in " + partitionPaths.size() + " partitions");
            recordKeys = HoodieBackedTableMetadataWriter.readRecordKeysFromFileSliceSnapshot(
                this.engineContext,
                fileSlices,
                this.dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
                this.getClass().getSimpleName(),
                this.dataMetaClient,
                this.dataWriteConfig,
                table);
        } else {
            ArrayList<Pair<String, HoodieBaseFile>> baseFiles = new ArrayList<Pair<String, HoodieBaseFile>>();
            for (String partitionPath : partitionPaths) {
                view.getLatestBaseFiles(partitionPath)
                    .forEach(baseFile -> baseFiles.add(Pair.of(partitionPath, baseFile)));
            }
            LOG.info("Initializing record index from " + baseFiles.size() + " base files in " + partitionPaths.size() + " partitions");
            recordKeys = HoodieTableMetadataUtil.readRecordKeysFromBaseFiles(
                this.engineContext,
                this.dataWriteConfig,
                baseFiles,
                false,
                this.dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
                this.dataMetaClient.getBasePath(),
                this.storageConf,
                this.getClass().getSimpleName());
        }

        // Persist before counting; the same records are handed back to the caller afterwards.
        recordKeys.persist("MEMORY_AND_DISK_SER");
        long recordCount = recordKeys.count();
        int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(
            MetadataPartitionType.RECORD_INDEX,
            recordCount,
            48,
            this.dataWriteConfig.getRecordIndexMinFileGroupCount(),
            this.dataWriteConfig.getRecordIndexMaxFileGroupCount(),
            this.dataWriteConfig.getRecordIndexGrowthFactor(),
            this.dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
        LOG.info("Initializing record index with {} mappings and {} file groups.", recordCount, fileGroupCount);
        return Pair.of(fileGroupCount, recordKeys);
    }

    /**
     * Reads the merged records of each given file slice and turns every record into a
     * record index update pointing at that slice's partition and file id.
     *
     * @param engineContext engine context used to parallelize the read
     * @param partitionFileSlicePairs (partition, file slice) pairs to read
     * @param recordIndexMaxParallelism upper bound on the read parallelism
     * @param activeModule module name reported in the job status
     * @param metaClient meta client whose last completed commit instant is used for the read
     * @param dataWriteConfig write config handed to the merged read handle
     * @param hoodieTable table handed to the merged read handle
     * @return record index records, or an empty data set when no file slices are given
     */
    private static HoodieData<HoodieRecord> readRecordKeysFromFileSliceSnapshot(HoodieEngineContext engineContext, List<Pair<String, FileSlice>> partitionFileSlicePairs, int recordIndexMaxParallelism, String activeModule, HoodieTableMetaClient metaClient, HoodieWriteConfig dataWriteConfig, HoodieTable hoodieTable) {
        if (partitionFileSlicePairs.isEmpty()) {
            return engineContext.emptyHoodieData();
        }
        Option<String> instantTime = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime);
        engineContext.setJobStatus(activeModule, "Record Index: reading record keys from " + partitionFileSlicePairs.size() + " file slices");
        // Never use more tasks than there are file slices to read.
        int parallelism = Math.min(partitionFileSlicePairs.size(), recordIndexMaxParallelism);
        return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap(pair -> {
            String partitionPath = (String)pair.getKey();
            FileSlice slice = (FileSlice)pair.getValue();
            String fileGroupId = slice.getFileId();
            HoodieMergedReadHandle mergedReadHandle = new HoodieMergedReadHandle(dataWriteConfig, instantTime, hoodieTable, Pair.of(partitionPath, fileGroupId), Option.of(slice));
            return mergedReadHandle.getMergedRecords().stream().map(merged -> {
                HoodieRecord mergedRecord = (HoodieRecord)merged;
                String recordKey = mergedRecord.getRecordKey();
                return HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partitionPath, fileGroupId, mergedRecord.getCurrentLocation().getInstantTime(), 0);
            }).iterator();
        });
    }

    /**
     * Builds the records that seed the FILES partition of the metadata table: one record
     * listing every partition and, when at least one partition exists, one record per
     * partition listing its files and their sizes.
     *
     * @param partitionInfoList directory listings of the data table partitions to bootstrap
     * @return pair of (file group count, records); the file group count is always 1
     */
    private Pair<Integer, HoodieData<HoodieRecord>> initializeFilesPartition(List<HoodieTableMetadataUtil.DirectoryInfo> partitionInfoList) {
        // The FILES partition is always initialized with exactly one file group.
        final int fileGroupCount = 1;
        List<String> partitions = partitionInfoList.stream().map(p -> HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath())).collect(Collectors.toList());
        int totalDataFilesCount = partitionInfoList.stream().mapToInt(HoodieTableMetadataUtil.DirectoryInfo::getTotalFiles).sum();
        LOG.info("Committing total {} partitions and {} files to metadata", (Object)partitions.size(), (Object)totalDataFilesCount);
        // Typed as HoodieData<HoodieRecord> so it matches the declared return type and can be
        // unioned with the per-partition file records below.
        HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(partitions);
        HoodieData<HoodieRecord> allPartitionsRecord = this.engineContext.parallelize(Collections.singletonList(record), 1);
        if (partitionInfoList.isEmpty()) {
            // Nothing to list: the partition-list record is the only record.
            return Pair.of(fileGroupCount, allPartitionsRecord);
        }
        this.engineContext.setJobStatus(this.getClass().getSimpleName(), "Creating records for metadata FILES partition");
        HoodieData<HoodieRecord> fileListRecords = this.engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
            Map<String, Long> fileNameToSizeMap = partitionInfo.getFileNameToSizeMap();
            return HoodieMetadataPayload.createPartitionFilesRecord(partitionInfo.getRelativePath(), fileNameToSizeMap, Collections.emptyList());
        });
        // Sanity check: exactly one file-listing record per partition.
        ValidationUtils.checkState(fileListRecords.count() == (long)partitions.size());
        return Pair.of(fileGroupCount, allPartitionsRecord.union(fileListRecords));
    }

    /**
     * Collects the requested times of all instants on the data table's active timeline that
     * are not completed, excluding instants whose action is "indexing".
     *
     * @param dataMetaClient meta client of the data table
     * @return requested times of the pending, non-indexing instants
     */
    private Set<String> getPendingDataInstants(HoodieTableMetaClient dataMetaClient) {
        return dataMetaClient.getActiveTimeline()
                .getInstantsAsStream()
                .filter(instant -> !instant.isCompleted() && !"indexing".equals(instant.getAction()))
                .map(HoodieInstant::requestedTime)
                .collect(Collectors.toSet());
    }

    /**
     * Returns the timeline history folder used when creating the metadata table.
     *
     * @return the default value of {@code HoodieTableConfig.TIMELINE_HISTORY_PATH}
     */
    String getTimelineHistoryPath() {
        final String defaultHistoryPath = HoodieTableConfig.TIMELINE_HISTORY_PATH.defaultValue();
        return defaultHistoryPath;
    }

    /**
     * Creates the metadata table on storage and returns a meta client for it.
     *
     * <p>The table is initialized as a MERGE_ON_READ table named after the data table with a
     * {@code _metadata} suffix, using HFILE base files and {@link HoodieMetadataPayload}.
     *
     * @return a meta client built against the metadata table's base path
     * @throws IOException declared on the signature; presumably raised by table initialization
     */
    private HoodieTableMetaClient initializeMetaClient() throws IOException {
        // Step 1: lay out the table on storage.
        HoodieTableMetaClient.newTableBuilder()
                .setTableType(HoodieTableType.MERGE_ON_READ)
                .setTableName(this.dataWriteConfig.getTableName() + "_metadata")
                .setTableVersion(this.dataWriteConfig.getWriteVersion())
                .setArchiveLogFolder(this.getTimelineHistoryPath())
                .setPayloadClassName(HoodieMetadataPayload.class.getName())
                .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
                .setRecordKeyFields(RECORD_KEY_FIELD_NAME)
                .setPopulateMetaFields(false)
                .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
                .initTable(this.storageConf.newInstance(), this.metadataWriteConfig.getBasePath());
        // Step 2: build a separate client that carries the data table's time generator config.
        HoodieTableMetaClient metadataTableClient = HoodieTableMetaClient.builder()
                .setBasePath(this.metadataWriteConfig.getBasePath())
                .setConf(this.storageConf.newInstance())
                .setTimeGeneratorConfig(this.dataWriteConfig.getTimeGeneratorConfig())
                .build();
        return metadataTableClient;
    }

    /**
     * Walks the data table's directory tree from its base path and returns a listing for
     * every directory recognized as a Hudi partition.
     *
     * <p>Directories are listed in batches of at most the configured file listing parallelism.
     * A directory whose non-empty relative path matches the configured directory filter regex
     * is ignored. Directories that are not partitions contribute their sub-directories to the
     * work queue.
     *
     * @param initializationTime instant time passed to each directory listing
     * @param pendingDataInstants pending instant times passed to each directory listing
     * @return listings of the discovered partitions; empty when the data timeline has no instants
     */
    private List<HoodieTableMetadataUtil.DirectoryInfo> listAllPartitionsFromFilesystem(String initializationTime, Set<String> pendingDataInstants) {
        if (this.dataMetaClient.getActiveTimeline().countInstants() == 0) {
            return Collections.emptyList();
        }
        ArrayDeque<StoragePath> pendingDirs = new ArrayDeque<StoragePath>();
        pendingDirs.add(new StoragePath(this.dataWriteConfig.getBasePath()));
        LinkedList<HoodieTableMetadataUtil.DirectoryInfo> discoveredPartitions = new LinkedList<HoodieTableMetadataUtil.DirectoryInfo>();
        int maxDirsPerRound = this.metadataWriteConfig.getFileListingParallelism();
        StorageConfiguration<?> storageConf = this.dataMetaClient.getStorageConf();
        String dirFilterRegex = this.dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
        StoragePath storageBasePath = this.dataMetaClient.getBasePath();
        while (!pendingDirs.isEmpty()) {
            // Take the next batch of directories off the queue.
            int batchSize = Math.min(maxDirsPerRound, pendingDirs.size());
            List<StoragePath> batch = new ArrayList<StoragePath>(batchSize);
            for (int taken = 0; taken < batchSize; ++taken) {
                batch.add(pendingDirs.poll());
            }
            this.engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing " + batchSize + " partitions from filesystem");
            List<HoodieTableMetadataUtil.DirectoryInfo> listedDirs = this.engineContext.map(batch, path -> {
                HoodieStorage storage = new HoodieHadoopStorage(path, storageConf);
                String relativeDirPath = FSUtils.getRelativePartitionPath(storageBasePath, path);
                return new HoodieTableMetadataUtil.DirectoryInfo(relativeDirPath, storage.listDirectEntries(path), initializationTime, pendingDataInstants);
            }, batchSize);
            for (HoodieTableMetadataUtil.DirectoryInfo listed : listedDirs) {
                if (!dirFilterRegex.isEmpty()) {
                    String relativePath = listed.getRelativePath();
                    if (!relativePath.isEmpty() && relativePath.matches(dirFilterRegex)) {
                        LOG.info("Ignoring directory {} which matches the filter regex {}", relativePath, dirFilterRegex);
                        continue;
                    }
                }
                if (listed.isHoodiePartition()) {
                    discoveredPartitions.add(listed);
                } else {
                    // Not a partition: descend into its sub-directories in a later round.
                    pendingDirs.addAll(listed.getSubDirectories());
                }
            }
        }
        return discoveredPartitions;
    }

    /**
     * Builds directory listings for every partition known to the metadata reader, using the
     * file listings the metadata reader returns for those partitions.
     *
     * @param initializationTime instant time passed to each directory listing
     * @param pendingDataInstants pending instant times passed to each directory listing
     * @return one listing per partition returned by the metadata reader
     * @throws IOException declared on the signature; presumably raised by the metadata reader
     */
    private List<HoodieTableMetadataUtil.DirectoryInfo> listAllPartitionsFromMDT(String initializationTime, Set<String> pendingDataInstants) throws IOException {
        // The metadata reader is queried with absolute paths, so prefix each relative path.
        List<String> relativePartitionPaths = this.metadata.getAllPartitionPaths();
        List<String> absolutePartitionPaths = new ArrayList<String>(relativePartitionPaths.size());
        for (String relativePartitionPath : relativePartitionPaths) {
            absolutePartitionPaths.add(this.dataWriteConfig.getBasePath() + '/' + relativePartitionPath);
        }
        Map<String, List<StoragePathInfo>> filesByPartition = this.metadata.getAllFilesInPartitions(absolutePartitionPaths);
        ArrayList<HoodieTableMetadataUtil.DirectoryInfo> listings = new ArrayList<HoodieTableMetadataUtil.DirectoryInfo>(filesByPartition.size());
        for (Map.Entry<String, List<StoragePathInfo>> partitionFiles : filesByPartition.entrySet()) {
            StoragePath tableBasePath = new StoragePath(this.dataWriteConfig.getBasePath());
            StoragePath absolutePartitionPath = new StoragePath(partitionFiles.getKey());
            String relativeDirPath = FSUtils.getRelativePartitionPath(tableBasePath, absolutePartitionPath);
            listings.add(new HoodieTableMetadataUtil.DirectoryInfo(relativeDirPath, partitionFiles.getValue(), initializationTime, pendingDataInstants, false));
        }
        return listings;
    }

    /**
     * Creates the file groups of a metadata table partition by writing, for each file group,
     * an initial log file that contains a single empty delete block.
     *
     * <p>Any files already present under the partition path are deleted first.
     *
     * @param dataMetaClient meta client whose storage is used by the log writers
     * @param metadataPartition partition type, used for the file id prefix and file id generation
     * @param instantTime instant time stamped on the log files and block headers
     * @param fileGroupCount number of file groups to create
     * @param partitionName name of the metadata table partition
     * @throws IOException if listing or deleting the existing partition contents fails
     * @throws HoodieException if creating a file group is interrupted
     */
    private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime, int fileGroupCount, String partitionName) throws IOException {
        StoragePath partitionPath = new StoragePath(this.metadataWriteConfig.getBasePath(), partitionName);
        HoodieStorage storage = this.metadataMetaClient.getStorage();
        try {
            List<StoragePathInfo> existingFiles = storage.listDirectEntries(partitionPath);
            if (!existingFiles.isEmpty()) {
                LOG.warn("Deleting all existing files found in MDT partition {}", (Object)partitionName);
                storage.deleteDirectory(partitionPath);
                ValidationUtils.checkState(!storage.exists(partitionPath), "Failed to delete MDT partition " + partitionName);
            }
        }
        catch (FileNotFoundException ignored) {
            // Deliberately ignored: the listing found no partition directory, so there is
            // nothing to clean up before the file groups are created.
        }
        String msg = String.format("Creating %d file groups for partition %s with base fileId %s at instant time %s", fileGroupCount, partitionName, metadataPartition.getFileIdPrefix(), instantTime);
        LOG.info(msg);
        List<String> fileGroupFileIds = IntStream.range(0, fileGroupCount).mapToObj(i -> HoodieTableMetadataUtil.getFileIDForFileGroup(metadataPartition, i, partitionName)).collect(Collectors.toList());
        ValidationUtils.checkArgument(fileGroupFileIds.size() == fileGroupCount);
        this.engineContext.setJobStatus(this.getClass().getSimpleName(), msg);
        this.engineContext.foreach(fileGroupFileIds, fileGroupFileId -> {
            try {
                // Each file group starts out as a log file holding one empty delete block.
                Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = Collections.singletonMap(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
                HoodieDeleteBlock block = new HoodieDeleteBlock(Collections.emptyList(), blockHeader);
                try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(FSUtils.constructAbsolutePath(this.metadataWriteConfig.getBasePath(), partitionName)).withFileId(fileGroupFileId).withInstantTime(instantTime).withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION).withFileSize(0L).withSizeThreshold(this.metadataWriteConfig.getLogFileMaxSize()).withStorage(dataMetaClient.getStorage()).withLogWriteToken("0-0-0").withTableVersion(this.metadataWriteConfig.getWriteVersion()).withFileExtension(".log").build();){
                    writer.appendBlock(block);
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new HoodieException(String.format("Failed to created fileGroup %s for partition %s", fileGroupFileId, partitionName), e);
            }
        }, fileGroupFileIds.size());
    }

    /**
     * Deletes the given metadata table partitions from storage and removes the pending
     * indexing instants that reference them, then closes this writer's internal resources.
     *
     * @param metadataPartitions metadata table partition paths to drop
     * @throws IOException declared on the signature; presumably raised by the directory deletion
     */
    @Override
    public void dropMetadataPartitions(List<String> metadataPartitions) throws IOException {
        for (String droppedPartition : metadataPartitions) {
            LOG.warn("Deleting Metadata Table partition: {}", droppedPartition);
            StoragePath partitionDir = new StoragePath(this.metadataWriteConfig.getBasePath(), droppedPartition);
            this.dataMetaClient.getStorage().deleteDirectory(partitionDir);
            LOG.warn("Deleting pending indexing instant from the timeline for partition: {}", droppedPartition);
            HoodieBackedTableMetadataWriter.deletePendingIndexingInstant(this.dataMetaClient, droppedPartition);
        }
        this.closeInternal();
    }

    /**
     * Deletes the requested (and matching inflight) indexing instant files whose index plan
     * covers the given metadata partition.
     *
     * <p>This is best effort: an {@link IOException} for one instant is logged, including its
     * cause, and the remaining instants are still processed.
     *
     * @param metaClient meta client whose active timeline is reloaded and modified
     * @param partitionPath metadata partition path to look for in the index plans
     */
    private static void deletePendingIndexingInstant(HoodieTableMetaClient metaClient, String partitionPath) {
        InstantGenerator instantGenerator = metaClient.getInstantGenerator();
        metaClient.reloadActiveTimeline().filterPendingIndexTimeline().getInstantsAsStream().filter(instant -> HoodieInstant.State.REQUESTED.equals((Object)instant.getState())).forEach(instant -> {
            try {
                HoodieIndexPlan indexPlan = metaClient.getActiveTimeline().readIndexPlan((HoodieInstant)instant);
                if (indexPlan.getIndexPartitionInfos().stream().anyMatch(indexPartitionInfo -> indexPartitionInfo.getMetadataPartitionPath().equals(partitionPath))) {
                    // Remove the requested file and the inflight file for the same requested time.
                    metaClient.getActiveTimeline().deleteInstantFileIfExists((HoodieInstant)instant);
                    metaClient.getActiveTimeline().deleteInstantFileIfExists(instantGenerator.getIndexInflightInstant(instant.requestedTime()));
                }
            }
            catch (IOException e) {
                // Pass the exception as the trailing argument so its stack trace is logged too.
                LOG.error("Failed to delete the instant file corresponding to {}", instant, e);
            }
        });
    }

    /**
     * Converts metadata into per-partition records, keeps only the partitions that should be
     * updated, and commits them at the given instant time.
     *
     * <p>Nothing is converted or committed unless this writer is initialized and has a
     * metadata reader.
     *
     * @param instantTime instant time to commit at
     * @param convertMetadataFunction supplier of the per-partition records
     */
    void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) {
        Set<String> updatablePartitions = this.getMetadataPartitionsToUpdate();
        if (!this.initialized || this.metadata == null) {
            return;
        }
        Map<String, HoodieData<HoodieRecord>> convertedRecords = convertMetadataFunction.convertMetadata();
        Map<String, HoodieData<HoodieRecord>> recordsToCommit = convertedRecords.entrySet().stream()
                .filter(candidate -> updatablePartitions.contains(candidate.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        this.commit(instantTime, recordsToCommit);
    }

    /**
     * Determines which metadata partitions should receive updates.
     *
     * <p>The result is the table config's metadata partitions plus its inflight metadata
     * partitions. When that set is empty, the partition paths of the enabled partition types
     * are returned instead.
     *
     * @return metadata partition paths to update
     */
    private Set<String> getMetadataPartitionsToUpdate() {
        HoodieTableConfig tableConfig = this.dataMetaClient.getTableConfig();
        Set<String> fromTableConfig = tableConfig.getMetadataPartitions();
        fromTableConfig.addAll(HoodieTableMetadataUtil.getInflightMetadataPartitions(this.dataMetaClient.getTableConfig()));
        if (fromTableConfig.isEmpty()) {
            LOG.debug("There are no partitions to update according to table config. Falling back to enabled partition types in the write config.");
            return this.getEnabledPartitionTypes().stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
        }
        return fromTableConfig;
    }

    /**
     * Builds the metadata partitions named in an index plan.
     *
     * <p>Every requested partition must map to an enabled partition type. The requested
     * partitions are marked inflight in the table config and then initialized from the
     * filesystem at the given instant time.
     *
     * @param engineContext engine context (not referenced by this implementation)
     * @param indexPartitionInfos partitions to index; nothing happens when the list is empty
     * @param instantTime instant time used for the initialization
     * @throws IOException declared on the signature; presumably raised during initialization
     * @throws HoodieIndexException if a requested partition type is not enabled
     */
    @Override
    public void buildMetadataPartitions(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos, String instantTime) throws IOException {
        if (indexPartitionInfos.isEmpty()) {
            LOG.warn("No partition to index in the plan");
            return;
        }
        // The first entry's "index up to" instant is the one reported in the log line below.
        String indexUptoInstantTime = indexPartitionInfos.get(0).getIndexUptoInstant();
        List<String> requestedPaths = new ArrayList<String>();
        List<MetadataPartitionType> requestedTypes = new ArrayList<MetadataPartitionType>();
        for (HoodieIndexPartitionInfo requested : indexPartitionInfos) {
            String relativePartitionPath = requested.getMetadataPartitionPath();
            LOG.info("Creating a new metadata index for partition '{}' under path {} upto instant {}", relativePartitionPath, this.metadataWriteConfig.getBasePath(), indexUptoInstantTime);
            MetadataPartitionType requestedType = MetadataPartitionType.fromPartitionPath(relativePartitionPath);
            if (!this.enabledPartitionTypes.contains(requestedType)) {
                throw new HoodieIndexException(String.format("Indexing for metadata partition: %s is not enabled", requestedType));
            }
            requestedTypes.add(requestedType);
            requestedPaths.add(relativePartitionPath);
        }
        this.dataMetaClient.getTableConfig().setMetadataPartitionsInflight(this.dataMetaClient, requestedPaths);
        this.initializeFromFilesystem(instantTime, requestedTypes, Option.empty());
    }

    /**
     * Applies a data table commit to the metadata table.
     *
     * <p>The commit metadata is converted into per-partition records. When the record index
     * partition is among the partitions to update, its records are unioned with additional
     * upserts. Expression index and secondary index records are then added where applicable.
     *
     * @param commitMetadata metadata of the data table commit
     * @param instantTime instant time of the commit
     */
    @Override
    public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
        this.mayBeReinitMetadataReader();
        ConvertMetadataFunction recordConverter = () -> {
            Map<String, HoodieData<HoodieRecord>> recordsByPartition = HoodieTableMetadataUtil.convertMetadataToRecords(this.engineContext, this.dataWriteConfig, commitMetadata, instantTime, this.dataMetaClient, this.getTableMetadata(), this.dataWriteConfig.getMetadataConfig(), this.getMetadataPartitionsToUpdate(), this.dataWriteConfig.getBloomFilterType(), this.dataWriteConfig.getBloomIndexParallelism(), this.dataWriteConfig.getWritesFileIdEncoding(), this.getEngineType(), Option.of(this.dataWriteConfig.getRecordMerger().getRecordType()));
            String recordIndexPath = MetadataPartitionType.RECORD_INDEX.getPartitionPath();
            if (this.getMetadataPartitionsToUpdate().contains(recordIndexPath)) {
                HoodieData<HoodieRecord> recordIndexRecords = recordsByPartition.get(recordIndexPath);
                HoodieData<HoodieRecord> extraUpserts = this.getRecordIndexAdditionalUpserts(recordIndexRecords, commitMetadata);
                recordsByPartition.put(recordIndexPath, recordIndexRecords.union(extraUpserts));
            }
            this.updateExpressionIndexIfPresent(commitMetadata, instantTime, recordsByPartition);
            this.updateSecondaryIndexIfPresent(commitMetadata, recordsByPartition, instantTime);
            return recordsByPartition;
        };
        this.processAndCommit(instantTime, recordConverter);
        this.closeInternal();
    }

    /**
     * Adds expression index records for every metadata partition whose name starts with
     * {@code expr_index_}, provided the expression index partition is available.
     *
     * @param commitMetadata metadata of the data table commit
     * @param instantTime instant time of the commit
     * @param partitionToRecordMap map that receives the records, keyed by partition name
     * @throws HoodieMetadataException if computing the updates for a partition fails
     */
    private void updateExpressionIndexIfPresent(HoodieCommitMetadata commitMetadata, String instantTime, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap) {
        if (!MetadataPartitionType.EXPRESSION_INDEX.isMetadataPartitionAvailable(this.dataMetaClient)) {
            return;
        }
        for (String indexPartition : this.dataMetaClient.getTableConfig().getMetadataPartitions()) {
            if (!indexPartition.startsWith("expr_index_")) {
                continue;
            }
            HoodieData<HoodieRecord> indexRecords;
            try {
                indexRecords = this.getExpressionIndexUpdates(commitMetadata, indexPartition, instantTime);
            }
            catch (Exception e) {
                throw new HoodieMetadataException(String.format("Failed to get expression index updates for partition %s", indexPartition), e);
            }
            partitionToRecordMap.put(indexPartition, indexRecords);
        }
    }

    /**
     * Computes expression index updates for a commit. This base implementation always
     * throws; subclasses may override it.
     *
     * @param commitMetadata metadata of the data table commit
     * @param indexPartition name of the expression index partition
     * @param instantTime instant time of the commit
     * @return never returns normally in this implementation
     * @throws UnsupportedOperationException always
     */
    protected HoodieData<HoodieRecord> getExpressionIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) throws Exception {
        final String unsupportedMessage = "Expression Index only supported with SPARK engine.";
        throw new UnsupportedOperationException(unsupportedMessage);
    }

    /**
     * Adds secondary index records for every metadata partition whose name starts with
     * {@code secondary_index_}, when secondary indexing is enabled in the write config.
     *
     * <p>Insert-overwrite and delete-partition operations are rejected while the secondary
     * index partition is available. COMPACT and CLUSTER operations produce no updates.
     *
     * @param commitMetadata metadata of the data table commit
     * @param partitionToRecordMap map that receives the records, keyed by partition name
     * @param instantTime instant time of the commit
     * @throws HoodieIndexException if the operation type is not allowed with a secondary index
     * @throws HoodieMetadataException if computing the updates for a partition fails
     */
    private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap, String instantTime) {
        if (!this.dataWriteConfig.isSecondaryIndexEnabled()) {
            return;
        }
        WriteOperationType writeOperation = commitMetadata.getOperationType();
        if (writeOperation.isInsertOverwriteOrDeletePartition() && MetadataPartitionType.SECONDARY_INDEX.isMetadataPartitionAvailable(this.dataMetaClient)) {
            throw new HoodieIndexException(String.format("Can not perform operation %s on secondary index", writeOperation));
        }
        boolean isTableService = writeOperation == WriteOperationType.COMPACT || writeOperation == WriteOperationType.CLUSTER;
        if (isTableService) {
            return;
        }
        for (String indexPartition : this.dataMetaClient.getTableConfig().getMetadataPartitions()) {
            if (!indexPartition.startsWith("secondary_index_")) {
                continue;
            }
            HoodieData<HoodieRecord> indexRecords;
            try {
                indexRecords = this.getSecondaryIndexUpdates(commitMetadata, indexPartition, instantTime);
            }
            catch (Exception e) {
                throw new HoodieMetadataException("Failed to get secondary index updates for partition " + indexPartition, e);
            }
            partitionToRecordMap.put(indexPartition, indexRecords);
        }
    }

    /**
     * Converts the write stats of a commit into secondary index records for one index
     * partition.
     *
     * @param commitMetadata metadata of the data table commit
     * @param indexPartition name of the secondary index partition
     * @param instantTime instant time of the commit
     * @return secondary index records; empty when the commit has no write stats or is a
     *         compaction or clustering operation
     */
    private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) {
        List<HoodieWriteStat> writeStats = commitMetadata.getPartitionToWriteStats().values().stream()
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
        if (writeStats.isEmpty()) {
            return this.engineContext.emptyHoodieData();
        }
        if (WriteOperationType.isCompactionOrClustering(commitMetadata.getOperationType())) {
            return this.engineContext.emptyHoodieData();
        }
        HoodieIndexDefinition definition = this.getIndexDefinition(indexPartition);
        HoodieTableFileSystemView fileSystemView = this.getMetadataView();
        // Only the partitions touched by this commit need to be loaded into the view.
        List<String> writtenPartitions = new ArrayList<String>(commitMetadata.getWritePartitionPaths());
        fileSystemView.loadPartitions(writtenPartitions);
        return SecondaryIndexRecordGenerationUtils.convertWriteStatsToSecondaryIndexRecords(writeStats, instantTime, definition, this.dataWriteConfig.getMetadataConfig(), fileSystemView, this.dataMetaClient, this.engineContext, this.getEngineType());
    }

    /**
     * Applies a data table clean operation to the metadata table.
     *
     * @param cleanMetadata metadata of the clean operation
     * @param instantTime instant time of the clean operation
     */
    @Override
    public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
        this.mayBeReinitMetadataReader();
        ConvertMetadataFunction cleanRecordConverter = () -> HoodieTableMetadataUtil.convertMetadataToRecords(this.engineContext, cleanMetadata, instantTime, this.dataMetaClient, this.dataWriteConfig.getMetadataConfig(), this.enabledPartitionTypes, this.dataWriteConfig.getBloomIndexParallelism(), Option.of(this.dataWriteConfig.getRecordMerger().getRecordType()));
        this.processAndCommit(instantTime, cleanRecordConverter);
        this.closeInternal();
    }

    /**
     * Applies a data table restore to the metadata table.
     *
     * <p>The restore plan is read from the data table timeline to find the instant being
     * restored to. The metadata table is then restored to that instant through the write
     * client, and a follow-up commit re-synchronizes the FILES records with a listing of the
     * data table taken from the filesystem.
     *
     * @param restoreMetadata metadata of the restore operation (not referenced by this implementation)
     * @param instantTime instant time of the restore on the data table
     * @throws HoodieIOException if the restore plan cannot be read
     * @throws HoodieMetadataException if the metadata table cannot be restored to the target
     *         instant, or if an IOException occurs during the sync
     */
    @Override
    public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
        this.mayBeReinitMetadataReader();
        this.dataMetaClient.reloadActiveTimeline();
        InstantGenerator datainstantGenerator = this.dataMetaClient.getInstantGenerator();
        HoodieInstant restoreInstant = datainstantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, "restore", instantTime);
        HoodieInstant requested = datainstantGenerator.getRestoreRequestedInstant(restoreInstant);
        HoodieRestorePlan restorePlan = null;
        try {
            restorePlan = this.dataMetaClient.getActiveTimeline().readRestorePlan(requested);
        }
        catch (IOException e) {
            throw new HoodieIOException(String.format("Deserialization of restore plan failed whose restore instant time is %s in data table", instantTime), e);
        }
        String restoreToInstantTime = restorePlan.getSavepointToRestoreTimestamp();
        LOG.info("Triggering restore to {} in metadata table", (Object)restoreToInstantTime);
        // Every file group of the FILES partition must have at least one file slice whose base
        // instant time is at or before the restore target; otherwise the restore is refused.
        List filesGroups = this.metadata.getMetadataFileSystemView().getAllFileGroups(MetadataPartitionType.FILES.getPartitionPath()).collect(Collectors.toList());
        boolean cannotRestore = filesGroups.stream().map(fileGroup -> fileGroup.getAllFileSlices().map(FileSlice::getBaseInstantTime).anyMatch(instantTime1 -> InstantComparison.compareTimestamps(instantTime1, InstantComparison.LESSER_THAN_OR_EQUALS, restoreToInstantTime))).anyMatch(canRestore -> canRestore == false);
        if (cannotRestore) {
            throw new HoodieMetadataException(String.format("Can't restore to %s since there is no base file in MDT lesser than the commit to restore to. Please delete metadata table and retry", restoreToInstantTime));
        }
        // List the data table from the filesystem and index the listings by relative path.
        List<HoodieTableMetadataUtil.DirectoryInfo> dirInfoList = this.listAllPartitionsFromFilesystem(instantTime, Collections.emptySet());
        Map<String, HoodieTableMetadataUtil.DirectoryInfo> dirInfoMap = dirInfoList.stream().collect(Collectors.toMap(HoodieTableMetadataUtil.DirectoryInfo::getRelativePath, Function.identity()));
        dirInfoList.clear();
        BaseHoodieWriteClient<?, I, ?, ?> writeClient = this.getWriteClient();
        writeClient.restoreToInstant(restoreToInstantTime, false);
        try {
            // Re-create the reader after the restore, before comparing the metadata table
            // against the filesystem listing.
            this.initMetadataReader();
            HashMap<String, Map<String, Long>> partitionFilesToAdd = new HashMap<String, Map<String, Long>>();
            HashMap<String, List<String>> partitionFilesToDelete = new HashMap<String, List<String>>();
            ArrayList<String> partitionsToDelete = new ArrayList<String>();
            this.fetchOutofSyncFilesRecordsFromMetadataTable(dirInfoMap, partitionFilesToAdd, partitionFilesToDelete, partitionsToDelete);
            // The differences are committed at a new instant time, not at the restore's instant time.
            String syncCommitTime = this.createRestoreInstantTime();
            this.processAndCommit(syncCommitTime, () -> HoodieTableMetadataUtil.convertMissingPartitionRecords(this.engineContext, partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete, syncCommitTime));
            this.closeInternal();
        }
        catch (IOException e) {
            throw new HoodieMetadataException("IOException during MDT restore sync", e);
        }
    }

    /**
     * Asks the write client for a new instant time, used for the sync commit that follows a
     * restore.
     *
     * @return the newly created instant time
     */
    String createRestoreInstantTime() {
        final String newInstantTime = this.writeClient.createNewInstantTime(false);
        return newInstantTime;
    }

    /**
     * Applies a data table rollback to the metadata table.
     *
     * <p>Only the first commit listed in the rollback metadata is considered. If the metadata
     * table has a completed deltacommit for it, that deltacommit is validated and rolled
     * back; otherwise the rollback is ignored. Nothing happens unless this writer is
     * initialized and has a metadata reader.
     *
     * @param rollbackMetadata metadata of the rollback operation
     * @param instantTime instant time of the rollback
     * @throws HoodieMetadataException if the write client reports that the rollback failed
     */
    @Override
    public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) {
        if (!this.initialized || this.metadata == null) {
            return;
        }
        this.mayBeReinitMetadataReader();
        String commitToRollbackInstantTime = rollbackMetadata.getCommitsRollback().get(0);
        HoodieInstant mdtDeltaCommit = this.metadataMetaClient.createNewInstant(HoodieInstant.State.COMPLETED, "deltacommit", commitToRollbackInstantTime);
        boolean presentInMdt = this.metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().containsInstant(mdtDeltaCommit);
        if (!presentInMdt) {
            LOG.info("Ignoring rollback of instant {} at {}. The commit to rollback is not found in MDT", commitToRollbackInstantTime, instantTime);
        } else {
            this.validateRollback(commitToRollbackInstantTime);
            LOG.info("Rolling back MDT deltacommit {}", commitToRollbackInstantTime);
            boolean rolledBack = this.getWriteClient().rollback(commitToRollbackInstantTime, instantTime);
            if (!rolledBack) {
                throw new HoodieMetadataException(String.format("Failed to rollback deltacommit at %s", commitToRollbackInstantTime));
            }
        }
        this.closeInternal();
    }

    /**
     * Rejects a rollback of a commit that is at or before the latest compaction of the
     * metadata table.
     *
     * <p>The check applies only when the instant returned alongside the deltacommits has the
     * action "commit" and both instant times have the same length.
     *
     * @param commitToRollbackInstantTime instant time of the commit being rolled back
     * @throws HoodieMetadataException if the commit is not later than the latest compaction
     */
    private void validateRollback(String commitToRollbackInstantTime) {
        Pair<HoodieTimeline, HoodieInstant> sinceLatestCompaction = CompactionUtils.getDeltaCommitsSinceLatestCompaction(this.metadataMetaClient.getActiveTimeline()).get();
        HoodieInstant latestCompaction = sinceLatestCompaction.getValue();
        HoodieTimeline deltaCommitsAfterCompaction = sinceLatestCompaction.getKey();
        if (!latestCompaction.getAction().equals("commit")) {
            return;
        }
        String compactionInstantTime = latestCompaction.requestedTime();
        boolean comparableLengths = commitToRollbackInstantTime.length() == compactionInstantTime.length();
        if (comparableLengths && InstantComparison.LESSER_THAN_OR_EQUALS.test(commitToRollbackInstantTime, compactionInstantTime)) {
            throw new HoodieMetadataException(String.format("Commit being rolled back %s is earlier than the latest compaction %s. There are %d deltacommits after this compaction: %s", commitToRollbackInstantTime, compactionInstantTime, deltaCommitsAfterCompaction.countInstants(), deltaCommitsAfterCompaction.getInstants()));
        }
    }

    /**
     * Releases the resources held by this writer: the metadata reader, the write client and
     * the metadata file system view, in that order.
     *
     * <p>The write client and view fields are reset to {@code null} after being closed; the
     * metadata reader field is left as is.
     *
     * @throws Exception declared on the signature; presumably propagated from the underlying close calls
     */
    @Override
    public void close() throws Exception {
        if (this.metadata != null) {
            this.metadata.close();
        }
        if (this.writeClient != null) {
            this.writeClient.close();
            this.writeClient = null;
        }
        if (this.metadataView != null) {
            this.metadataView.close();
            this.metadataView = null;
        }
    }

    /**
     * Commits the given records to the metadata table. Invoked by {@code processAndCommit}
     * with the records that remain after filtering by the partitions to update.
     *
     * @param var1 instant time of the commit
     * @param var2 records to commit, keyed by metadata partition path
     */
    protected abstract void commit(String var1, Map<String, HoodieData<HoodieRecord>> var2);

    /**
     * Converts prepared records into the engine-specific input type {@code I} that
     * {@code commitInternal} hands to the write client.
     *
     * @param var1 the prepared records
     * @return the same records in the engine-specific representation
     */
    protected abstract I convertHoodieDataToEngineSpecificData(HoodieData<HoodieRecord> var1);

    /**
     * Writes the given per-partition records to the metadata table at the given instant time.
     *
     * <p>Failed writes are rolled back first. If the instant is already on the metadata
     * table's commits timeline, it is rolled back before being re-applied. Records are bulk
     * inserted when initializing and upserted otherwise. The timeline is reloaded and size
     * metrics are updated afterwards.
     *
     * @param instantTime instant time to commit at
     * @param partitionRecordsMap records to write, keyed by metadata partition name
     * @param isInitializing whether to bulk insert (true) or upsert (false)
     * @param bulkInsertPartitioner partitioner passed to the bulk insert
     * @throws HoodieMetadataException if rolling back an existing instant fails
     */
    protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
        ValidationUtils.checkState(this.metadataMetaClient != null, "Metadata table is not fully initialized yet.");
        HoodieData<HoodieRecord> locatedRecords = this.prepRecords(partitionRecordsMap);
        I engineRecords = this.convertHoodieDataToEngineSpecificData(locatedRecords);
        BaseHoodieWriteClient<?, I, ?, ?> mdtWriteClient = this.getWriteClient();
        this.metadataMetaClient = HoodieBackedTableMetadataWriter.rollbackFailedWrites(this.dataWriteConfig, mdtWriteClient, this.metadataMetaClient);
        boolean instantAlreadyOnTimeline = this.metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime);
        if (instantAlreadyOnTimeline) {
            // The instant was applied before (fully or partially): roll it back, then re-apply.
            Option<HoodieInstant> completedInstant = this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.requestedTime().equals(instantTime)).lastInstant();
            String completionState = completedInstant.isPresent() ? "Already" : "Partially";
            LOG.info("{} completed commit at {} being applied to MDT.", completionState, instantTime);
            boolean rolledBack = mdtWriteClient.rollback(instantTime);
            if (!rolledBack) {
                throw new HoodieMetadataException(String.format("Failed to rollback deltacommit at %s from MDT", instantTime));
            }
            this.metadataMetaClient.reloadActiveTimeline();
        } else {
            LOG.info("New commit at {} being applied to MDT.", instantTime);
        }
        mdtWriteClient.startCommitWithTime(instantTime);
        this.preWrite(instantTime);
        String jobStatusTemplate = isInitializing ? "Bulk inserting at %s into metadata table %s" : "Upserting at %s into metadata table %s";
        this.engineContext.setJobStatus(this.getClass().getSimpleName(), String.format(jobStatusTemplate, instantTime, this.metadataWriteConfig.getTableName()));
        if (isInitializing) {
            mdtWriteClient.bulkInsertPreppedRecords(engineRecords, instantTime, bulkInsertPartitioner);
        } else {
            mdtWriteClient.upsertPreppedRecords(engineRecords, instantTime);
        }
        this.metadataMetaClient.reloadActiveTimeline();
        this.metrics.ifPresent(m -> m.updateSizeMetrics(this.metadataMetaClient, this.metadata, this.dataMetaClient.getTableConfig().getMetadataPartitions()));
    }

    /**
     * Rolls back failed writes on the metadata table when the data table's failed-writes
     * clean policy is eager.
     *
     * @param dataWriteConfig write config of the data table
     * @param writeClient write client of the metadata table
     * @param metadataMetaClient current meta client of the metadata table
     * @return a reloaded meta client if the write client reported a rollback, otherwise the
     *         given meta client
     */
    static <I> HoodieTableMetaClient rollbackFailedWrites(HoodieWriteConfig dataWriteConfig, BaseHoodieWriteClient<?, I, ?, ?> writeClient, HoodieTableMetaClient metadataMetaClient) {
        if (!dataWriteConfig.getFailedWritesCleanPolicy().isEager()) {
            return metadataMetaClient;
        }
        boolean rolledBack = writeClient.rollbackFailedWrites(metadataMetaClient);
        return rolledBack ? HoodieTableMetaClient.reload(metadataMetaClient) : metadataMetaClient;
    }

    /**
     * Hook invoked by {@code commitInternal} after the commit has been started on the write
     * client and before any records are written. This default implementation does nothing.
     *
     * @param instantTime instant time of the commit about to be written
     */
    protected void preWrite(String instantTime) {
    }

    /**
     * Engine-specific bulk commit of records to the metadata table.
     *
     * <p>NOTE(review): parameter names were lost in decompilation and no caller is visible in
     * this part of the file. Presumably the arguments are (instant time, partition name,
     * records, file group count) - confirm against the implementations.
     */
    protected abstract void bulkCommit(String var1, String var2, HoodieData<HoodieRecord> var3, int var4);

    /**
     * Assigns every record to a file group of its metadata partition and unions the records
     * of all partitions into one data set.
     *
     * <p>For each partition the latest file slices are looked up, falling back to the lookup
     * that includes inflight slices when none are found. Each record's current location is
     * set to the file slice selected from its record key.
     *
     * @param partitionRecordsMap records keyed by metadata partition name
     * @return all records, each tagged with its target file group location
     * @throws IllegalArgumentException presumably, when a partition has no file slices -
     *         the exact type depends on {@code ValidationUtils.checkArgument}
     */
    protected HoodieData<HoodieRecord> prepRecords(Map<String, HoodieData<HoodieRecord>> partitionRecordsMap) {
        HoodieData<HoodieRecord> combined = this.engineContext.emptyHoodieData();
        try (HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(this.metadataMetaClient);){
            for (Map.Entry<String, HoodieData<HoodieRecord>> partitionRecords : partitionRecordsMap.entrySet()) {
                String partitionName = partitionRecords.getKey();
                HoodieData<HoodieRecord> untagged = partitionRecords.getValue();
                List<FileSlice> latestSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(this.metadataMetaClient, Option.ofNullable(fsView), partitionName);
                if (latestSlices.isEmpty()) {
                    // No slices found: retry with the lookup that also includes inflight slices.
                    latestSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(this.metadataMetaClient, Option.ofNullable(fsView), partitionName);
                }
                final List<FileSlice> targetSlices = latestSlices;
                final int fileGroupCount = targetSlices.size();
                ValidationUtils.checkArgument(fileGroupCount > 0, String.format("FileGroup count for MDT partition %s should be > 0", partitionName));
                HoodieData<HoodieRecord> tagged = untagged.map(r -> {
                    int fileGroupIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), fileGroupCount);
                    FileSlice target = targetSlices.get(fileGroupIndex);
                    r.unseal();
                    r.setCurrentLocation(new HoodieRecordLocation(target.getBaseInstantTime(), target.getFileId()));
                    r.seal();
                    return r;
                });
                combined = combined.union(tagged);
            }
            return combined;
        }
    }

    /**
     * Runs the table services of the metadata table and publishes execution metrics.
     *
     * <p>Order of operations: pending compactions / log compactions are executed first, then
     * (if a completed delta commit exists) cleaning, compaction subject to
     * {@link #validateCompactionScheduling}, and finally archival. When there is no completed
     * delta commit the remaining services are skipped.
     *
     * <p>The duration and status metrics are published exactly once from a single
     * {@code finally} block, on every exit path. The status is {@code -1} when an
     * {@link Exception} was caught (it is logged and rethrown) and {@code 1} otherwise.
     * Metric names are prefixed with the write client's table name when that name is non-empty.
     *
     * @param inFlightInstantTimestamp forwarded to {@link #validateCompactionScheduling}
     * @param requiresTimelineRefresh whether the metadata timeline must be reloaded before use
     */
    @Override
    public void performTableServices(Option<String> inFlightInstantTimestamp, boolean requiresTimelineRefresh) {
        HoodieTimer metadataTableServicesTimer = HoodieTimer.start();
        boolean allTableServicesExecutedSuccessfullyOrSkipped = true;
        // Obtained outside the try block because the finally block needs it for the metric names.
        BaseHoodieWriteClient<?, I, ?, ?> writeClient = this.getWriteClient();
        try {
            // Pending services run inside the try so that their failures are reported as well.
            HoodieActiveTimeline activeTimeline = HoodieBackedTableMetadataWriter.runPendingTableServicesOperationsAndRefreshTimeline(this.metadataMetaClient, writeClient, requiresTimelineRefresh);
            Option<HoodieInstant> lastInstant = activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
            if (!lastInstant.isPresent()) {
                // No completed delta commit: skip the remaining services (counts as "skipped").
                return;
            }
            String latestDeltacommitTime = lastInstant.get().requestedTime();
            this.cleanIfNecessary(writeClient, latestDeltacommitTime);
            if (this.validateCompactionScheduling(inFlightInstantTimestamp, latestDeltacommitTime)) {
                LOG.info("Latest deltacommit time found is {}, running compaction operations.", latestDeltacommitTime);
                this.compactIfNecessary(writeClient, Option.of(latestDeltacommitTime));
            }
            writeClient.archive();
            LOG.info("All the table services operations on MDT completed successfully");
        } catch (Exception e) {
            LOG.error("Exception in running table services on metadata table", e);
            allTableServicesExecutedSuccessfullyOrSkipped = false;
            throw e;
        } finally {
            String metadataTableName = writeClient.getConfig().getTableName();
            boolean tableNameExists = StringUtils.nonEmpty(metadataTableName);
            String executionDurationMetricName = tableNameExists ? String.format("%s.%s", metadataTableName, "table_service_execution_duration") : "table_service_execution_duration";
            String executionStatusMetricName = tableNameExists ? String.format("%s.%s", metadataTableName, "table_service_execution_status") : "table_service_execution_status";
            long timeSpent = metadataTableServicesTimer.endTimer();
            long executionStatus = allTableServicesExecutedSuccessfullyOrSkipped ? 1L : -1L;
            this.metrics.ifPresent(m -> m.setMetric(executionDurationMetricName, timeSpent));
            this.metrics.ifPresent(m -> m.setMetric(executionStatusMetricName, executionStatus));
        }
    }

    /**
     * Executes pending compactions and pending log compactions, if the timeline has any, and
     * returns a timeline to continue with.
     *
     * @param metadataMetaClient meta client whose active timeline is inspected
     * @param writeClient client used to run the pending operations
     * @param initialTimelineRequiresRefresh when {@code true} the timeline is reloaded before
     *        inspection, otherwise the meta client's current active timeline is used
     * @return a freshly reloaded timeline if any pending operation was run, otherwise the
     *         timeline that was inspected
     */
    static HoodieActiveTimeline runPendingTableServicesOperationsAndRefreshTimeline(HoodieTableMetaClient metadataMetaClient, BaseHoodieWriteClient<?, ?, ?, ?> writeClient, boolean initialTimelineRequiresRefresh) {
        HoodieActiveTimeline timeline;
        if (initialTimelineRequiresRefresh) {
            timeline = metadataMetaClient.reloadActiveTimeline();
        } else {
            timeline = metadataMetaClient.getActiveTimeline();
        }

        boolean ranCompactions = timeline.filterPendingCompactionTimeline().countInstants() > 0;
        if (ranCompactions) {
            writeClient.runAnyPendingCompactions();
        }

        boolean ranLogCompactions = timeline.filterPendingLogCompactionTimeline().countInstants() > 0;
        if (ranLogCompactions) {
            writeClient.runAnyPendingLogCompactions();
        }

        if (!ranCompactions && !ranLogCompactions) {
            return timeline;
        }
        // Something was executed, so the inspected timeline is stale.
        return metadataMetaClient.reloadActiveTimeline();
    }

    /**
     * Schedules and runs a compaction on the metadata table, or a log compaction when no
     * compaction could be scheduled and log compaction is enabled.
     *
     * <p>The compaction instant time is derived from the first pending (inflight or
     * requested) data-table instant that is already completed on the metadata timeline,
     * minus one millisecond; when there is no such instant, a new instant time obtained from
     * the write client is used. Nothing is scheduled if the chosen time is already present
     * among the completed metadata instants.
     *
     * @param writeClient client used to create instant times and to schedule/run compactions
     * @param latestDeltaCommitTimeOpt not read by this implementation
     */
    void compactIfNecessary(BaseHoodieWriteClient<?, I, ?, ?> writeClient, Option<String> latestDeltaCommitTimeOpt) {
        HoodieTimeline metadataCompletedTimeline = this.metadataMetaClient.getActiveTimeline().filterCompletedInstants();
        Option<String> timeBeforeFirstPendingInstant = this.dataMetaClient.reloadActiveTimeline()
                .filterInflightsAndRequested()
                .filter(pending -> metadataCompletedTimeline.containsInstant(pending.requestedTime()))
                .firstInstant()
                .map(pending -> HoodieInstantTimeGenerator.instantTimeMinusMillis(pending.requestedTime(), 1L));
        // The fallback time is always generated, exactly as with an eager orElse argument.
        String freshInstantTime = writeClient.createNewInstantTime(false);
        String compactionInstantTime = timeBeforeFirstPendingInstant.orElse(freshInstantTime);

        if (this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) {
            LOG.info("Compaction with same {} time is already present in the timeline.", compactionInstantTime);
            return;
        }
        if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
            LOG.info("Compaction is scheduled for timestamp {}", compactionInstantTime);
            writeClient.compact(compactionInstantTime);
            return;
        }
        if (!this.metadataWriteConfig.isLogCompactionEnabled()) {
            return;
        }

        // Compaction was not scheduled; try a log compaction instead.
        String logCompactionInstantTime = this.metadataMetaClient.createNewInstantTime(false);
        if (this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime)) {
            LOG.info("Log compaction with same {} time is already present in the timeline.", logCompactionInstantTime);
            return;
        }
        if (writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime, Option.empty())) {
            LOG.info("Log compaction is scheduled for timestamp {}", logCompactionInstantTime);
            writeClient.logCompact(logCompactionInstantTime);
        }
    }

    /**
     * Runs a clean on the metadata table followed by a lazy rollback of failed indexing,
     * unless too little has happened since the last completed instant of the
     * commit-and-replace timeline.
     *
     * <p>Cleaning is skipped when such an instant exists and fewer than 3 completed instants
     * follow it on the active timeline.
     *
     * @param writeClient client used to clean and to roll back failed indexing
     * @param instantTime passed to {@link #createCleanInstantTime(String)}
     */
    protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instantTime) {
        Option<HoodieInstant> lastCompletedCompactionInstant = this.metadataMetaClient.getActiveTimeline()
                .getCommitAndReplaceTimeline()
                .filterCompletedInstants()
                .lastInstant();
        if (lastCompletedCompactionInstant.isPresent()) {
            String lastCompactionTime = lastCompletedCompactionInstant.get().requestedTime();
            int completedSinceLastCompaction = this.metadataMetaClient.getActiveTimeline()
                    .filterCompletedInstants()
                    .findInstantsAfter(lastCompactionTime)
                    .countInstants();
            if (completedSinceLastCompaction < 3) {
                return;
            }
        }
        String cleanInstantTime = this.createCleanInstantTime(instantTime);
        writeClient.clean(cleanInstantTime);
        writeClient.lazyRollbackFailedIndexing();
    }

    /**
     * Produces the instant time to use for a clean operation by asking the metadata meta
     * client for a new instant time.
     *
     * @param instantTime not read by this implementation
     * @return the newly created instant time
     */
    String createCleanInstantTime(String instantTime) {
        String cleanInstantTime = this.metadataMetaClient.createNewInstantTime(false);
        return cleanInstantTime;
    }

    /**
     * Decides whether a compaction / log compaction may be scheduled on the metadata table.
     *
     * <p>When log compaction is enabled, scheduling is refused while a pending compaction or
     * a pending log compaction instant exists. When log compaction is disabled the answer is
     * always {@code true}.
     *
     * @param inFlightInstantTimestamp not read by this implementation
     * @param latestDeltaCommitTimeInMetadataTable not read by this implementation
     * @return {@code true} if scheduling may proceed, {@code false} otherwise
     */
    boolean validateCompactionScheduling(Option<String> inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
        if (!this.metadataWriteConfig.isLogCompactionEnabled()) {
            return true;
        }
        Option<HoodieInstant> pendingLogCompactionInstant = this.metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
        Option<HoodieInstant> pendingCompactionInstant = this.metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
        boolean nothingPending = !pendingLogCompactionInstant.isPresent() && !pendingCompactionInstant.isPresent();
        if (nothingPending) {
            return true;
        }
        LOG.warn("Not scheduling compaction or logCompaction, since a pending compaction instant {} or logCompaction {} instant is present", pendingCompactionInstant, pendingLogCompactionInstant);
        return false;
    }

    /**
     * Compares the files recorded in the metadata table with a listing of the file system and
     * collects the differences into the supplied output collections.
     *
     * <p>For every partition known to the metadata table:
     * <ul>
     *   <li>if the partition is absent from {@code dirInfoMap}, its identifier is added to
     *       {@code partitionsToDelete} and all of its recorded file names (if any) are put
     *       into {@code partitionFilesToDelete};</li>
     *   <li>otherwise, file names present in the listing but not in the metadata table are
     *       put (with their sizes) into {@code partitionFilesToAdd}, and file names recorded
     *       in the metadata table but missing from the listing are put into
     *       {@code partitionFilesToDelete}.</li>
     * </ul>
     * A {@code null} file list returned by the metadata table is treated as "no files" in
     * both cases.
     *
     * @param dirInfoMap file-system listing keyed by partition path
     * @param partitionFilesToAdd output: partition identifier to (file name to size) of files to add
     * @param partitionFilesToDelete output: partition identifier to file names to delete
     * @param partitionsToDelete output: identifiers of partitions to delete
     * @throws IOException declared for the metadata table lookups
     */
    private void fetchOutofSyncFilesRecordsFromMetadataTable(Map<String, HoodieTableMetadataUtil.DirectoryInfo> dirInfoMap, Map<String, Map<String, Long>> partitionFilesToAdd, Map<String, List<String>> partitionFilesToDelete, List<String> partitionsToDelete) throws IOException {
        for (String partition : this.metadata.fetchAllPartitionPaths()) {
            // An empty partition name on a non-partitioned table refers to the base path itself.
            StoragePath partitionPath = StringUtils.isNullOrEmpty(partition) && !this.dataMetaClient.getTableConfig().isTablePartitioned()
                    ? new StoragePath(this.dataWriteConfig.getBasePath())
                    : new StoragePath(this.dataWriteConfig.getBasePath(), partition);
            String partitionId = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(partition);

            List<StoragePathInfo> listedInMetadata = this.metadata.getAllFilesInPartition(partitionPath);
            List<StoragePathInfo> metadataFiles = listedInMetadata == null ? Collections.<StoragePathInfo>emptyList() : listedInMetadata;
            List<String> mdtFileNames = metadataFiles.stream().map(f -> f.getPath().getName()).collect(Collectors.toList());

            if (!dirInfoMap.containsKey(partition)) {
                partitionsToDelete.add(partitionId);
                if (!mdtFileNames.isEmpty()) {
                    partitionFilesToDelete.put(partitionId, mdtFileNames);
                }
                continue;
            }

            Map<String, Long> fsFiles = dirInfoMap.get(partition).getFileNameToSizeMap();
            // Set-based lookup keeps the "missing from MDT" check linear in the number of files.
            Set<String> mdtFileNameSet = mdtFileNames.stream().collect(Collectors.toSet());

            List<String> filesDeleted = mdtFileNames.stream().filter(name -> !fsFiles.containsKey(name)).collect(Collectors.toList());
            Map<String, Long> filesToAdd = new HashMap<>();
            fsFiles.forEach((name, size) -> {
                if (!mdtFileNameSet.contains(name)) {
                    filesToAdd.put(name, size);
                }
            });

            if (!filesToAdd.isEmpty()) {
                partitionFilesToAdd.put(partitionId, filesToAdd);
            }
            if (!filesDeleted.isEmpty()) {
                partitionFilesToDelete.put(partitionId, filesDeleted);
            }
        }
    }

    /**
     * Reads record keys from the latest base files of every partition named in the replace
     * commit's partition-to-replaced-file-ids mapping.
     *
     * @param replaceCommitMetadata replace commit whose partitions are scanned
     * @return the result of {@code HoodieTableMetadataUtil.readRecordKeysFromBaseFiles} for
     *         the collected (partition, base file) pairs
     */
    private HoodieData<HoodieRecord> getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata replaceCommitMetadata) {
        HoodieTableFileSystemView fsView = this.getMetadataView();
        // Pair each latest base file with the partition it belongs to.
        List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = replaceCommitMetadata.getPartitionToReplaceFileIds()
                .keySet()
                .stream()
                .flatMap(partitionPath -> fsView.getLatestBaseFiles(partitionPath).map(baseFile -> Pair.of(partitionPath, baseFile)))
                .collect(Collectors.toList());
        int maxParallelism = this.dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism();
        String activity = this.getClass().getSimpleName();
        return HoodieTableMetadataUtil.readRecordKeysFromBaseFiles(
                this.engineContext,
                this.dataWriteConfig,
                partitionBaseFilePairs,
                true,
                maxParallelism,
                this.dataMetaClient.getBasePath(),
                this.storageConf,
                activity);
    }

    /**
     * Computes extra record-index records required by operations that replace file groups.
     *
     * <ul>
     *   <li>{@code INSERT_OVERWRITE}: replaced records that have no counterpart in
     *       {@code updatesFromWriteStatuses}, matched on {@code getKey()};</li>
     *   <li>{@code INSERT_OVERWRITE_TABLE}: the same, but matched on {@code getRecordKey()};</li>
     *   <li>{@code DELETE_PARTITION}: all replaced records;</li>
     *   <li>any other operation: an empty data set.</li>
     * </ul>
     *
     * @param updatesFromWriteStatuses records produced by the write itself
     * @param commitMetadata commit metadata; cast to {@link HoodieReplaceCommitMetadata} for
     *        the three operations listed above
     * @return the additional records as described above
     */
    private HoodieData<HoodieRecord> getRecordIndexAdditionalUpserts(HoodieData<HoodieRecord> updatesFromWriteStatuses, HoodieCommitMetadata commitMetadata) {
        WriteOperationType operationType = commitMetadata.getOperationType();
        boolean replacesFileGroups = operationType == WriteOperationType.INSERT_OVERWRITE
                || operationType == WriteOperationType.INSERT_OVERWRITE_TABLE
                || operationType == WriteOperationType.DELETE_PARTITION;
        if (!replacesFileGroups) {
            return this.engineContext.emptyHoodieData();
        }

        HoodieData<HoodieRecord> replacedRecords = this.getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata) commitMetadata);
        if (operationType == WriteOperationType.DELETE_PARTITION) {
            return replacedRecords;
        }
        if (operationType == WriteOperationType.INSERT_OVERWRITE) {
            // Left anti-join on the full key: keep replaced records without a matching update.
            return replacedRecords
                    .mapToPair(rec -> Pair.of(rec.getKey(), rec))
                    .leftOuterJoin(updatesFromWriteStatuses.mapToPair(rec -> Pair.of(rec.getKey(), rec)))
                    .values()
                    .filter(joined -> !joined.getRight().isPresent())
                    .map(Pair::getLeft);
        }
        // INSERT_OVERWRITE_TABLE: the same anti-join, keyed by record key only.
        return replacedRecords
                .mapToPair(rec -> Pair.of(rec.getRecordKey(), rec))
                .leftOuterJoin(updatesFromWriteStatuses.mapToPair(rec -> Pair.of(rec.getRecordKey(), rec)))
                .values()
                .filter(joined -> !joined.getRight().isPresent())
                .map(Pair::getLeft);
    }

    /**
     * Closes this writer via {@code close()}, converting any exception into an unchecked
     * {@code HoodieException} that carries the original exception as its cause.
     */
    protected void closeInternal() {
        try {
            close();
        } catch (Exception closeFailure) {
            throw new HoodieException("Failed to close HoodieMetadata writer ", closeFailure);
        }
    }

    /**
     * Reports the value of the {@code initialized} flag.
     *
     * @return the current value of the {@code initialized} field
     */
    @Override
    public boolean isInitialized() {
        return initialized;
    }

    /**
     * Returns the write client, creating it through {@link #initializeWriteClient()} on first
     * use and caching it in the {@code writeClient} field. No synchronization is performed
     * here.
     *
     * @return the cached or newly initialized write client
     */
    protected BaseHoodieWriteClient<?, I, ?, ?> getWriteClient() {
        if (this.writeClient != null) {
            return this.writeClient;
        }
        this.writeClient = this.initializeWriteClient();
        return this.writeClient;
    }

    /**
     * Creates the write client. Implemented by subclasses and invoked by
     * {@link #getWriteClient()} when no client has been cached yet.
     *
     * @return the write client to cache and use
     */
    protected abstract BaseHoodieWriteClient<?, I, ?, ?> initializeWriteClient();

    /**
     * Single-method callback that produces metadata records grouped by a string key.
     * NOTE(review): the keys are presumably metadata partition names - confirm against the callers.
     */
    interface ConvertMetadataFunction {
        /**
         * @return the produced records, keyed by string
         */
        Map<String, HoodieData<HoodieRecord>> convertMetadata();
    }
}

