/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index.bloom;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkMemoryUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.MetadataNotFoundException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.index.SparkHoodieIndex;
import org.apache.hudi.index.bloom.BloomIndexFileInfo;
import org.apache.hudi.index.bloom.BucketizedBloomCheckPartitioner;
import org.apache.hudi.index.bloom.HoodieBloomIndexCheckFunction;
import org.apache.hudi.index.bloom.IndexFileFilter;
import org.apache.hudi.index.bloom.IntervalTreeBasedIndexFileFilter;
import org.apache.hudi.index.bloom.ListBasedIndexFileFilter;
import org.apache.hudi.io.HoodieRangeInfoHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

/**
 * Spark implementation of the (non-global) bloom index.
 *
 * <p>Tagging runs in stages:
 * <ol>
 *   <li>the incoming records are reduced to (partitionPath, recordKey) pairs;</li>
 *   <li>the latest base files of every affected partition are loaded, optionally with their min/max record-key
 *       ranges so that files can be pruned;</li>
 *   <li>each record key is fanned out to the candidate files that may contain it;</li>
 *   <li>{@link HoodieBloomIndexCheckFunction} checks each candidate file;</li>
 *   <li>keys reported as matching are joined back to the original records to set their current location.</li>
 * </ol>
 *
 * @param <T> payload type of the records being indexed
 */
