/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.rollback;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.rollback.AbstractMarkerBasedRollbackStrategy;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

/**
 * Spark flavour of the marker-based rollback strategy.
 *
 * <p>The marker files recorded for the instant being rolled back are distributed
 * as a Spark job; each marker is undone according to the I/O type encoded in its
 * file name, and the resulting statistics are merged per partition path.
 *
 * @param <T> payload type of the records stored in the table
 */
public class SparkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload>
extends AbstractMarkerBasedRollbackStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {

    /**
     * Creates the strategy; all arguments are handed to the parent class unchanged.
     *
     * @param table       table the rollback operates on
     * @param context     engine context, from which the Spark context is obtained in {@link #execute}
     * @param config      write config (supplies the rollback parallelism and the base path)
     * @param instantTime instant time passed through to the parent strategy
     */
    public SparkMarkerBasedRollbackStrategy(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
        super(table, context, config, instantTime);
    }

    /**
     * Rolls back {@code instantToRollback} by undoing every write recorded in its marker files.
     *
     * @param instantToRollback instant whose marker files drive the rollback
     * @return one merged {@link HoodieRollbackStat} per partition path that had at least one marker
     * @throws HoodieRollbackException if listing the markers or running the Spark job fails for any
     *                                 reason; the underlying exception is kept as the cause
     */
    @Override
    public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
        JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(this.context);
        try {
            MarkerFiles markerFiles = new MarkerFiles(this.table, instantToRollback.getTimestamp());
            List<String> markerFilePaths = markerFiles.allMarkerFilePaths();
            // Never use more tasks than markers or than the configured limit, but always at
            // least one so that parallelize() is given a valid slice count for an empty list.
            int parallelism = Math.max(Math.min(markerFilePaths.size(), this.config.getRollbackParallelism()), 1);
            jsc.setJobGroup(this.getClass().getSimpleName(), "Rolling back using marker files");
            // The lambdas are left un-cast so their parameter types are inferred from the RDD
            // element types (String, then HoodieRollbackStat) instead of degrading to raw Object.
            return jsc.parallelize(markerFilePaths, parallelism)
                .map(markerFilePath -> {
                    // The text after the last '.' of a marker path names the IOType of the write.
                    String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
                    IOType type = IOType.valueOf(typeStr);
                    switch (type) {
                        case MERGE:
                            return this.undoMerge(MarkerFiles.stripMarkerSuffix(markerFilePath));
                        case APPEND:
                            return this.undoAppend(MarkerFiles.stripMarkerSuffix(markerFilePath), instantToRollback);
                        case CREATE:
                            return this.undoCreate(MarkerFiles.stripMarkerSuffix(markerFilePath));
                        default:
                            throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
                    }
                })
                // Key every stat by its partition path so stats of the same partition can be merged.
                .mapToPair(rollbackStat -> new Tuple2<>(rollbackStat.getPartitionPath(), rollbackStat))
                .reduceByKey(RollbackUtils::mergeRollbackStat)
                .map(Tuple2::_2)
                .collect();
        }
        catch (Exception e) {
            throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
        }
    }

    /**
     * Looks up the log files of one file group and reports their sizes.
     *
     * @param partitionPathStr partition path, resolved against the configured base path
     * @param baseCommitTime   base commit time passed to the log file lookup
     * @param fileId           id of the file group whose log files are listed
     * @return map from each matching log file's {@link FileStatus} to the length reported by that status
     * @throws IOException if listing the log files fails
     */
    @Override
    protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
        return FSUtils.getAllLogFiles(
                this.table.getMetaClient().getFs(),
                FSUtils.getPartitionPath(this.config.getBasePath(), partitionPathStr),
                fileId,
                HoodieFileFormat.HOODIE_LOG.getFileExtension(),
                baseCommitTime)
            .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
    }
}

