/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieInternalConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.commit.HoodieWriteHelper;
import org.apache.hudi.table.action.commit.SparkHoodiePartitioner;
import org.apache.hudi.table.action.commit.SparkInsertOverwritePartitioner;
import org.apache.spark.Partitioner;

public class SparkInsertOverwriteCommitActionExecutor<T>
extends BaseSparkCommitActionExecutor<T> {
    private final HoodieData<HoodieRecord<T>> inputRecordsRDD;

    public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD) {
        this(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE);
    }

    public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD, WriteOperationType writeOperationType) {
        super(context, config, table, instantTime, writeOperationType);
        this.inputRecordsRDD = inputRecordsRDD;
    }

    public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
        return HoodieWriteHelper.newInstance().write(this.instantTime, this.inputRecordsRDD, this.context, this.table, this.config.shouldCombineBeforeInsert(), this.config.getInsertShuffleParallelism(), (BaseCommitActionExecutor)this, this.operationType);
    }

    @Override
    protected Partitioner getPartitioner(WorkloadProfile profile) {
        return (Partitioner)this.table.getStorageLayout().layoutPartitionerClass().map(c -> this.getLayoutPartitioner(profile, (String)c)).orElseGet(() -> new SparkInsertOverwritePartitioner(profile, this.context, this.table, this.config, this.operationType));
    }

    @Override
    protected String getCommitActionType() {
        return "replacecommit";
    }

    @Override
    protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
        String staticOverwritePartition = this.config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
        if (StringUtils.nonEmpty((String)staticOverwritePartition)) {
            List<String> partitionPaths = Arrays.asList(staticOverwritePartition.split(","));
            this.context.setJobStatus(((Object)((Object)this)).getClass().getSimpleName(), "Getting ExistingFileIds of matching static partitions");
            return HoodieJavaPairRDD.getJavaPairRDD(this.context.parallelize(partitionPaths, partitionPaths.size()).mapToPair((SerializablePairFunction & Serializable)partitionPath -> Pair.of((Object)partitionPath, this.getAllExistingFileIds((String)partitionPath)))).collectAsMap();
        }
        return HoodieJavaPairRDD.getJavaPairRDD(((HoodieData)writeMetadata.getWriteStatuses()).map((SerializableFunction & Serializable)status -> status.getStat().getPartitionPath()).distinct().mapToPair((SerializablePairFunction & Serializable)partitionPath -> Pair.of((Object)partitionPath, this.getAllExistingFileIds((String)partitionPath)))).collectAsMap();
    }

    protected List<String> getAllExistingFileIds(String partitionPath) {
        return this.table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
    }

    @Override
    protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
        SparkHoodiePartitioner upsertPartitioner = (SparkHoodiePartitioner)partitioner;
        BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
        BucketType btype = binfo.bucketType;
        switch (btype) {
            case INSERT: {
                return this.handleInsert(binfo.fileIdPrefix, recordItr);
            }
        }
        throw new AssertionError((Object)("Expect INSERT bucketType for insert overwrite, please correct the logical of " + partitioner.getClass().getName()));
    }
}

