/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index.bloom;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.FlatteningIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.bloom.HoodieBloomFilterProbingResult;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark pair-flat-map function that, for each incoming (file group, record key) tuple,
 * looks up the bloom filter of the file group's latest base file through the table's
 * metadata table and emits, per file group, the record keys that the bloom filter
 * reports as possibly present.
 *
 * <p>Input tuples are consumed lazily in batches so that bloom filters are requested
 * from the metadata table for a bounded number of files at a time.
 */
public class HoodieMetadataBloomFilterProbingFunction
    implements PairFlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, String>>, HoodieFileGroupId, HoodieBloomFilterProbingResult> {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieMetadataBloomFilterProbingFunction.class);
    /** Threshold on the number of distinct files collected before a batch is closed. */
    private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256L;
    private final HoodieTable hoodieTable;
    private final Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast;

    /**
     * @param baseFileOnlyViewBroadcast broadcast file-system view used to resolve the latest base file of a file group
     * @param hoodieTable               table whose metadata table serves the bloom filters
     */
    public HoodieMetadataBloomFilterProbingFunction(Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast, HoodieTable hoodieTable) {
        this.baseFileOnlyViewBroadcast = baseFileOnlyViewBroadcast;
        this.hoodieTable = hoodieTable;
    }

    /**
     * Wraps the input in a lazy, batching iterator and flattens the per-batch results.
     *
     * @param tuple2Iterator (file group id, record key) tuples to probe
     * @return iterator of (file group id, probing result) tuples
     */
    @Override
    public Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> call(Iterator<Tuple2<HoodieFileGroupId, String>> tuple2Iterator) throws Exception {
        return new FlatteningIterator<>(new BloomIndexLazyKeyCheckIterator(tuple2Iterator));
    }

    /**
     * Lazy iterator whose every step consumes one batch of input tuples and yields
     * an iterator over the probing results for the files seen in that batch.
     */
    private class BloomIndexLazyKeyCheckIterator
        extends LazyIterableIterator<Tuple2<HoodieFileGroupId, String>, Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>>> {
        public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<HoodieFileGroupId, String>> tuple2Iterator) {
            super(tuple2Iterator);
        }

        /**
         * Reads the next batch of input, fetches the bloom filters of all base files in the
         * batch with a single metadata-table call, and returns a lazily evaluated iterator of
         * per-file probing results.
         *
         * @return probing results for the batch, or an empty iterator when no input was left
         * @throws HoodieIndexException if a file group has no base file, or the metadata table
         *                              returns no bloom filter for one of the requested files
         */
        @Override
        protected Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> computeNext() {
            // (partition path, base file) -> keys to be checked against that file's bloom filter
            Map<Pair<String, HoodieBaseFile>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
            // Cache keyed by the full file group id (partition path + file id) so that equal
            // file ids in different partitions never share a base file.
            Map<HoodieFileGroupId, HoodieBaseFile> baseFileByFileGroup = new HashMap<>();

            while (this.inputItr.hasNext()) {
                Tuple2<HoodieFileGroupId, String> entry = this.inputItr.next();
                HoodieFileGroupId fileGroupId = entry._1();
                String partitionPath = fileGroupId.getPartitionPath();

                HoodieBaseFile baseFile = baseFileByFileGroup.get(fileGroupId);
                if (baseFile == null) {
                    baseFile = resolveLatestBaseFile(fileGroupId);
                    baseFileByFileGroup.put(fileGroupId, baseFile);
                }

                fileToKeysMap
                    .computeIfAbsent(Pair.of(partitionPath, baseFile), k -> new ArrayList<>())
                    .add(new HoodieKey(entry._2(), partitionPath));

                // The check runs after the insert, so a batch is closed once it holds more
                // files than the threshold (i.e. it may contain threshold + 1 files).
                if (fileToKeysMap.size() > BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
                    break;
                }
            }

            if (fileToKeysMap.isEmpty()) {
                return Collections.emptyIterator();
            }

            List<Pair<String, String>> partitionNameFileNameList = fileToKeysMap.keySet().stream()
                .map(pair -> Pair.of(pair.getLeft(), pair.getRight().getFileName()))
                .collect(Collectors.toList());
            Map<Pair<String, String>, BloomFilter> fileToBloomFilterMap =
                HoodieMetadataBloomFilterProbingFunction.this.hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);

            // Stream-backed iterator: each file is probed only when the consumer pulls it.
            return fileToKeysMap.entrySet().stream()
                .map(entry -> probeFile(entry.getKey(), entry.getValue(), fileToBloomFilterMap))
                .iterator();
        }

        /** Resolves the latest base file of the given file group from the broadcast view, failing if there is none. */
        private HoodieBaseFile resolveLatestBaseFile(HoodieFileGroupId fileGroupId) {
            String partitionPath = fileGroupId.getPartitionPath();
            String fileId = fileGroupId.getFileId();
            Option<HoodieBaseFile> baseFile = HoodieMetadataBloomFilterProbingFunction.this.baseFileOnlyViewBroadcast.getValue()
                .getLatestBaseFile(partitionPath, fileId);
            if (!baseFile.isPresent()) {
                throw new HoodieIndexException("Failed to find the base file for partition: " + partitionPath + ", fileId: " + fileId);
            }
            return baseFile.get();
        }

        /** Checks the given keys against the bloom filter of one base file and packages the candidates. */
        private Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult> probeFile(
                Pair<String, HoodieBaseFile> partitionAndBaseFile,
                List<HoodieKey> hoodieKeyList,
                Map<Pair<String, String>, BloomFilter> fileToBloomFilterMap) {
            String partitionPath = partitionAndBaseFile.getLeft();
            HoodieBaseFile baseFile = partitionAndBaseFile.getRight();
            String fileId = baseFile.getFileId();
            ValidationUtils.checkState(!fileId.isEmpty());

            Pair<String, String> partitionPathFileNamePair = Pair.of(partitionPath, baseFile.getFileName());
            if (!fileToBloomFilterMap.containsKey(partitionPathFileNamePair)) {
                throw new HoodieIndexException("Failed to get the bloom filter for " + partitionPathFileNamePair);
            }
            BloomFilter fileBloomFilter = fileToBloomFilterMap.get(partitionPathFileNamePair);

            List<String> candidateRecordKeys = new ArrayList<>();
            for (HoodieKey hoodieKey : hoodieKeyList) {
                String recordKey = hoodieKey.getRecordKey();
                if (fileBloomFilter.mightContain(recordKey)) {
                    candidateRecordKeys.add(recordKey);
                }
            }

            // Parameterized logging: no message formatting cost when debug is disabled.
            LOG.debug("Total records ({}), bloom filter candidates ({})", hoodieKeyList.size(), candidateRecordKeys.size());
            return new Tuple2<>(new HoodieFileGroupId(partitionPath, fileId), new HoodieBloomFilterProbingResult(candidateRecordKeys));
        }
    }
}

