/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.expression.BindVisitor;
import org.apache.hudi.expression.Expression;
import org.apache.hudi.internal.schema.Types;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.io.storage.HoodieSeekingFileReader;
import org.apache.hudi.metadata.BaseTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SecondaryIndexKeyUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.util.Transient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Table metadata implementation that answers lookups by reading a metadata table located at
 * {@code metadataBasePath}, merging each file slice's base file with its log files.
 */
public class HoodieBackedTableMetadata
extends BaseTableMetadata {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieBackedTableMetadata.class);
    // Base path of the metadata table, derived from the data table base path in the constructor.
    private final String metadataBasePath;
    // Meta client / table config of the metadata table; both stay null when the table could not be loaded.
    private HoodieTableMetaClient metadataMetaClient;
    private HoodieTableConfig metadataTableConfig;
    // Created on first use by getMetadataFileSystemView(); closed and nulled by close() and reset().
    private HoodieTableFileSystemView metadataFileSystemView;
    // When true, readers opened for key lookups are cached in partitionReaders instead of being closed per lookup.
    private final boolean reuse;
    // Cache of open readers keyed by (partition name, file id); only populated when reuse is true.
    private final Transient<Map<Pair<String, String>, Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader>>> partitionReaders = Transient.lazy(ConcurrentHashMap::new);
    // Cache of the latest merged file slices per metadata partition name; cleared by close() and reset().
    private final Map<String, List<FileSlice>> partitionFileSliceMap = new ConcurrentHashMap<String, List<FileSlice>>();

    /**
     * Creates an instance that does not cache readers between lookups
     * (delegates to the five-argument constructor with {@code reuse = false}).
     *
     * @param engineContext   engine context forwarded to the base class
     * @param storage         storage forwarded to the base class
     * @param metadataConfig  metadata configuration forwarded to the base class
     * @param datasetBasePath base path of the data table
     */
    public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieStorage storage, HoodieMetadataConfig metadataConfig, String datasetBasePath) {
        this(
            engineContext,
            storage,
            metadataConfig,
            datasetBasePath,
            false);
    }

    public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieStorage storage, HoodieMetadataConfig metadataConfig, String datasetBasePath, boolean reuse) {
        super(engineContext, storage, metadataConfig, datasetBasePath);
        this.reuse = reuse;
        this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(this.dataBasePath.toString());
        this.initIfNeeded();
    }

    /**
     * Builds the metadata table meta client and table config when the metadata table is flagged
     * as initialized and no meta client exists yet. Any failure while building them marks the
     * metadata table as not initialized and drops all metadata-table state.
     */
    private void initIfNeeded() {
        if (!this.isMetadataTableInitialized) {
            if (!HoodieTableMetadata.isMetadataTable(this.metadataBasePath)) {
                LOG.info("Metadata table is disabled.");
            }
            return;
        }
        if (this.metadataMetaClient != null) {
            // Already loaded; nothing to do.
            return;
        }
        try {
            this.metadataMetaClient = HoodieTableMetaClient.builder()
                .setStorage(this.storage)
                .setBasePath(this.metadataBasePath)
                .build();
            this.metadataTableConfig = this.metadataMetaClient.getTableConfig();
        } catch (TableNotFoundException notFound) {
            LOG.warn("Metadata table was not found at path {}", this.metadataBasePath);
            markMetadataTableUnavailable();
        } catch (Exception failure) {
            // The trailing exception argument is logged with its stack trace by SLF4J.
            LOG.error("Failed to initialize metadata table at path {}", this.metadataBasePath, failure);
            markMetadataTableUnavailable();
        }
    }

    /** Flags the metadata table as not initialized and clears the meta client, file system view and table config. */
    private void markMetadataTableUnavailable() {
        this.isMetadataTableInitialized = false;
        this.metadataMetaClient = null;
        this.metadataFileSystemView = null;
        this.metadataTableConfig = null;
    }

    /**
     * Looks up a single key in the given metadata partition.
     *
     * @param key           record key to look up
     * @param partitionName metadata partition to read from
     * @return the record for {@code key}, or an empty option when the lookup result has no entry for it
     */
    @Override
    protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key, String partitionName) {
        List<String> singleKey = Collections.singletonList(key);
        return Option.ofNullable(getRecordsByKeys(singleKey, partitionName).get(key));
    }

    /**
     * Returns the partition paths matching the given prefixes, additionally pruned by
     * {@code expression} when pruning is possible.
     *
     * <p>Pruning is applied only when hive-style partitioning is enabled and the first selected
     * path has as many partition levels as there are partition fields; otherwise the
     * prefix-matched paths are returned unfiltered.
     *
     * @param relativePathPrefixes partition path prefixes to match
     * @param partitionFields      partition field types the expression is bound against
     * @param expression           filter expression evaluated on the extracted partition values
     * @return the selected (and possibly pruned) partition paths; empty when no path matches the prefixes
     * @throws IOException if listing the partition paths fails
     */
    @Override
    public List<String> getPartitionPathWithPathPrefixUsingFilterExpression(List<String> relativePathPrefixes, Types.RecordType partitionFields, Expression expression) throws IOException {
        Expression boundedExpr = expression.accept(new BindVisitor(partitionFields, false));
        List<String> selectedPartitionPaths = this.getPartitionPathWithPathPrefixes(relativePathPrefixes);
        if (selectedPartitionPaths.isEmpty()) {
            // Nothing matched the prefixes: there is no first path to inspect and nothing to prune.
            return selectedPartitionPaths;
        }
        // Only the first selected path is inspected to decide whether the partition levels match the fields.
        boolean canPrune = this.hiveStylePartitioningEnabled
            && HoodieBackedTableMetadata.getPathPartitionLevel(partitionFields, selectedPartitionPaths.get(0)) == partitionFields.fields().size();
        if (!canPrune) {
            return selectedPartitionPaths;
        }
        return selectedPartitionPaths.stream()
            .filter(p -> (Boolean)boundedExpr.eval(HoodieBackedTableMetadata.extractPartitionValues(partitionFields, p, this.urlEncodePartitioningEnabled)))
            .collect(Collectors.toList());
    }

    /**
     * Returns every partition path that matches at least one of the given prefixes. A path
     * matches a prefix when the prefix is null or empty, equals the path, or is followed by
     * {@code "/"} at the start of the path.
     *
     * @param relativePathPrefixes prefixes to match against
     * @return matching partition paths, in the order returned by {@code getAllPartitionPaths()}
     * @throws IOException if listing the partition paths fails
     */
    @Override
    public List<String> getPartitionPathWithPathPrefixes(List<String> relativePathPrefixes) throws IOException {
        List<String> matchingPaths = new ArrayList<>();
        for (String partitionPath : this.getAllPartitionPaths()) {
            for (String prefix : relativePathPrefixes) {
                boolean matches = StringUtils.isNullOrEmpty(prefix)
                    || partitionPath.equals(prefix)
                    || partitionPath.startsWith(prefix + "/");
                if (matches) {
                    matchingPaths.add(partitionPath);
                    break;
                }
            }
        }
        return matchingPaths;
    }

    /**
     * Reads all records whose keys start with any of the given prefixes from every file slice of
     * the metadata partition.
     *
     * <p>Readers are opened per file slice inside the flat-map function (never taken from the
     * reader cache) and are always closed once that slice has been read.
     *
     * @param keyPrefixes        key prefixes to look up; a sorted copy is used for the reads
     * @param partitionName      metadata partition to read from
     * @param shouldLoadInMemory when true the file slices are wrapped in a lazy in-memory list,
     *                           otherwise they are parallelized through the engine context
     * @return merged base-file and log records for the matching keys
     */
    @Override
    public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes, String partitionName, boolean shouldLoadInMemory) {
        List<String> orderedPrefixes = new ArrayList<>(keyPrefixes);
        Collections.sort(orderedPrefixes);

        List<FileSlice> slices = this.partitionFileSliceMap.computeIfAbsent(
            partitionName,
            name -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(this.metadataMetaClient, this.getMetadataFileSystemView(), partitionName));
        ValidationUtils.checkState(!slices.isEmpty(), "Number of file slices for partition " + partitionName + " should be > 0");

        HoodieData<FileSlice> sliceData;
        if (shouldLoadInMemory) {
            sliceData = HoodieListData.lazy(slices);
        } else {
            sliceData = this.getEngineContext().parallelize(slices);
        }

        return sliceData.flatMap(slice -> {
            Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = this.openReaders(partitionName, slice);
            try {
                HoodieSeekingFileReader<?> baseReader = readers.getKey();
                HoodieMetadataLogRecordReader logReader = readers.getRight();
                if (baseReader == null && logReader == null) {
                    return Collections.emptyIterator();
                }
                // Prefix lookup, so keys are not treated as full keys.
                boolean fullKeys = false;
                List<Long> timings = new ArrayList<>();
                Map<String, HoodieRecord<HoodieMetadataPayload>> fromLogs = this.readLogRecords(logReader, orderedPrefixes, fullKeys, timings);
                Map<String, HoodieRecord<HoodieMetadataPayload>> merged = this.readFromBaseAndMergeWithLogRecords(baseReader, orderedPrefixes, fullKeys, fromLogs, timings, partitionName);
                LOG.debug("Metadata read for {} keys took [baseFileRead, logMerge] {} ms", orderedPrefixes.size(), timings);
                return merged.values().iterator();
            } catch (IOException ioe) {
                throw new HoodieIOException("Error merging records from metadata table for  " + orderedPrefixes.size() + " key : ", ioe);
            } finally {
                this.closeReader(readers);
            }
        });
    }

    /**
     * Looks up the given full keys in a metadata partition.
     *
     * <p>With a single file slice all keys are read from it directly. With several file slices
     * the keys are bucketed per slice and the buckets are looked up through the engine context.
     *
     * @param keys          full record keys to look up
     * @param partitionName metadata partition to read from
     * @return found records by key; an empty map when {@code keys} is empty
     */
    @Override
    protected Map<String, HoodieRecord<HoodieMetadataPayload>> getRecordsByKeys(List<String> keys, String partitionName) {
        if (keys.isEmpty()) {
            return Collections.emptyMap();
        }

        List<FileSlice> slices = this.partitionFileSliceMap.computeIfAbsent(
            partitionName,
            name -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(this.metadataMetaClient, this.getMetadataFileSystemView(), partitionName));
        int sliceCount = slices.size();
        ValidationUtils.checkState(sliceCount > 0, "Number of file slices for partition " + partitionName + " should be > 0");

        if (sliceCount == 1) {
            return this.lookupKeysFromFileSlice(partitionName, keys, slices.get(0));
        }

        ArrayList<ArrayList<String>> keysPerSlice = HoodieBackedTableMetadata.partitionKeysByFileSlices(keys, sliceCount);
        Map<String, HoodieRecord<HoodieMetadataPayload>> combined = new HashMap<>(keys.size());
        this.getEngineContext().setJobStatus(this.getClass().getSimpleName(), "Reading keys from metadata table partition " + partitionName);
        List<Map<String, HoodieRecord<HoodieMetadataPayload>>> perSliceResults = this.getEngineContext().map(keysPerSlice, sliceKeys -> {
            if (sliceKeys.isEmpty()) {
                return Collections.emptyMap();
            }
            // Every key in a bucket maps to the same index, so the first key identifies the slice.
            int sliceIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(sliceKeys.get(0), sliceCount);
            return this.lookupKeysFromFileSlice(partitionName, sliceKeys, slices.get(sliceIndex));
        }, keysPerSlice.size());
        for (Map<String, HoodieRecord<HoodieMetadataPayload>> sliceResult : perSliceResults) {
            combined.putAll(sliceResult);
        }
        return combined;
    }

    /**
     * Distributes keys into {@code numFileSlices} buckets, using
     * {@code HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex} to pick the bucket of each key.
     *
     * @param keys          keys to distribute
     * @param numFileSlices number of buckets to create
     * @return one list per bucket index (possibly empty), keys kept in their input order
     */
    private static ArrayList<ArrayList<String>> partitionKeysByFileSlices(List<String> keys, int numFileSlices) {
        ArrayList<ArrayList<String>> buckets = new ArrayList<>(numFileSlices);
        for (int slot = 0; slot < numFileSlices; slot++) {
            buckets.add(new ArrayList<>());
        }
        for (String key : keys) {
            int bucketIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, numFileSlices);
            buckets.get(bucketIndex).add(key);
        }
        return buckets;
    }

    /**
     * Looks up full keys in one file slice, merging base-file records with log records.
     *
     * <p>Readers come from {@code getOrCreateReaders}; they are closed after the lookup unless
     * reader reuse is enabled.
     *
     * @param partitionName metadata partition the slice belongs to
     * @param keys          full record keys to look up; a sorted copy is used for the reads
     * @param fileSlice     file slice to read
     * @return found records by key; an empty map when the slice has neither base file reader nor log reader
     */
    private Map<String, HoodieRecord<HoodieMetadataPayload>> lookupKeysFromFileSlice(String partitionName, List<String> keys, FileSlice fileSlice) {
        Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = this.getOrCreateReaders(partitionName, fileSlice);
        try {
            HoodieSeekingFileReader<?> baseReader = readers.getKey();
            HoodieMetadataLogRecordReader logReader = readers.getRight();
            if (baseReader == null && logReader == null) {
                return Collections.emptyMap();
            }

            List<String> orderedKeys = new ArrayList<>(keys);
            Collections.sort(orderedKeys);
            // Point lookup, so keys are matched in full.
            boolean fullKeys = true;
            List<Long> timings = new ArrayList<>(1);
            Map<String, HoodieRecord<HoodieMetadataPayload>> fromLogs = this.readLogRecords(logReader, orderedKeys, fullKeys, timings);
            return this.readFromBaseAndMergeWithLogRecords(baseReader, orderedKeys, fullKeys, fromLogs, timings, partitionName);
        } catch (IOException ioe) {
            throw new HoodieIOException("Error merging records from metadata table for  " + keys.size() + " key : ", ioe);
        } finally {
            if (!this.reuse) {
                this.closeReader(readers);
            }
        }
    }

    /**
     * Reads the requested keys (or key prefixes) from the log record reader and appends the
     * elapsed time to {@code timings}, whether or not the read succeeds.
     *
     * @param logRecordReader reader over the slice's log files; may be null
     * @param sortedKeys      sorted keys or key prefixes to read
     * @param fullKey         true to match keys in full, false to match them as prefixes
     * @param timings         list the elapsed time of this read is appended to
     * @return the records found in the logs; an empty map when {@code logRecordReader} is null
     */
    private Map<String, HoodieRecord<HoodieMetadataPayload>> readLogRecords(HoodieMetadataLogRecordReader logRecordReader, List<String> sortedKeys, boolean fullKey, List<Long> timings) {
        HoodieTimer timer = HoodieTimer.start();
        try {
            if (logRecordReader == null) {
                return Collections.emptyMap();
            }
            if (fullKey) {
                return logRecordReader.getRecordsByKeys(sortedKeys);
            }
            return logRecordReader.getRecordsByKeyPrefixes(sortedKeys);
        } finally {
            timings.add(timer.endTimer());
        }
    }

    /**
     * Reads the requested keys from the base file and merges the given log records into them.
     *
     * <p>For a key present in both, the log payload is pre-combined with the base payload; the
     * entry is removed when the combined payload is deleted. Log records whose key is not in the
     * base file are added as they are.
     *
     * @param reader        base file reader; when null the log records are returned unchanged
     * @param sortedKeys    sorted keys or key prefixes to read
     * @param fullKeys      true to match keys in full, false to match them as prefixes
     * @param logRecords    records already read from the log files
     * @param timings       list the elapsed time of this step is appended to
     * @param partitionName metadata partition being read
     * @return merged records by key
     * @throws IOException if reading the base file fails
     */
    private Map<String, HoodieRecord<HoodieMetadataPayload>> readFromBaseAndMergeWithLogRecords(HoodieSeekingFileReader<?> reader, List<String> sortedKeys, boolean fullKeys, Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords, List<Long> timings, String partitionName) throws IOException {
        HoodieTimer totalTimer = HoodieTimer.start();
        if (reader == null) {
            timings.add(totalTimer.endTimer());
            return logRecords;
        }

        HoodieTimer baseReadTimer = HoodieTimer.start();
        Map<String, HoodieRecord<HoodieMetadataPayload>> merged = this.fetchBaseFileRecordsByKeys(reader, sortedKeys, fullKeys, partitionName);
        this.metrics.ifPresent(m -> m.updateMetrics("basefile_read", baseReadTimer.endTimer()));

        for (HoodieRecord<HoodieMetadataPayload> logRecord : logRecords.values()) {
            merged.merge(logRecord.getRecordKey(), logRecord, (baseRecord, incoming) -> {
                HoodieMetadataPayload combined = incoming.getData().preCombine(baseRecord.getData());
                if (combined.isDeleted()) {
                    // Returning null from the remapping function removes the entry.
                    return null;
                }
                return new HoodieAvroRecord<HoodieMetadataPayload>(baseRecord.getKey(), combined);
            });
        }

        timings.add(totalTimer.endTimer());
        return merged;
    }

    /**
     * Reads the requested keys (or key prefixes) from the base file and converts every returned
     * Avro record into a metadata record keyed by its {@code "key"} field.
     *
     * @param reader        base file reader to read from
     * @param sortedKeys    sorted keys or key prefixes to read
     * @param fullKeys      true to match keys in full, false to match them as prefixes
     * @param partitionName metadata partition being read
     * @return base-file records by key
     * @throws IOException if reading the base file fails
     */
    @SuppressWarnings("unchecked") // reader is declared as a raw type, so its iterators are raw as well
    private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileRecordsByKeys(HoodieSeekingFileReader reader, List<String> sortedKeys, boolean fullKeys, String partitionName) throws IOException {
        try (ClosableIterator<HoodieRecord<?>> baseRecords = fullKeys
                ? reader.getRecordsByKeysIterator(sortedKeys)
                : reader.getRecordsByKeyPrefixIterator(sortedKeys)) {
            return CollectionUtils.toStream(baseRecords)
                .map(baseRecord -> (GenericRecord) baseRecord.getData())
                .collect(Collectors.toMap(
                    avro -> (String) avro.get("key"),
                    avro -> this.composeRecord(avro, partitionName)));
        }
    }

    /**
     * Converts an Avro record read from a base file into a metadata record, using the payload
     * class and pre-combine field of the metadata table config.
     *
     * <p>When the metadata table does not populate meta fields, the record key / partition field
     * names from the table config and the given partition name are passed to the conversion too.
     *
     * @param avroRecord    Avro record read from the base file
     * @param partitionName metadata partition the record was read from
     * @return the converted record
     */
    private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord avroRecord, String partitionName) {
        HoodieTableConfig tableConfig = this.metadataTableConfig;
        if (!tableConfig.populateMetaFields()) {
            return SpillableMapUtils.convertToHoodieRecordPayload(
                avroRecord,
                tableConfig.getPayloadClass(),
                tableConfig.getPreCombineField(),
                Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()),
                false,
                Option.of(partitionName),
                Option.empty());
        }
        return SpillableMapUtils.convertToHoodieRecordPayload(
            avroRecord,
            tableConfig.getPayloadClass(),
            tableConfig.getPreCombineField(),
            false);
    }

    /**
     * Returns readers for the file slice. With reuse enabled the readers are cached under
     * (partition name, file id) and opened only on first request; otherwise new readers are
     * opened on every call.
     *
     * @param partitionName metadata partition the slice belongs to
     * @param slice         file slice to read
     * @return pair of base file reader and log record reader
     */
    private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
        if (!this.reuse) {
            return this.openReaders(partitionName, slice);
        }
        Pair<String, String> cacheKey = Pair.of(partitionName, slice.getFileId());
        Map<Pair<String, String>, Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader>> readerCache = this.partitionReaders.get();
        return readerCache.computeIfAbsent(cacheKey, unused -> this.openReaders(partitionName, slice));
    }

    /**
     * Opens the base file reader and the log record reader for a file slice and reports the
     * combined open time under the {@code "scan"} metric.
     *
     * <p>If anything fails after the base file reader has been opened, that reader is closed
     * before the failure is propagated, so a failed open does not leave it dangling.
     *
     * @param partitionName metadata partition the slice belongs to
     * @param slice         file slice to open readers for
     * @return pair of base file reader (null when the slice has no base file) and log record reader
     * @throws HoodieIOException if opening the readers fails with an {@link IOException}
     */
    private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> openReaders(String partitionName, FileSlice slice) {
        // Tracked outside the try block so the failure paths can release it.
        HoodieSeekingFileReader<?> baseFileReader = null;
        try {
            HoodieTimer timer = HoodieTimer.start();
            Pair<HoodieSeekingFileReader<?>, Long> baseFileReaderOpenTimePair = this.getBaseFileReader(slice, timer);
            baseFileReader = baseFileReaderOpenTimePair.getKey();
            long baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
            List<HoodieLogFile> logFiles = slice.getLogFiles().collect(Collectors.toList());
            Pair<HoodieMetadataLogRecordReader, Long> logRecordScannerOpenTimePair = this.getLogRecordScanner(logFiles, partitionName, Option.empty(), Option.empty());
            HoodieMetadataLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey();
            long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
            this.metrics.ifPresent(m -> m.updateMetrics("scan", baseFileOpenMs + logScannerOpenMs));
            return Pair.of(baseFileReader, logRecordScanner);
        }
        catch (IOException e) {
            HoodieBackedTableMetadata.closeAfterFailedOpen(baseFileReader, e);
            throw new HoodieIOException("Error opening readers for metadata table partition " + partitionName, e);
        }
        catch (RuntimeException e) {
            HoodieBackedTableMetadata.closeAfterFailedOpen(baseFileReader, e);
            throw e;
        }
    }

    /**
     * Closes a reader that was opened before {@code failure} occurred. A failure of the close
     * itself is attached to {@code failure} as a suppressed exception rather than replacing it.
     */
    private static void closeAfterFailedOpen(HoodieSeekingFileReader<?> reader, Exception failure) {
        if (reader == null) {
            return;
        }
        try {
            reader.close();
        }
        catch (Exception closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }

    /**
     * Opens a reader on the slice's base file, if the slice has one.
     *
     * @param slice file slice whose base file should be opened
     * @param timer already-started timer; it is ended by this method in both cases
     * @return pair of the reader and the elapsed time reported by {@code timer}, or
     *         {@code (null, 0)} when the slice has no base file
     * @throws IOException if the reader cannot be created
     */
    private Pair<HoodieSeekingFileReader<?>, Long> getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException {
        Option<HoodieBaseFile> maybeBaseFile = slice.getBaseFile();
        if (!maybeBaseFile.isPresent()) {
            timer.endTimer();
            return Pair.of(null, 0L);
        }

        HoodieBaseFile file = maybeBaseFile.get();
        StoragePath path = file.getStoragePath();
        HoodieSeekingFileReader<?> fileReader = (HoodieSeekingFileReader<?>) HoodieIOFactory.getIOFactory(this.metadataMetaClient.getStorage())
            .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
            .getFileReader(ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER, path);
        long openMs = timer.endTimer();
        LOG.info("Opened metadata base file from {} at instant {} in {} ms", path, file.getCommitTime(), openMs);
        return Pair.of(fileReader, openMs);
    }

    /**
     * Builds a log record reader over the given log files of a metadata partition.
     *
     * <p>The reader's latest instant time is the requested time of the last completed instant on
     * the metadata timeline ({@code "00000000000000"} when there is none), lowered to
     * {@code timeTravelInstant} via {@code InstantComparison.minTimestamp} when one is given.
     *
     * @param logFiles              log files to read; they are sorted with the log file comparator
     * @param partitionName         metadata partition the log files belong to
     * @param allowFullScanOverride explicit full-scan setting; when empty the per-partition default is used
     * @param timeTravelInstant     optional upper bound for the latest instant time
     * @return pair of the reader and the time it took to build it, as reported by the timer
     */
    public Pair<HoodieMetadataLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles, String partitionName, Option<Boolean> allowFullScanOverride, Option<String> timeTravelInstant) {
        HoodieTimer timer = HoodieTimer.start();

        List<String> sortedLogFilePaths = logFiles.stream()
            .sorted(HoodieLogFile.getLogFileComparator())
            .map(logFile -> logFile.getPath().toString())
            .collect(Collectors.toList());
        Set<String> validInstantTimestamps = HoodieTableMetadataUtil.getValidInstantTimestamps(this.dataMetaClient, this.metadataMetaClient);

        String latestCompletedInstantTime = this.metadataMetaClient.getActiveTimeline()
            .filterCompletedInstants()
            .lastInstant()
            .map(HoodieInstant::requestedTime)
            .orElse("00000000000000");
        String latestInstantTime = timeTravelInstant.isPresent()
            ? InstantComparison.minTimestamp(latestCompletedInstantTime, timeTravelInstant.get())
            : latestCompletedInstantTime;

        boolean allowFullScan = allowFullScanOverride.orElseGet(() -> this.isFullScanAllowedForPartition(partitionName));
        Schema readerSchema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
        HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(this.metadataConfig.getProps()).build();

        HoodieMetadataLogRecordReader scanner = HoodieMetadataLogRecordReader.newBuilder()
            .withStorage(this.metadataMetaClient.getStorage())
            .withBasePath(this.metadataBasePath)
            .withLogFilePaths(sortedLogFilePaths)
            .withReaderSchema(readerSchema)
            .withLatestInstantTime(latestInstantTime)
            .withMaxMemorySizeInBytes(this.metadataConfig.getMaxReaderMemory())
            .withBufferSize(this.metadataConfig.getMaxReaderBufferSize())
            .withSpillableMapBasePath(this.metadataConfig.getSplliableMapDir())
            .withDiskMapType(commonConfig.getSpillableDiskMapType())
            .withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
            .withLogBlockTimestamps(validInstantTimestamps)
            .enableFullScan(allowFullScan)
            .withPartition(partitionName)
            .withEnableOptimizedLogBlocksScan(this.metadataConfig.isOptimizedLogBlocksScanEnabled())
            .withTableMetaClient(this.metadataMetaClient)
            .build();

        Long openMs = timer.endTimer();
        LOG.info("Opened {} metadata log files (dataset instant={}, metadata instant={}) in {} ms", sortedLogFilePaths.size(), this.getLatestDataInstantTime(), latestInstantTime, openMs);
        return Pair.of(scanner, openMs);
    }

    /**
     * Tells whether a full scan of the log files is allowed by default for a metadata partition.
     *
     * @param partitionName metadata partition name; must not be null
     * @return true only for the {@code "files"} partition
     */
    private boolean isFullScanAllowedForPartition(String partitionName) {
        // Dereferencing partitionName keeps the NullPointerException a switch on a null string would raise.
        return partitionName.equals("files");
    }

    /**
     * Closes all cached partition readers, clears the file slice cache and closes the metadata
     * file system view if one was created.
     */
    @Override
    public void close() {
        closePartitionReaders();
        partitionFileSliceMap.clear();
        HoodieTableFileSystemView view = this.metadataFileSystemView;
        if (view == null) {
            return;
        }
        view.close();
        this.metadataFileSystemView = null;
    }

    /**
     * Removes the readers cached under the given (partition name, file id) key and closes them.
     *
     * @param partitionFileSlicePair cache key of the readers to close
     */
    private synchronized void close(Pair<String, String> partitionFileSlicePair) {
        this.closeReader(this.partitionReaders.get().remove(partitionFileSlicePair));
    }

    /** Closes every cached reader pair and then empties the reader cache. */
    private void closePartitionReaders() {
        this.partitionReaders.get().keySet().forEach(cacheKey -> this.close(cacheKey));
        this.partitionReaders.get().clear();
    }

    /**
     * Closes both readers of the pair, if present.
     *
     * <p>The log record reader is closed even when closing the base file reader fails, so one
     * failing close does not leak the other reader. The first failure is rethrown wrapped in a
     * {@link HoodieException}; a second failure is attached to it as a suppressed exception.
     *
     * @param readers pair of base file reader and log record reader; the pair or either side may be null
     * @throws HoodieException if closing either reader fails
     */
    private void closeReader(Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers) {
        if (readers == null) {
            return;
        }
        Exception failure = null;
        try {
            if (readers.getKey() != null) {
                readers.getKey().close();
            }
        }
        catch (Exception e) {
            failure = e;
        }
        try {
            if (readers.getValue() != null) {
                readers.getValue().close();
            }
        }
        catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw new HoodieException("Error closing resources during metadata table merge", failure);
        }
    }

    /**
     * @return the current value of the {@code isMetadataTableInitialized} flag
     */
    public boolean enabled() {
        return isMetadataTableInitialized;
    }

    /**
     * @return the meta client of the metadata table, or null when the metadata table was not loaded
     */
    public HoodieTableMetaClient getMetadataMetaClient() {
        return metadataMetaClient;
    }

    /**
     * Returns the file system view of the metadata table, creating it on first use.
     *
     * @return the (possibly newly created) metadata file system view
     */
    public HoodieTableFileSystemView getMetadataFileSystemView() {
        if (this.metadataFileSystemView != null) {
            return this.metadataFileSystemView;
        }
        this.metadataFileSystemView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(this.metadataMetaClient);
        return this.metadataFileSystemView;
    }

    /**
     * Collects stats from the metrics object, passing it the partition paths of all valid
     * metadata partition types.
     *
     * @return the stats reported by the metrics object, or a new empty map when there is none
     */
    public Map<String, String> stats() {
        Set<String> validPartitionPaths = Arrays.stream(MetadataPartitionType.getValidValues())
            .map(MetadataPartitionType::getPartitionPath)
            .collect(Collectors.toSet());
        return this.metrics
            .map(m -> m.getStats(true, this.metadataMetaClient, (HoodieTableMetadata)this, validPartitionPaths))
            .orElseGet(HashMap::new);
    }

    /**
     * @return the requested time of the last completed instant on the metadata table's delta
     *         commit timeline, or empty when there is no meta client or no such instant
     */
    @Override
    public Option<String> getSyncedInstantTime() {
        if (this.metadataMetaClient == null) {
            return Option.empty();
        }
        Option<HoodieInstant> lastDeltaCommit = this.metadataMetaClient.getActiveTimeline()
            .getDeltaCommitTimeline()
            .filterCompletedInstants()
            .lastInstant();
        if (!lastDeltaCommit.isPresent()) {
            return Option.empty();
        }
        return Option.of(lastDeltaCommit.get().requestedTime());
    }

    /**
     * @return the requested time of the last completed instant on the metadata table's commit
     *         and replace timeline, or empty when there is no meta client or no such instant
     */
    @Override
    public Option<String> getLatestCompactionTime() {
        if (this.metadataMetaClient == null) {
            return Option.empty();
        }
        Option<HoodieInstant> lastCommit = this.metadataMetaClient.getActiveTimeline()
            .getCommitAndReplaceTimeline()
            .filterCompletedInstants()
            .lastInstant();
        if (!lastCommit.isPresent()) {
            return Option.empty();
        }
        return Option.of(lastCommit.get().requestedTime());
    }

    /**
     * Re-runs initialization, reloads the data and metadata timelines and drops every cached
     * view, reader and file slice listing so that later reads start from fresh state.
     */
    @Override
    public void reset() {
        initIfNeeded();
        this.dataMetaClient.reloadActiveTimeline();

        if (this.metadataMetaClient != null) {
            this.metadataMetaClient.reloadActiveTimeline();
            HoodieTableFileSystemView staleView = this.metadataFileSystemView;
            if (staleView != null) {
                staleView.close();
            }
            this.metadataFileSystemView = null;
        }

        closePartitionReaders();
        this.partitionFileSliceMap.clear();
    }

    /**
     * Returns the number of latest merged file slices of a metadata partition, loading and
     * caching the file slice listing on first request.
     *
     * @param partition metadata partition type to count file groups for
     * @return number of file slices listed for the partition
     */
    @Override
    public int getNumFileGroupsForPartition(MetadataPartitionType partition) {
        String partitionPath = partition.getPartitionPath();
        // Use the value returned by computeIfAbsent rather than a second get(): a concurrent
        // reset()/close() may clear the cache in between, making the second lookup return null.
        List<FileSlice> fileSlices = this.partitionFileSliceMap.computeIfAbsent(partitionPath, k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(this.metadataMetaClient, this.getMetadataFileSystemView(), partitionPath));
        return fileSlices.size();
    }

    /**
     * Lists the files of the given partitions and keys the result by the original
     * (partition, path) pairs.
     *
     * @param partitionPathList pairs whose right side is the partition's storage path
     * @return files per input pair, looked up by the string form of the pair's storage path
     * @throws IOException if listing the files fails
     */
    @Override
    public Map<Pair<String, StoragePath>, List<StoragePathInfo>> listPartitions(List<Pair<String, StoragePath>> partitionPathList) throws IOException {
        Function<Pair<String, StoragePath>, String> toAbsolutePath = pair -> pair.getRight().toString();

        Map<String, Pair<String, StoragePath>> pairByAbsolutePath = partitionPathList.stream()
            .collect(Collectors.toMap(toAbsolutePath, Function.identity()));
        List<String> absolutePaths = partitionPathList.stream()
            .map(toAbsolutePath)
            .collect(Collectors.toList());

        Map<String, List<StoragePathInfo>> filesByAbsolutePath = this.getAllFilesInPartitions(absolutePaths);
        return filesByAbsolutePath.entrySet().stream()
            .collect(Collectors.toMap(entry -> pairByAbsolutePath.get(entry.getKey()), Map.Entry::getValue));
    }

    /**
     * Looks up secondary index entries by key prefix and groups the record keys of all
     * non-deleted entries by their secondary key.
     *
     * @param keys          key prefixes to look up
     * @param partitionName metadata partition holding the secondary index
     * @return record keys per secondary key; an empty map when {@code keys} is empty
     */
    @Override
    public Map<String, Set<String>> getSecondaryIndexRecords(List<String> keys, String partitionName) {
        if (keys.isEmpty()) {
            return Collections.emptyMap();
        }

        List<Pair<String, String>> secondaryToRecordKeys = this.getRecordsByKeyPrefixes(keys, partitionName, false)
            .filter(indexRecord -> !indexRecord.getData().isDeleted())
            .map(indexRecord -> {
                String recordKey = SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(indexRecord.getRecordKey());
                String secondaryKey = SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(indexRecord.getRecordKey());
                return Pair.of(secondaryKey, recordKey);
            })
            .collectAsList();

        return secondaryToRecordKeys.stream()
            .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())));
    }
}

