/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.metadata.BaseTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * Table metadata implementation that answers key lookups from a separate metadata table
 * located at the path returned by {@link HoodieTableMetadata#getMetadataTableBasePath(String)}.
 *
 * <p>For each metadata partition a pair of readers (base-file reader, merged log-record
 * scanner) is opened on first use and cached in {@link #partitionReaders}. When the instance
 * was created with {@code reuse == false} the readers of a partition are closed again after
 * every lookup; otherwise they stay cached until {@link #close()} is called.
 */
public class HoodieBackedTableMetadata
extends BaseTableMetadata {
    private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadata.class);

    /** Instant time used as a fallback when a timeline has no completed instant; always treated as valid. */
    private static final String FALLBACK_INSTANT_TIMESTAMP = "00000000000000";
    /** Memory budget handed to the merged log record scanner (1 GiB). */
    private static final long LOG_SCANNER_MAX_MEMORY_BYTES = 1024L * 1024L * 1024L;
    /** Buffer size handed to the merged log record scanner (10 MiB). */
    private static final int LOG_SCANNER_BUFFER_SIZE_BYTES = 10 * 1024 * 1024;

    private String metadataBasePath;
    private HoodieTableMetaClient metadataMetaClient;
    private HoodieTableConfig metadataTableConfig;
    /** When {@code false}, the readers of a partition are closed after each lookup. */
    private final boolean reuse;
    /** Open readers per metadata partition name: (base-file reader, merged log-record scanner). */
    private Map<String, Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader>> partitionReaders = new ConcurrentHashMap<>();

    /**
     * Creates an instance that does not keep readers open between lookups.
     */
    public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath, String spillableMapDirectory) {
        this(engineContext, metadataConfig, datasetBasePath, spillableMapDirectory, false);
    }

    /**
     * @param reuse whether opened readers are kept cached between lookups
     */
    public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath, String spillableMapDirectory, boolean reuse) {
        super(engineContext, metadataConfig, datasetBasePath, spillableMapDirectory);
        this.reuse = reuse;
        this.initIfNeeded();
    }

    /**
     * Resolves the metadata table base path and, when the metadata table is enabled and no
     * meta client exists yet, builds one. Any failure disables the metadata table instead of
     * propagating.
     */
    private void initIfNeeded() {
        metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataBasePath);
        if (!enabled) {
            if (!HoodieTableMetadata.isMetadataTable(metadataBasePath)) {
                LOG.info("Metadata table is disabled.");
            }
            return;
        }
        if (metadataMetaClient != null) {
            return;
        }
        try {
            metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataBasePath).build();
            metadataTableConfig = metadataMetaClient.getTableConfig();
        } catch (TableNotFoundException e) {
            LOG.warn("Metadata table was not found at path " + metadataBasePath);
            disableMetadataTable();
        } catch (Exception e) {
            LOG.error("Failed to initialize metadata table at path " + metadataBasePath, e);
            disableMetadataTable();
        }
    }

    /** Marks the metadata table as unusable and drops any partially initialized client state. */
    private void disableMetadataTable() {
        enabled = false;
        metadataMetaClient = null;
        metadataTableConfig = null;
    }

    /**
     * Looks up a single key in the given metadata partition.
     *
     * @return the value found for the key, or an empty option when the lookup produced no entries
     */
    @Override
    protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key, String partitionName) {
        List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> recordsByKeys = getRecordsByKeys(Collections.singletonList(key), partitionName);
        return recordsByKeys.isEmpty() ? Option.empty() : recordsByKeys.get(0).getValue();
    }

    /**
     * Looks up the given keys in the given metadata partition, merging base-file records with
     * log records.
     *
     * @param keys2 keys to look up; a {@code null} or empty list yields an empty result
     * @param partitionName metadata partition to read from
     * @return one (key, optional record) pair per resolved key; empty when the partition has
     *         neither a base-file reader nor a log scanner
     * @throws HoodieIOException if reading or merging fails with an {@link IOException}
     */
    @Override
    protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys2, String partitionName) {
        // Nothing to look up; also avoids keys2.get(0) failing below.
        if (keys2 == null || keys2.isEmpty()) {
            return Collections.emptyList();
        }
        Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = openReadersIfNeeded(keys2.get(0), partitionName);
        try {
            HoodieFileReader baseFileReader = readers.getKey();
            HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
            if (baseFileReader == null && logRecordScanner == null) {
                return Collections.emptyList();
            }
            // timings receives the log-read duration first, then the base-file read/merge duration.
            List<Long> timings = new ArrayList<>();
            Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = readLogRecords(logRecordScanner, keys2, timings);
            List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = readFromBaseAndMergeWithLogRecords(baseFileReader, keys2, logRecords, timings, partitionName);
            LOG.info(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms", keys2.size(), timings));
            return result;
        } catch (IOException ioe) {
            throw new HoodieIOException("Error merging records from metadata table for  " + keys2.size() + " key : ", ioe);
        } finally {
            if (!reuse) {
                close(partitionName);
            }
        }
    }

    /**
     * Reads the log-side value for every key and appends the elapsed time to {@code timings}.
     * Without a log scanner every key maps to an empty option.
     */
    private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataMergedLogRecordReader logRecordScanner, List<String> keys2, List<Long> timings) {
        HoodieTimer timer = new HoodieTimer().startTimer();
        Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
        if (logRecordScanner == null) {
            for (String key : keys2) {
                logRecords.put(key, Option.empty());
            }
        } else if (metadataConfig.enableFullScan()) {
            // Full-scan mode: query the scanner one key at a time.
            for (String key : keys2) {
                logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue());
            }
        } else {
            for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordScanner.getRecordsByKeys(keys2)) {
                logRecords.put(entry.getKey(), entry.getValue());
            }
        }
        timings.add(timer.endTimer());
        return logRecords;
    }

    /**
     * Reads each key from the base file (when a reader exists) and combines it with the
     * log-side value for the same key; appends the elapsed time to {@code timings}.
     *
     * <p>With a base-file reader the result follows the order of {@code keys2}. Without one the
     * result is the content of {@code logRecords} in that map's iteration order.
     */
    private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFromBaseAndMergeWithLogRecords(HoodieFileReader baseFileReader, List<String> keys2, Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords, List<Long> timings, String partitionName) throws IOException {
        List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
        HoodieTimer timer = new HoodieTimer().startTimer();
        if (baseFileReader == null) {
            timings.add(timer.endTimer());
            for (Map.Entry<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecords.entrySet()) {
                result.add(Pair.of(entry.getKey(), entry.getValue()));
            }
            return result;
        }

        HoodieTimer readTimer = new HoodieTimer();
        for (String key : keys2) {
            // Never hand a null Option to callers when the log side has no entry for this key.
            Option<HoodieRecord<HoodieMetadataPayload>> logRecord = logRecords.get(key);
            if (logRecord == null) {
                logRecord = Option.empty();
            }

            readTimer.startTimer();
            Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
            if (!baseRecord.isPresent()) {
                // Every startTimer() gets a matching endTimer(), even on a miss.
                readTimer.endTimer();
                result.add(Pair.of(key, logRecord));
                continue;
            }

            HoodieRecord<HoodieMetadataPayload> baseHoodieRecord = getRecord(baseRecord, partitionName);
            // End the timer unconditionally; only the reporting depends on metrics being present.
            final long baseFileReadMs = readTimer.endTimer();
            metrics.ifPresent(m -> m.updateMetrics("basefile_read", baseFileReadMs));

            if (logRecord.isPresent()) {
                HoodieMetadataPayload mergedPayload = logRecord.get().getData().preCombine(baseHoodieRecord.getData());
                result.add(Pair.of(key, Option.of(new HoodieRecord<HoodieMetadataPayload>(baseHoodieRecord.getKey(), mergedPayload))));
            } else {
                result.add(Pair.of(key, Option.of(baseHoodieRecord)));
            }
        }
        timings.add(timer.endTimer());
        return result;
    }

    /**
     * Converts a base-file Avro record into a {@link HoodieRecord}. Which conversion overload is
     * used depends on whether the metadata table populates meta fields.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private HoodieRecord<HoodieMetadataPayload> getRecord(Option<GenericRecord> baseRecord, String partitionName) {
        ValidationUtils.checkState(baseRecord.isPresent());
        if (metadataTableConfig.populateMetaFields()) {
            return (HoodieRecord)SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false);
        }
        return (HoodieRecord)SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()), false, Option.of(partitionName));
    }

    /**
     * Returns the cached readers for the partition, opening them first if none are cached.
     *
     * @return (base-file reader, log scanner); both components are {@code null} when the
     *         partition has no file slices
     * @throws HoodieIOException if opening the base file fails with an {@link IOException}
     */
    private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReadersIfNeeded(String key, String partitionName) {
        return partitionReaders.computeIfAbsent(partitionName, k -> {
            try {
                HoodieTimer timer = new HoodieTimer().startTimer();
                List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, true);
                if (latestFileSlices.isEmpty()) {
                    return Pair.of(null, null);
                }
                ValidationUtils.checkArgument(latestFileSlices.size() == 1, String.format("Invalid number of file slices: found=%d, required=%d", latestFileSlices.size(), 1));
                FileSlice slice = latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, latestFileSlices.size()));

                Pair<HoodieFileReader, Long> baseReaderAndOpenMs = getBaseFileReader(slice, timer);
                HoodieFileReader baseFileReader = baseReaderAndOpenMs.getKey();
                long baseFileOpenMs = baseReaderAndOpenMs.getValue();

                Pair<HoodieMetadataMergedLogRecordReader, Long> logScannerAndOpenMs;
                try {
                    logScannerAndOpenMs = getLogRecordScanner(slice, partitionName);
                } catch (RuntimeException e) {
                    // The base-file reader is not cached yet, so nobody else would ever close it.
                    if (baseFileReader != null) {
                        try {
                            baseFileReader.close();
                        } catch (Exception closeFailure) {
                            e.addSuppressed(closeFailure);
                        }
                    }
                    throw e;
                }
                HoodieMetadataMergedLogRecordReader logRecordScanner = logScannerAndOpenMs.getKey();
                long logScannerOpenMs = logScannerAndOpenMs.getValue();

                metrics.ifPresent(m -> m.updateMetrics("scan", baseFileOpenMs + logScannerOpenMs));
                return Pair.of(baseFileReader, logRecordScanner);
            } catch (IOException e) {
                throw new HoodieIOException("Error opening readers for metadata table partition " + partitionName, e);
            }
        });
    }

    /**
     * Opens a reader on the slice's base file, if it has one, and ends the supplied timer.
     *
     * @param timer an already started timer; it is ended here in both branches
     * @return (reader or {@code null}, open time in ms; 0 when there is no base file)
     */
    private Pair<HoodieFileReader, Long> getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException {
        Option<HoodieBaseFile> basefile = slice.getBaseFile();
        if (!basefile.isPresent()) {
            timer.endTimer();
            return Pair.of(null, 0L);
        }
        String basefilePath = basefile.get().getPath();
        HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
        Long baseFileOpenMs = timer.endTimer();
        LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", basefilePath, basefile.get().getCommitTime(), baseFileOpenMs));
        return Pair.of(baseFileReader, baseFileOpenMs);
    }

    /**
     * Collects the instant timestamps passed to the log scanner as valid log block timestamps:
     * all completed instants of the dataset timeline, the commits referenced by completed
     * rollback/restore instants newer than the earliest completed instant, and the fallback
     * timestamp.
     */
    private Set<String> getValidInstantTimestamps() {
        HoodieActiveTimeline datasetTimeline = dataMetaClient.getActiveTimeline();
        Set<String> validInstantTimestamps = datasetTimeline.filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
        String earliestInstantTime = validInstantTimestamps.isEmpty() ? FALLBACK_INSTANT_TIMESTAMP : Collections.min(validInstantTimestamps);
        datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstants()
            .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, earliestInstantTime))
            .forEach(instant -> validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline)));
        validInstantTimestamps.add(FALLBACK_INSTANT_TIMESTAMP);
        return validInstantTimestamps;
    }

    /**
     * Builds a merged log-record scanner over the slice's log files (sorted with the log file
     * comparator).
     *
     * @return (scanner, time in ms spent building it)
     */
    private Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(FileSlice slice, String partitionName) {
        HoodieTimer timer = new HoodieTimer().startTimer();
        List<String> logFilePaths = slice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(o -> o.getPath().toString()).collect(Collectors.toList());
        Set<String> validInstantTimestamps = getValidInstantTimestamps();
        Option<HoodieInstant> latestMetadataInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
        String latestMetadataInstantTime = latestMetadataInstant.map(HoodieInstant::getTimestamp).orElse(FALLBACK_INSTANT_TIMESTAMP);
        Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
        HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
        HoodieMetadataMergedLogRecordReader logRecordScanner = ((HoodieMetadataMergedLogRecordReader.Builder)HoodieMetadataMergedLogRecordReader.newBuilder()
            .withFileSystem(metadataMetaClient.getFs())
            .withBasePath(metadataBasePath)
            .withLogFilePaths(logFilePaths))
            .withReaderSchema(schema)
            .withLatestInstantTime(latestMetadataInstantTime)
            .withMaxMemorySizeInBytes(LOG_SCANNER_MAX_MEMORY_BYTES)
            .withBufferSize(LOG_SCANNER_BUFFER_SIZE_BYTES)
            .withSpillableMapBasePath(spillableMapDirectory)
            .withDiskMapType(commonConfig.getSpillableDiskMapType())
            .withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
            .withLogBlockTimestamps(validInstantTimestamps)
            .enableFullScan(metadataConfig.enableFullScan())
            .withPartition(partitionName)
            .build();
        Long logScannerOpenMs = timer.endTimer();
        LOG.info(String.format("Opened %d metadata log files (dataset instant=%s, metadata instant=%s) in %d ms", logFilePaths.size(), getLatestDataInstantTime(), latestMetadataInstantTime, logScannerOpenMs));
        return Pair.of(logRecordScanner, logScannerOpenMs);
    }

    /**
     * Returns the commits referenced by a rollback or restore instant.
     *
     * @return the referenced commit times; empty for any other action
     * @throws HoodieMetadataException if the instant's metadata cannot be deserialized
     */
    private List<String> getRollbackedCommits(HoodieInstant instant, HoodieActiveTimeline timeline) {
        try {
            if (instant.getAction().equals("rollback")) {
                HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(timeline.getInstantDetails(instant).get());
                return rollbackMetadata.getCommitsRollback();
            }
            List<String> rollbackedCommits = new ArrayList<>();
            if (instant.getAction().equals("restore")) {
                HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(timeline.getInstantDetails(instant).get());
                restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> rms.forEach(rm -> rollbackedCommits.addAll(rm.getCommitsRollback())));
            }
            return rollbackedCommits;
        } catch (IOException e) {
            throw new HoodieMetadataException("Error retrieving rollback commits for instant " + instant, e);
        }
    }

    /**
     * Closes the readers of every partition. All partitions are attempted even if one fails;
     * the first failure is rethrown afterwards with later failures attached as suppressed.
     */
    @Override
    public void close() {
        HoodieException firstFailure = null;
        // Iterate over a snapshot because close(partitionName) removes entries from the map.
        for (String partitionName : new ArrayList<>(partitionReaders.keySet())) {
            try {
                close(partitionName);
            } catch (HoodieException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        partitionReaders.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Removes the partition's readers from the cache and closes both of them. The log scanner
     * is closed even when closing the base-file reader fails.
     *
     * @throws HoodieException if either reader fails to close
     */
    private synchronized void close(String partitionName) {
        Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = partitionReaders.remove(partitionName);
        if (readers == null) {
            return;
        }
        Exception failure = null;
        try {
            if (readers.getKey() != null) {
                readers.getKey().close();
            }
        } catch (Exception e) {
            failure = e;
        }
        try {
            if (readers.getValue() != null) {
                readers.getValue().close();
            }
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw new HoodieException("Error closing resources during metadata table merge", failure);
        }
    }

    /** @return whether the metadata table is currently usable */
    public boolean enabled() {
        return this.enabled;
    }

    public SerializableConfiguration getHadoopConf() {
        return this.hadoopConf;
    }

    /** @return the meta client of the metadata table, or {@code null} if it was never initialized or got disabled */
    public HoodieTableMetaClient getMetadataMetaClient() {
        return this.metadataMetaClient;
    }

    /** @return the stats reported by the metrics object, or an empty map when metrics are absent */
    public Map<String, String> stats() {
        return this.metrics.map(m -> m.getStats(true, this.metadataMetaClient, (HoodieTableMetadata)this)).orElse(new HashMap<>());
    }

    /**
     * @return timestamp of the last completed delta commit on the metadata table, or empty when
     *         there is none or the metadata table is not initialized
     */
    @Override
    public Option<String> getSyncedInstantTime() {
        if (metadataMetaClient == null) {
            return Option.empty();
        }
        Option<HoodieInstant> latestInstant = metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
        if (latestInstant.isPresent()) {
            return Option.of(latestInstant.get().getTimestamp());
        }
        return Option.empty();
    }

    /**
     * @return timestamp of the last completed instant on the metadata table's commit timeline,
     *         or empty when there is none or the metadata table is not initialized
     */
    @Override
    public Option<String> getLatestCompactionTime() {
        if (metadataMetaClient == null) {
            return Option.empty();
        }
        Option<HoodieInstant> latestCompaction = metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
        if (latestCompaction.isPresent()) {
            return Option.of(latestCompaction.get().getTimestamp());
        }
        return Option.empty();
    }
}

