/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index.simple;

import java.io.Serializable;
import java.util.List;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkMemoryUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.index.SparkHoodieIndex;
import org.apache.hudi.io.HoodieKeyLocationFetchHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class SparkHoodieSimpleIndex<T extends HoodieRecordPayload>
extends SparkHoodieIndex<T> {
    public SparkHoodieSimpleIndex(HoodieWriteConfig config) {
        super(config);
    }

    @Override
    public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, HoodieEngineContext context, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
        return writeStatusRDD;
    }

    @Override
    public boolean rollbackCommit(String commitTime) {
        return true;
    }

    @Override
    public boolean isGlobal() {
        return false;
    }

    @Override
    public boolean canIndexLogFiles() {
        return false;
    }

    @Override
    public boolean isImplicitWithStorage() {
        return true;
    }

    @Override
    public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, HoodieEngineContext context, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
        return this.tagLocationInternal(recordRDD, context, hoodieTable);
    }

    protected JavaRDD<HoodieRecord<T>> tagLocationInternal(JavaRDD<HoodieRecord<T>> inputRecordRDD, HoodieEngineContext context, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
        if (this.config.getSimpleIndexUseCaching()) {
            inputRecordRDD.persist(SparkMemoryUtils.getSimpleIndexInputStorageLevel(this.config.getProps()));
        }
        JavaPairRDD keyedInputRecordRDD = inputRecordRDD.mapToPair((PairFunction & Serializable)record -> new Tuple2((Object)record.getKey(), record));
        JavaPairRDD<HoodieKey, HoodieRecordLocation> existingLocationsOnTable = this.fetchRecordLocationsForAffectedPartitions((JavaRDD<HoodieKey>)keyedInputRecordRDD.keys(), context, hoodieTable, this.config.getSimpleIndexParallelism());
        JavaRDD taggedRecordRDD = keyedInputRecordRDD.leftOuterJoin(existingLocationsOnTable).map((Function & Serializable)entry -> {
            HoodieRecord untaggedRecord = (HoodieRecord)((Tuple2)entry._2)._1;
            Option<Object> location = Option.ofNullable(((Optional)((Tuple2)entry._2)._2).orNull());
            return HoodieIndexUtils.getTaggedRecord(untaggedRecord, location);
        });
        if (this.config.getSimpleIndexUseCaching()) {
            inputRecordRDD.unpersist();
        }
        return taggedRecordRDD;
    }

    protected JavaPairRDD<HoodieKey, HoodieRecordLocation> fetchRecordLocationsForAffectedPartitions(JavaRDD<HoodieKey> hoodieKeys, HoodieEngineContext context, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable, int parallelism) {
        List affectedPartitionPathList = hoodieKeys.map(HoodieKey::getPartitionPath).distinct().collect();
        List<Pair<String, HoodieBaseFile>> latestBaseFiles = HoodieIndexUtils.getLatestBaseFilesForAllPartitions(affectedPartitionPathList, context, hoodieTable);
        return this.fetchRecordLocations(context, hoodieTable, parallelism, latestBaseFiles);
    }

    protected JavaPairRDD<HoodieKey, HoodieRecordLocation> fetchRecordLocations(HoodieEngineContext context, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable, int parallelism, List<Pair<String, HoodieBaseFile>> baseFiles) {
        JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
        int fetchParallelism = Math.max(1, Math.max(baseFiles.size(), parallelism));
        return jsc.parallelize(baseFiles, fetchParallelism).flatMapToPair((PairFlatMapFunction & Serializable)partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(this.config, hoodieTable, (Pair<String, HoodieBaseFile>)partitionPathBaseFile).locations().map(x -> Tuple2.apply(((Pair)x).getLeft(), ((Pair)x).getRight())).iterator());
    }
}

