/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.rollback;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

/**
 * Executes listing-based rollback requests as a Spark job.
 *
 * <p>Each {@link ListingBasedRollbackRequest} is handled according to its type: base files (and
 * optionally log files) written by the instant being rolled back are listed and deleted, or a
 * rollback command block is appended to the log file of the affected file group. The outcome of
 * every request is reported as a {@link HoodieRollbackStat} keyed by partition path.
 *
 * <p>The class is {@link Serializable} because the lambda passed to Spark in
 * {@link #maybeDeleteAndCollectStats} references instance fields and methods.
 */
public class ListingBasedRollbackHelper implements Serializable {
    private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
    private final HoodieTableMetaClient metaClient;
    private final HoodieWriteConfig config;

    /**
     * @param metaClient meta client of the table being rolled back
     * @param config     write config supplying the base path and the rollback parallelism
     */
    public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
        this.metaClient = metaClient;
        this.config = config;
    }

    /**
     * Performs the rollback: files are actually deleted and rollback blocks are actually appended.
     *
     * @param context           engine context; must wrap a Spark context
     * @param instantToRollback instant whose writes are being undone
     * @param rollbackRequests  requests to execute
     * @return one stat per partition path, merged with {@link RollbackUtils#mergeRollbackStat}
     */
    public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
        int sparkPartitions = resolveParallelism(rollbackRequests);
        context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions");
        return maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, sparkPartitions, true)
                .reduceByKey(RollbackUtils::mergeRollbackStat)
                .map(Tuple2::_2)
                .collect();
    }

    /**
     * Collects rollback stats without deleting files or appending rollback blocks.
     *
     * @param context           engine context; must wrap a Spark context
     * @param instantToRollback instant whose writes would be undone
     * @param rollbackRequests  requests to evaluate
     * @return one stat per request (stats are not merged per partition path here)
     */
    public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
        int sparkPartitions = resolveParallelism(rollbackRequests);
        context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
        return maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, sparkPartitions, false)
                .map(Tuple2::_2)
                .collect();
    }

    /**
     * Distributes the rollback requests over Spark and produces one (partition path, stat) pair
     * per request.
     *
     * @param context           engine context; must wrap a Spark context
     * @param instantToRollback instant whose writes are being undone
     * @param rollbackRequests  requests to execute
     * @param sparkPartitions   number of Spark partitions to parallelize the requests into
     * @param doDelete          when {@code false}, nothing is deleted and no rollback block is
     *                          appended; stats are still produced
     * @return pair RDD keyed by partition path
     * @throws IllegalStateException (inside the Spark task) for a request type that is not handled
     */
    JavaPairRDD<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests, int sparkPartitions, boolean doDelete) {
        JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
        return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> {
            switch (rollbackRequest.getType()) {
                case DELETE_DATA_FILES_ONLY: {
                    Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(this.metaClient, this.config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
                    return new Tuple2<>(rollbackRequest.getPartitionPath(), deletionStat(rollbackRequest.getPartitionPath(), filesToDeletedStatus));
                }
                case DELETE_DATA_AND_LOG_FILES: {
                    Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(this.metaClient, this.config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
                    return new Tuple2<>(rollbackRequest.getPartitionPath(), deletionStat(rollbackRequest.getPartitionPath(), filesToDeletedStatus));
                }
                case APPEND_ROLLBACK_BLOCK: {
                    return new Tuple2<>(rollbackRequest.getPartitionPath(), appendRollbackBlock(rollbackRequest, instantToRollback, doDelete));
                }
                default:
                    throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
            }
        });
    }

    /** Clamps the configured rollback parallelism to the range [1, number of requests]. */
    private int resolveParallelism(List<ListingBasedRollbackRequest> rollbackRequests) {
        return Math.max(Math.min(rollbackRequests.size(), this.config.getRollbackParallelism()), 1);
    }

    /** Builds the stat reported for a delete-type request. */
    private static HoodieRollbackStat deletionStat(String partitionPath, Map<FileStatus, Boolean> filesToDeletedStatus) {
        return HoodieRollbackStat.newBuilder()
                .withPartitionPath(partitionPath)
                .withDeletedFileResults(filesToDeletedStatus)
                .build();
    }

    /**
     * Handles an {@code APPEND_ROLLBACK_BLOCK} request: opens a log writer for the request's file
     * group and, when {@code doDelete} is set, appends a rollback command block to it.
     *
     * @return stat carrying the log file written to (with a block count of 1) and the sizes of
     *         the log files listed for the file group before the writer was opened
     * @throws HoodieRollbackException if building the writer or appending the block fails
     * @throws HoodieIOException       if closing the writer fails
     * @throws IOException             if listing log files or fetching the file status fails
     */
    private HoodieRollbackStat appendRollbackBlock(ListingBasedRollbackRequest rollbackRequest, HoodieInstant instantToRollback, boolean doDelete) throws IOException {
        String fileId = rollbackRequest.getFileId().get();
        String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
        // Snapshot the existing log files and their lengths before the writer touches anything.
        Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(this.metaClient.getFs(), FSUtils.getPartitionPath(this.config.getBasePath(), rollbackRequest.getPartitionPath()), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
                .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));

        HoodieLogFormat.Writer writer = null;
        boolean interrupted = false;
        try {
            writer = HoodieLogFormat.newWriterBuilder()
                    .onParentPath(FSUtils.getPartitionPath(this.metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
                    .withFileId(fileId)
                    .overBaseCommit(latestBaseInstant)
                    .withFs(this.metaClient.getFs())
                    .withFileExtension(".log")
                    .build();
            if (doDelete) {
                Map<HoodieLogBlock.HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
                writer.appendBlock(new HoodieCommandBlock(header));
            }
        } catch (IOException | InterruptedException io) {
            interrupted = io instanceof InterruptedException;
            throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
        } finally {
            try {
                if (writer != null) {
                    writer.close();
                }
            } catch (IOException io) {
                throw new HoodieIOException("Error appending rollback block..", io);
            } finally {
                // Restore the interrupt status only after close() so the close itself is not
                // disturbed by the flag, while callers further up can still observe the interrupt.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
                this.metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), 1L);
        return HoodieRollbackStat.newBuilder()
                .withPartitionPath(rollbackRequest.getPartitionPath())
                .withRollbackBlockAppendResults(filesToNumBlocksRollback)
                .withWrittenLogFileSizeMap(writtenLogFileSizeMap)
                .build();
    }

    /**
     * Lists, and optionally deletes, the base files and log files in a partition that belong to
     * the given commit.
     *
     * @param commit        instant time the files must carry to be selected
     * @param partitionPath partition path relative to the table base path
     * @param doDelete      when {@code false}, files are only listed and reported as {@code true}
     * @return selected files mapped to the result of their deletion
     */
    private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, String commit, String partitionPath, boolean doDelete) throws IOException {
        LOG.info("Cleaning path " + partitionPath);
        String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
        SerializablePathFilter filter = path -> {
            if (path.toString().endsWith(basefileExtension)) {
                return commit.equals(FSUtils.getCommitTime(path.getName()));
            }
            if (FSUtils.isLogFile(path)) {
                return commit.equals(FSUtils.getBaseCommitTimeFromLogPath(path));
            }
            return false;
        };
        return deleteMatchingFiles(metaClient, config, partitionPath, filter, doDelete);
    }

    /**
     * Lists, and optionally deletes, the base files in a partition that belong to the given commit.
     *
     * @param commit        instant time the files must carry to be selected
     * @param partitionPath partition path relative to the table base path
     * @param doDelete      when {@code false}, files are only listed and reported as {@code true}
     * @return selected files mapped to the result of their deletion
     */
    private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, String commit, String partitionPath, boolean doDelete) throws IOException {
        LOG.info("Cleaning path " + partitionPath);
        String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
        // NOTE(review): this filter matches with contains() while deleteBaseAndLogFiles uses
        // endsWith(); kept as-is to preserve behavior - confirm whether the difference is intended.
        SerializablePathFilter filter = path -> {
            if (path.toString().contains(basefileExtension)) {
                return commit.equals(FSUtils.getCommitTime(path.getName()));
            }
            return false;
        };
        return deleteMatchingFiles(metaClient, config, partitionPath, filter, doDelete);
    }

    /**
     * Lists the files of a partition accepted by {@code filter} and, when {@code doDelete} is set,
     * deletes each of them non-recursively.
     *
     * @return every listed file mapped to its deletion result, or to {@code true} when
     *         {@code doDelete} is {@code false}
     */
    private Map<FileStatus, Boolean> deleteMatchingFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, String partitionPath, PathFilter filter, boolean doDelete) throws IOException {
        Map<FileStatus, Boolean> results = new HashMap<>();
        HoodieWrapperFileSystem fs = metaClient.getFs();
        FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
        for (FileStatus file : toBeDeleted) {
            if (!doDelete) {
                results.put(file, true);
                continue;
            }
            boolean success = fs.delete(file.getPath(), false);
            results.put(file, success);
            LOG.info("Delete file " + file.getPath() + "\t" + success);
        }
        return results;
    }

    /**
     * Builds the header of the rollback command block.
     *
     * @param commit instant time being rolled back, stored as the target instant
     * @return header with the block's instant time (the last instant of the active timeline),
     *         the target instant and the command block type
     */
    private Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String commit) {
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(3);
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, this.metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
        // The ordinal is the value written into the block header, so it must stay as-is.
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        return header;
    }

    /** A {@link PathFilter} that is also {@link Serializable}. */
    @FunctionalInterface
    public static interface SerializablePathFilter
    extends PathFilter,
    Serializable {
    }
}

