/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index.bloom;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.io.HoodieKeyLookupResult;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

public class HoodieMetadataBloomIndexCheckFunction
implements Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, Iterator<List<HoodieKeyLookupResult>>> {
    private static final Logger LOG = LogManager.getLogger(HoodieMetadataBloomIndexCheckFunction.class);
    private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256L;
    private final HoodieTable hoodieTable;

    public HoodieMetadataBloomIndexCheckFunction(HoodieTable hoodieTable) {
        this.hoodieTable = hoodieTable;
    }

    public Iterator<List<HoodieKeyLookupResult>> call(Integer integer, Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
        return new BloomIndexLazyKeyCheckIterator(tuple2Iterator);
    }

    private class BloomIndexLazyKeyCheckIterator
    extends LazyIterableIterator<Tuple2<String, HoodieKey>, List<HoodieKeyLookupResult>> {
        public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) {
            super(tuple2Iterator);
        }

        @Override
        protected void start() {
        }

        @Override
        protected List<HoodieKeyLookupResult> computeNext() {
            HashMap<Pair, List> fileToKeysMap = new HashMap<Pair, List>();
            HashMap<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<String, HoodieBaseFile>();
            ArrayList<HoodieKeyLookupResult> resultList = new ArrayList<HoodieKeyLookupResult>();
            while (this.inputItr.hasNext()) {
                Tuple2 entry = (Tuple2)this.inputItr.next();
                String partitionPath = ((HoodieKey)entry._2).getPartitionPath();
                String fileId = (String)entry._1;
                if (!fileIDBaseFileMap.containsKey(fileId)) {
                    Option<HoodieBaseFile> baseFile = HoodieMetadataBloomIndexCheckFunction.this.hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
                    if (!baseFile.isPresent()) {
                        throw new HoodieIndexException("Failed to find the base file for partition: " + partitionPath + ", fileId: " + fileId);
                    }
                    fileIDBaseFileMap.put(fileId, baseFile.get());
                }
                fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, ((HoodieBaseFile)fileIDBaseFileMap.get(fileId)).getFileName()), k -> new ArrayList()).add(entry._2);
                if ((long)fileToKeysMap.size() <= 256L) continue;
                break;
            }
            if (fileToKeysMap.isEmpty()) {
                return Collections.emptyList();
            }
            ArrayList<Pair<String, String>> partitionNameFileNameList = new ArrayList<Pair<String, String>>(fileToKeysMap.keySet());
            Map<Pair<String, String>, BloomFilter> fileToBloomFilterMap = HoodieMetadataBloomIndexCheckFunction.this.hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);
            AtomicInteger totalKeys = new AtomicInteger(0);
            fileToKeysMap.forEach((? super K partitionPathFileNamePair, ? super V hoodieKeyList) -> {
                String partitionPath = (String)partitionPathFileNamePair.getLeft();
                String fileName = (String)partitionPathFileNamePair.getRight();
                String fileId = FSUtils.getFileId(fileName);
                ValidationUtils.checkState(!fileId.isEmpty());
                if (!fileToBloomFilterMap.containsKey(partitionPathFileNamePair)) {
                    throw new HoodieIndexException("Failed to get the bloom filter for " + partitionPathFileNamePair);
                }
                BloomFilter fileBloomFilter = (BloomFilter)fileToBloomFilterMap.get(partitionPathFileNamePair);
                ArrayList<String> candidateRecordKeys = new ArrayList<String>();
                hoodieKeyList.forEach(hoodieKey -> {
                    totalKeys.incrementAndGet();
                    if (fileBloomFilter.mightContain(hoodieKey.getRecordKey())) {
                        candidateRecordKeys.add(hoodieKey.getRecordKey());
                    }
                });
                HoodieBaseFile dataFile = (HoodieBaseFile)fileIDBaseFileMap.get(fileId);
                List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(new Path(dataFile.getPath()), candidateRecordKeys, HoodieMetadataBloomIndexCheckFunction.this.hoodieTable.getHadoopConf());
                LOG.debug((Object)String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", hoodieKeyList.size(), candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()));
                resultList.add(new HoodieKeyLookupResult(fileId, partitionPath, dataFile.getCommitTime(), matchingKeys));
            });
            return resultList;
        }

        @Override
        protected void end() {
        }
    }
}

