/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.clean;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.clean.BaseCleanActionExecutor;
import org.apache.hudi.table.action.clean.PartitionCleanStat;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import scala.Tuple2;

public class FlinkCleanActionExecutor<T extends HoodieRecordPayload>
extends BaseCleanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
    private static final Logger LOG = LogManager.getLogger(FlinkCleanActionExecutor.class);

    public FlinkCleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String instantTime) {
        super(context, config, table, instantTime);
    }

    @Override
    List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
        Iterator<Tuple2<String, CleanFileInfo>> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream().flatMap(x -> ((List)x.getValue()).stream().map(y -> new Tuple2(x.getKey(), (Object)new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))).iterator();
        Stream<Tuple2> partitionCleanStats = FlinkCleanActionExecutor.deleteFilesFunc(filesToBeDeletedPerPartition, this.table).collect(Collectors.groupingBy(Pair::getLeft)).entrySet().stream().map(x -> new Tuple2(x.getKey(), (Object)((List)x.getValue()).stream().map(y -> (PartitionCleanStat)y.getRight()).reduce(PartitionCleanStat::merge).get()));
        Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats.collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
        return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
            PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath) ? (PartitionCleanStat)partitionCleanStatsMap.get(partitionPath) : new PartitionCleanStat((String)partitionPath);
            HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
            return HoodieCleanStat.newBuilder().withPolicy(this.config.getCleanerPolicy()).withPartitionPath((String)partitionPath).withEarliestCommitRetained(Option.ofNullable(actionInstant != null ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()), actionInstant.getAction(), actionInstant.getTimestamp()) : null)).withDeletePathPattern(partitionCleanStat.deletePathPatterns()).withSuccessfulDeletes(partitionCleanStat.successDeleteFiles()).withFailedDeletes(partitionCleanStat.failedDeleteFiles()).withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns()).withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles()).withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles()).build();
        }).collect(Collectors.toList());
    }

    private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<Tuple2<String, CleanFileInfo>> iter, HoodieTable table) {
        HashMap<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<String, PartitionCleanStat>();
        HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
        while (iter.hasNext()) {
            Tuple2<String, CleanFileInfo> partitionDelFileTuple = iter.next();
            String partitionPath = (String)partitionDelFileTuple._1();
            Path deletePath = new Path(((CleanFileInfo)partitionDelFileTuple._2()).getFilePath());
            String deletePathStr = deletePath.toString();
            Boolean deletedFileResult = null;
            try {
                deletedFileResult = FlinkCleanActionExecutor.deleteFileAndGetResult(fs, deletePathStr);
            }
            catch (IOException e2) {
                LOG.error((Object)"Delete file failed");
            }
            if (!partitionCleanStatMap.containsKey(partitionPath)) {
                partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
            }
            boolean isBootstrapBasePathFile = ((CleanFileInfo)partitionDelFileTuple._2()).isBootstrapBaseFile();
            PartitionCleanStat partitionCleanStat = (PartitionCleanStat)partitionCleanStatMap.get(partitionPath);
            if (isBootstrapBasePathFile) {
                partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
                partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true);
                continue;
            }
            partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
            partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
        }
        return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
    }
}

