/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.HoodieMergedReadHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.HoodieDeleteHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieIndexUtils {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieIndexUtils.class);

    public static List<HoodieBaseFile> getLatestBaseFilesForPartition(String partition, HoodieTable hoodieTable) {
        Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant();
        if (latestCommitTime.isPresent()) {
            return hoodieTable.getBaseFileOnlyView().getLatestBaseFilesBeforeOrOn(partition, latestCommitTime.get().getTimestamp()).collect(Collectors.toList());
        }
        return Collections.emptyList();
    }

    public static List<FileSlice> getLatestFileSlicesForPartition(String partition, HoodieTable hoodieTable) {
        Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant();
        if (latestCommitTime.isPresent()) {
            return hoodieTable.getHoodieView().getLatestFileSlicesBeforeOrOn(partition, latestCommitTime.get().getTimestamp(), true).collect(Collectors.toList());
        }
        return Collections.emptyList();
    }

    public static List<Pair<String, HoodieBaseFile>> getLatestBaseFilesForAllPartitions(List<String> partitions, HoodieEngineContext context, HoodieTable hoodieTable) {
        context.setJobStatus(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions: " + hoodieTable.getConfig().getTableName());
        return context.flatMap(partitions, partitionPath -> {
            List filteredFiles = HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable).stream().map(baseFile -> Pair.of(partitionPath, baseFile)).collect(Collectors.toList());
            return filteredFiles.stream();
        }, Math.max(partitions.size(), 1));
    }

    public static <R> HoodieRecord<R> tagAsNewRecordIfNeeded(HoodieRecord<R> record, Option<HoodieRecordLocation> location) {
        if (location.isPresent()) {
            HoodieRecord<R> newRecord = record.newInstance();
            newRecord.unseal();
            newRecord.setCurrentLocation(location.get());
            newRecord.seal();
            return newRecord;
        }
        return record;
    }

    public static <R> HoodieRecord<R> tagRecord(HoodieRecord<R> record, HoodieRecordLocation location) {
        record.unseal();
        record.setCurrentLocation(location);
        record.seal();
        return record;
    }

    public static List<String> filterKeysFromFile(Path filePath, List<String> candidateRecordKeys, Configuration configuration) throws HoodieIndexException {
        ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
        ArrayList<String> foundRecordKeys = new ArrayList<String>();
        try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(configuration, filePath);){
            if (!candidateRecordKeys.isEmpty()) {
                HoodieTimer timer = HoodieTimer.start();
                Set<String> fileRowKeys = fileReader.filterRowKeys(new TreeSet<String>(candidateRecordKeys));
                foundRecordKeys.addAll(fileRowKeys);
                LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath, timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Keys matching for file " + filePath + " => " + foundRecordKeys);
                }
            }
        }
        catch (Exception e) {
            throw new HoodieIndexException("Error checking candidate keys against file.", e);
        }
        return foundRecordKeys;
    }

    public static boolean checkIfValidCommit(HoodieTimeline commitTimeline, String commitTs) {
        return !commitTimeline.empty() && commitTimeline.containsOrBeforeTimelineStarts(commitTs);
    }

    public static HoodieIndex createUserDefinedIndex(HoodieWriteConfig config) {
        Object instance = ReflectionUtils.loadClass(config.getIndexClass(), config);
        if (!(instance instanceof HoodieIndex)) {
            throw new HoodieIndexException(config.getIndexClass() + " is not a subclass of HoodieIndex");
        }
        return (HoodieIndex)instance;
    }

    private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(HoodieData<HoodieRecordGlobalLocation> partitionLocations, HoodieWriteConfig config, HoodieTable hoodieTable) {
        Option<String> instantTime = hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp);
        return partitionLocations.flatMap(p -> new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(p.getPartitionPath(), p.getFileId())).getMergedRecords().iterator());
    }

    private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(HoodieRecord<R> incoming, HoodieRecord<R> existing, Schema writeSchema, HoodieWriteConfig config, HoodieRecordMerger recordMerger) throws IOException {
        Schema writeSchemaWithMetaFields;
        HoodieRecord incomingPrepended;
        HoodieRecord incomingWithMetaFields;
        Schema existingSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
        Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(existing, existingSchema, incomingWithMetaFields = (incomingPrepended = incoming.prependMetaFields(writeSchema, writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField()), new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()), config.getProps())).wrapIntoHoodieRecordPayloadWithParams(writeSchema, config.getProps(), Option.empty(), config.allowOperationMetadataField(), Option.empty(), false, Option.empty()), writeSchemaWithMetaFields, config.getProps());
        if (mergeResult.isPresent()) {
            HoodieRecord merged = mergeResult.get().getLeft().wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields, config.getProps(), Option.empty(), config.allowOperationMetadataField(), Option.empty(), false, Option.of(writeSchema));
            return Option.of(merged);
        }
        return Option.empty();
    }

    public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdatesIfNeeded(HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> incomingRecordsAndLocations, HoodieWriteConfig config, HoodieTable hoodieTable) {
        HoodieData<HoodieRecord> taggedNewRecords = incomingRecordsAndLocations.filter(p -> !((Option)p.getRight()).isPresent()).map(Pair::getLeft);
        HoodieData<HoodieRecord> untaggedUpdatingRecords = incomingRecordsAndLocations.filter(p -> ((Option)p.getRight()).isPresent()).map(Pair::getLeft).distinctWithKey(HoodieRecord::getRecordKey, config.getGlobalIndexReconcileParallelism());
        HoodieData<HoodieRecordGlobalLocation> globalLocations = incomingRecordsAndLocations.filter(p -> ((Option)p.getRight()).isPresent()).map(p -> (HoodieRecordGlobalLocation)((Option)p.getRight()).get()).distinct(config.getGlobalIndexReconcileParallelism());
        HoodieData<HoodieRecord<R>> existingRecords = HoodieIndexUtils.getExistingRecords(globalLocations, config, hoodieTable);
        HoodieRecordMerger recordMerger = config.getRecordMerger();
        HoodieData taggedUpdatingRecords = untaggedUpdatingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r)).leftOuterJoin(existingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r))).values().flatMap(entry -> {
            HoodieRecord incoming = (HoodieRecord)entry.getLeft();
            Option existingOpt = (Option)entry.getRight();
            if (!existingOpt.isPresent()) {
                return Collections.singletonList(incoming).iterator();
            }
            HoodieRecord existing = (HoodieRecord)existingOpt.get();
            Schema writeSchema = new Schema.Parser().parse(config.getWriteSchema());
            if (incoming.isDelete(writeSchema, config.getProps())) {
                return Collections.singletonList(HoodieIndexUtils.tagRecord(incoming.newInstance(existing.getKey()), existing.getCurrentLocation())).iterator();
            }
            Option mergedOpt = HoodieIndexUtils.mergeIncomingWithExistingRecord(incoming, existing, writeSchema, config, recordMerger);
            if (!mergedOpt.isPresent()) {
                return Collections.singletonList(HoodieIndexUtils.tagRecord(incoming.newInstance(existing.getKey()), existing.getCurrentLocation())).iterator();
            }
            HoodieRecord merged = mergedOpt.get();
            if (Objects.equals(merged.getPartitionPath(), existing.getPartitionPath())) {
                return Collections.singletonList(HoodieIndexUtils.tagRecord(merged, existing.getCurrentLocation())).iterator();
            }
            HoodieRecord deleteRecord = HoodieDeleteHelper.createDeleteRecord(config, existing.getKey());
            return Arrays.asList(HoodieIndexUtils.tagRecord(deleteRecord, existing.getCurrentLocation()), merged).iterator();
        });
        return taggedUpdatingRecords.union(taggedNewRecords);
    }

    public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(HoodieData<HoodieRecord<R>> incomingRecords, HoodiePairData<String, HoodieRecordGlobalLocation> keyAndExistingLocations, boolean mayContainDuplicateLookup, boolean shouldUpdatePartitionPath, HoodieWriteConfig config, HoodieTable table) {
        HoodieRecordMerger merger = config.getRecordMerger();
        HoodiePairData keyAndIncomingRecords = incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), record));
        HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> incomingRecordsAndLocations = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values().map(v -> {
            HoodieRecord incomingRecord = (HoodieRecord)v.getLeft();
            Option<Object> currentLocOpt = Option.ofNullable(((Option)v.getRight()).orElse(null));
            if (currentLocOpt.isPresent()) {
                boolean shouldDoMergedLookUpThenTag;
                HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
                boolean bl = shouldDoMergedLookUpThenTag = mayContainDuplicateLookup || !Objects.equals(incomingRecord.getPartitionPath(), currentLoc.getPartitionPath());
                if (shouldUpdatePartitionPath && shouldDoMergedLookUpThenTag) {
                    return Pair.of(incomingRecord, currentLocOpt);
                }
                return Pair.of(HoodieIndexUtils.createNewTaggedHoodieRecord(incomingRecord, currentLoc, merger.getRecordType()), Option.empty());
            }
            return Pair.of(incomingRecord, Option.empty());
        });
        return shouldUpdatePartitionPath ? HoodieIndexUtils.mergeForPartitionUpdatesIfNeeded(incomingRecordsAndLocations, config, table) : incomingRecordsAndLocations.map(Pair::getLeft);
    }

    public static <R> HoodieRecord<R> createNewTaggedHoodieRecord(HoodieRecord<R> oldRecord, HoodieRecordGlobalLocation location, HoodieRecord.HoodieRecordType recordType) {
        switch (recordType) {
            case AVRO: {
                HoodieKey recordKey = new HoodieKey(oldRecord.getRecordKey(), location.getPartitionPath());
                return HoodieIndexUtils.tagRecord(new HoodieAvroRecord<HoodieRecordPayload>(recordKey, (HoodieRecordPayload)oldRecord.getData()), location);
            }
            case SPARK: {
                return HoodieIndexUtils.tagRecord(oldRecord.newInstance(), location);
            }
        }
        throw new HoodieIndexException("Unsupported record type: " + (Object)((Object)recordType));
    }
}

