/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertOverwritePartitioner;
import org.apache.hudi.table.action.commit.SparkWriteHelper;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
    private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;

    public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
        this(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE);
    }

    public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD, WriteOperationType writeOperationType) {
        super(context, config, table, instantTime, writeOperationType);
        this.inputRecordsRDD = inputRecordsRDD;
    }

    @Override
    public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
        return SparkWriteHelper.newInstance().write(this.instantTime, this.inputRecordsRDD, this.context, this.table, this.config.shouldCombineBeforeInsert(), this.config.getInsertShuffleParallelism(), this, false);
    }

    @Override
    protected Partitioner getPartitioner(WorkloadProfile profile) {
        return new SparkInsertOverwritePartitioner(profile, this.context, this.table, this.config);
    }

    @Override
    protected String getCommitActionType() {
        return "replacecommit";
    }

    @Override
    protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
        return writeStatuses.map((Function & Serializable)status -> status.getStat().getPartitionPath()).distinct().mapToPair((PairFunction & Serializable)partitionPath -> new Tuple2(partitionPath, this.getAllExistingFileIds((String)partitionPath))).collectAsMap();
    }

    protected List<String> getAllExistingFileIds(String partitionPath) {
        return this.table.getSliceView().getLatestFileSlices(partitionPath).map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
    }
}