public class SparkHoodieBloomIndex<T extends HoodieRecordPayload>
extends SparkHoodieIndex<T> {
    private static final Logger LOG = LogManager.getLogger(SparkHoodieBloomIndex.class);

    public SparkHoodieBloomIndex(HoodieWriteConfig config) {
        super(config);
    }

    /**
     * Tags each incoming record with the location of the existing base file that holds its key.
     *
     * <p>Records whose key is not found in any base file are still returned (the final join is a left outer join),
     * just without a location.
     *
     * @param recordRDD   incoming records to tag
     * @param context     engine context used for the file-listing / range-loading jobs
     * @param hoodieTable table whose latest base files are consulted
     * @return the input records, each passed through {@link HoodieIndexUtils#getTaggedRecord}
     */
    @Override
    public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, HoodieEngineContext context,
            HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
        final boolean useCaching = config.getBloomIndexUseCaching();
        // The input is traversed more than once (by the lookup's counting jobs and by the final join).
        if (useCaching) {
            recordRDD.persist(SparkMemoryUtils.getBloomIndexInputStorageLevel(config.getProps()));
        }

        // Only (partitionPath, recordKey) is needed for the lookup, so work on a thin pair RDD.
        JavaPairRDD<String, String> partitionRecordKeyPairRDD =
                recordRDD.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey()));

        JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD =
                lookupIndex(partitionRecordKeyPairRDD, context, hoodieTable);
        if (useCaching) {
            keyFilenamePairRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
        }
        if (LOG.isDebugEnabled()) {
            // count() triggers an extra Spark job, so it is only done when debug logging is on.
            long totalTaggedRecords = keyFilenamePairRDD.count();
            LOG.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords);
        }

        JavaRDD<HoodieRecord<T>> taggedRecordRDD = tagLocationBacktoRecords(keyFilenamePairRDD, recordRDD);

        if (useCaching) {
            // NOTE(review): taggedRecordRDD is built only from lazy transformations, so these unpersist calls run
            // before it is materialized and the caller's action may recompute both inputs from lineage.
            // Behavior kept as-is; confirm whether releasing the cache this early is intended.
            recordRDD.unpersist();
            keyFilenamePairRDD.unpersist();
        }
        return taggedRecordRDD;
    }

    /**
     * Looks up, for every (partitionPath, recordKey) pair, the base file that contains the key.
     *
     * <p>Keys that no file reports as matching are dropped from the result.
     *
     * @param partitionRecordKeyPairRDD (partitionPath, recordKey) pairs of the incoming records
     * @param context                   engine context
     * @param hoodieTable               table whose base files are consulted
     * @return (key, location) pairs for the keys found in an existing base file
     */
    private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
            JavaPairRDD<String, String> partitionRecordKeyPairRDD, HoodieEngineContext context,
            HoodieTable hoodieTable) {
        // Incoming record count per partition; its key set is also the set of partitions to inspect.
        Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
        List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());

        List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
                loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
        Map<String, List<BloomIndexFileInfo>> partitionToFileInfo = fileInfoList.stream()
                .collect(Collectors.groupingBy(Tuple2::_1, Collectors.mapping(Tuple2::_2, Collectors.toList())));

        // One (fileId, HoodieKey) element per candidate file a key may live in.
        JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
                explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD);
        Map<String, Long> comparisonsPerFileGroup =
                computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, fileComparisonsRDD, context);

        int inputParallelism = partitionRecordKeyPairRDD.partitions().size();
        int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
        LOG.info("InputParallelism: " + inputParallelism + ", IndexParallelism: " + config.getBloomIndexParallelism());
        return findMatchingFilesForRecordKeys(fileComparisonsRDD, joinParallelism, hoodieTable,
                comparisonsPerFileGroup);
    }

    /**
     * Estimates how many key comparisons each file group will receive.
     *
     * <p>With range pruning enabled, the count is taken from the exploded (fileId, key) RDD. Without it, every file
     * is assumed to be compared against every incoming record of its partition.
     *
     * @return map from file id to the estimated number of comparisons
     */
    private Map<String, Long> computeComparisonsPerFileGroup(Map<String, Long> recordsPerPartition,
            Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
            JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD, HoodieEngineContext context) {
        if (config.getBloomIndexPruneByRanges()) {
            context.setJobStatus(this.getClass().getSimpleName(),
                    "Compute all comparisons needed between records and files");
            return fileComparisonsRDD.mapToPair(t -> t).countByKey();
        }

        Map<String, Long> fileToComparisons = new HashMap<>();
        partitionToFileInfo.forEach((partitionPath, fileInfos) -> {
            Long recordCount = recordsPerPartition.get(partitionPath);
            for (BloomIndexFileInfo fileInfo : fileInfos) {
                fileToComparisons.put(fileInfo.getFileId(), recordCount);
            }
        });
        return fileToComparisons;
    }

    /**
     * Loads the latest base files of the given partitions as (partitionPath, file info) tuples.
     *
     * <p>When range pruning is enabled, each file's min/max record keys are read as well (via {@code context.map});
     * a file whose range metadata is missing falls back to a range-less entry and a warning is logged.
     *
     * @param partitions  partition paths to inspect
     * @param context     engine context used to list files and to parallelize the range lookups
     * @param hoodieTable table whose base files are listed
     * @return one tuple per latest base file
     */
    List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, HoodieEngineContext context,
            HoodieTable hoodieTable) {
        List<Pair<String, String>> partitionPathFileIDList =
                HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
                        .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
                        .collect(Collectors.toList());

        if (!config.getBloomIndexPruneByRanges()) {
            return partitionPathFileIDList.stream()
                    .map(pf -> new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue())))
                    .collect(Collectors.toList());
        }

        context.setJobStatus(this.getClass().getSimpleName(), "Obtain key ranges for file slices (range pruning=on)");
        return context.map(partitionPathFileIDList, pf -> {
            try {
                HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
                String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
                return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
            } catch (MetadataNotFoundException me) {
                LOG.warn("Unable to find range metadata in file :" + pf);
                return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
            }
        }, Math.max(partitionPathFileIDList.size(), 1));
    }

    /** Always reports success; this implementation performs no rollback work. */
    @Override
    public boolean rollbackCommit(String instantTime) {
        return true;
    }

    /** Not global: record keys are looked up only among the files of their own partition path. */
    @Override
    public boolean isGlobal() {
        return false;
    }

    /** Only base files are consulted by this index, not log files. */
    @Override
    public boolean canIndexLogFiles() {
        return false;
    }

    @Override
    public boolean isImplicitWithStorage() {
        return true;
    }

    /**
     * Fans each (partitionPath, recordKey) pair out to one (fileId, HoodieKey) element per candidate file returned
     * by the configured {@link IndexFileFilter} (interval-tree based or list based).
     *
     * @param partitionToFileIndexInfo  file infos grouped by partition path
     * @param partitionRecordKeyPairRDD (partitionPath, recordKey) pairs of the incoming records
     * @return (fileId, key) pairs to be checked
     */
    JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
            Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
            JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
        IndexFileFilter indexFileFilter = config.useBloomIndexTreebasedFilter()
                ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
                : new ListBasedIndexFileFilter(partitionToFileIndexInfo);

        return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
            String partitionPath = partitionRecordKeyPair._1();
            String recordKey = partitionRecordKeyPair._2();
            return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
                    .map(partitionFileIdPair -> new Tuple2<>(partitionFileIdPair.getRight(),
                            new HoodieKey(recordKey, partitionPath)))
                    .collect(Collectors.toList());
        }).flatMap(List::iterator);
    }

    /**
     * Runs {@link HoodieBloomIndexCheckFunction} over the (fileId, key) comparisons and returns the keys it reports
     * as matching, paired with the location of the matching file.
     *
     * <p>Comparisons are first co-located by file: either bucketized through {@link BucketizedBloomCheckPartitioner}
     * (sorted by (fileId, recordKey) within each partition) or globally sorted by fileId.
     *
     * @param fileComparisonsRDD     (fileId, key) pairs to check
     * @param shuffleParallelism     number of partitions for the shuffle
     * @param hoodieTable            table whose files are read by the check function
     * @param fileGroupToComparisons estimated comparisons per file id (used only by the bucketized partitioner)
     * @return (key, location) pairs for matching keys
     */
    JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
            JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD, int shuffleParallelism, HoodieTable hoodieTable,
            Map<String, Long> fileGroupToComparisons) {
        JavaRDD<Tuple2<String, HoodieKey>> orderedComparisonsRDD;
        if (config.useBloomIndexBucketizedChecking()) {
            Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons,
                    config.getBloomIndexKeysPerBucket());
            orderedComparisonsRDD = fileComparisonsRDD
                    .mapToPair(t -> new Tuple2<>(Pair.of(t._1(), t._2().getRecordKey()), t))
                    .repartitionAndSortWithinPartitions(partitioner)
                    .map(Tuple2::_2);
        } else {
            orderedComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism);
        }

        return orderedComparisonsRDD
                .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true)
                .flatMap(List::iterator)
                .filter(lr -> !lr.getMatchingRecordKeys().isEmpty())
                .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
                        .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()),
                                new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId())))
                        .collect(Collectors.toList())
                        .iterator());
    }

    /**
     * Joins the found locations back onto the full records.
     *
     * <p>A left outer join is used so that records whose key was not found are still returned (with an empty
     * location passed to {@link HoodieIndexUtils#getTaggedRecord}).
     *
     * @param keyFilenamePairRDD (key, location) pairs for keys found in existing files
     * @param recordRDD          all incoming records
     * @return every incoming record, tagged where a location was found
     */
    protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
            JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
        JavaPairRDD<HoodieKey, HoodieRecord<T>> keyRecordPairRDD =
                recordRDD.mapToPair(record -> new Tuple2<>(record.getKey(), record));
        return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD)
                .values()
                .map(recordAndLocation -> HoodieIndexUtils.getTaggedRecord(recordAndLocation._1(),
                        Option.ofNullable(recordAndLocation._2().orNull())));
    }

    /** Returns the write statuses unchanged; this implementation records no location updates of its own. */
    @Override
    public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, HoodieEngineContext context,
            HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
        return writeStatusRDD;
    }
}

