/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.clean;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.clean.BaseCleanActionExecutor;
import org.apache.hudi.table.action.clean.PartitionCleanStat;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

/**
 * Spark-backed clean action: deletes the files listed in a {@link HoodieCleanerPlan}
 * through a Spark job and reports one {@link HoodieCleanStat} for every partition
 * named in the plan.
 *
 * @param <T> payload type of the records stored in the table
 */
public class SparkCleanActionExecutor<T extends HoodieRecordPayload>
extends BaseCleanActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
    private static final Logger LOG = LogManager.getLogger(SparkCleanActionExecutor.class);

    /**
     * Creates the executor; all arguments are forwarded unchanged to the base executor.
     *
     * @param context     Spark engine context
     * @param config      write configuration (read here for cleaner parallelism and policy)
     * @param table       table whose files are cleaned
     * @param instantTime instant time passed to the base executor
     */
    public SparkCleanActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, String instantTime) {
        super(context, config, table, instantTime);
    }

    /**
     * Builds the per-Spark-partition function that deletes every
     * {@code (partitionPath, file)} pair it receives and aggregates the outcomes
     * into one {@link PartitionCleanStat} per table partition path.
     *
     * <p>Bootstrap base files are recorded under their full path; all other files
     * are recorded under their file name only.
     *
     * @param table table whose meta client supplies the file system used for deletion
     * @return function emitting {@code (partitionPath, PartitionCleanStat)} pairs
     */
    private static PairFlatMapFunction<Iterator<Tuple2<String, CleanFileInfo>>, String, PartitionCleanStat> deleteFilesFunc(HoodieTable<?, ?, ?, ?> table) {
        return iter -> {
            Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
            HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
            while (iter.hasNext()) {
                Tuple2<String, CleanFileInfo> partitionDelFileTuple = iter.next();
                String partitionPath = partitionDelFileTuple._1();
                CleanFileInfo fileInfo = partitionDelFileTuple._2();
                Path deletePath = new Path(fileInfo.getFilePath());
                String deletePathStr = deletePath.toString();
                // Delete first: if the deletion throws, no stat entry is created for it.
                Boolean deletedFileResult = SparkCleanActionExecutor.deleteFileAndGetResult(fs, deletePathStr);
                PartitionCleanStat partitionCleanStat = partitionCleanStatMap.computeIfAbsent(partitionPath, PartitionCleanStat::new);
                if (fileInfo.isBootstrapBaseFile()) {
                    // Bootstrap base files are tracked by their full path.
                    partitionCleanStat.addDeleteFilePatterns(deletePathStr, true);
                    partitionCleanStat.addDeletedFileResult(deletePathStr, deletedFileResult, true);
                } else {
                    // Regular files are tracked by file name within the partition.
                    String fileName = deletePath.getName();
                    partitionCleanStat.addDeleteFilePatterns(fileName, false);
                    partitionCleanStat.addDeletedFileResult(fileName, deletedFileResult, false);
                }
            }
            return partitionCleanStatMap.entrySet().stream()
                .map(e -> new Tuple2<>(e.getKey(), e.getValue()))
                .collect(Collectors.toList())
                .iterator();
        };
    }

    /**
     * Converts the plan's earliest-instant-to-retain (if any) into a timeline instant.
     *
     * @param cleanerPlan plan to read the earliest retained instant from
     * @return the instant wrapped in an {@link Option}, or an empty option when the plan has none
     */
    private static Option<HoodieInstant> earliestInstantToRetain(HoodieCleanerPlan cleanerPlan) {
        HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
        HoodieInstant earliest = actionInstant == null
            ? null
            : new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()), actionInstant.getAction(), actionInstant.getTimestamp());
        return Option.ofNullable(earliest);
    }

    /**
     * Deletes all files listed in the cleaner plan using a Spark job and returns one
     * clean stat per partition present in the plan. Partitions for which the job
     * produced no result get a stat built from an empty {@link PartitionCleanStat}.
     *
     * @param context     engine context; must be usable as a Spark engine context
     * @param cleanerPlan plan mapping each partition path to the files to delete
     * @return clean statistics, one entry per partition key of the plan
     */
    @Override
    List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
        JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);

        // Flatten the plan into one (partitionPath, file) element per file to delete.
        List<Tuple2<String, CleanFileInfo>> filesToDelete = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
            .flatMap(x -> x.getValue().stream()
                .map(y -> new Tuple2<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))))
            .collect(Collectors.toList());

        // Parallelism is bounded by the number of FILES (the elements of the RDD), not the
        // number of partitions in the plan. Clamp to at least 1 so an empty plan or a
        // non-positive configured parallelism never asks Spark for zero slices.
        int cleanerParallelism = Math.max(1, Math.min(filesToDelete.size(), this.config.getCleanerParallelism()));
        LOG.info("Using cleanerParallelism: " + cleanerParallelism);

        context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions");
        List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc.parallelize(filesToDelete, cleanerParallelism)
            .mapPartitionsToPair(SparkCleanActionExecutor.deleteFilesFunc(this.table))
            .reduceByKey(PartitionCleanStat::merge)
            .collect();
        Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats.stream()
            .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));

        // Report a stat for every partition named in the plan, including those with no result.
        return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
            PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.get(partitionPath);
            if (partitionCleanStat == null) {
                partitionCleanStat = new PartitionCleanStat(partitionPath);
            }
            return HoodieCleanStat.newBuilder()
                .withPolicy(this.config.getCleanerPolicy())
                .withPartitionPath(partitionPath)
                .withEarliestCommitRetained(SparkCleanActionExecutor.earliestInstantToRetain(cleanerPlan))
                .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
                .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
                .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
                .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
                .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
                .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
                .build();
        }).collect(Collectors.toList());
    }
}

