/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.rollback;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class ListingBasedRollbackHelper
implements Serializable {
    private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
    private final HoodieTableMetaClient metaClient;
    private final HoodieWriteConfig config;

    public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
        this.metaClient = metaClient;
        this.config = config;
    }

    public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
        Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = this.maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, true);
        Map<String, List<Pair>> collect = partitionPathRollbackStatsPairs.entrySet().stream().map(x -> Pair.of(x.getKey(), x.getValue())).collect(Collectors.groupingBy(Pair::getLeft));
        return collect.values().stream().map(pairs -> pairs.stream().map(Pair::getRight).reduce(RollbackUtils::mergeRollbackStat).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
    }

    public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
        Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = this.maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, false);
        return new ArrayList<HoodieRollbackStat>(partitionPathRollbackStatsPairs.values());
    }

    Map<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests, boolean doDelete) {
        return context.mapToPair(rollbackRequests, rollbackRequest -> {
            switch (rollbackRequest.getType()) {
                case DELETE_DATA_FILES_ONLY: {
                    Map<FileStatus, Boolean> filesToDeletedStatus = this.deleteBaseFiles(this.metaClient, this.config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
                    return new ImmutablePair<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(), HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()).withDeletedFileResults(filesToDeletedStatus).build());
                }
                case DELETE_DATA_AND_LOG_FILES: {
                    Map<FileStatus, Boolean> filesToDeletedStatus = this.deleteBaseAndLogFiles(this.metaClient, this.config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
                    return new ImmutablePair<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(), HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()).withDeletedFileResults(filesToDeletedStatus).build());
                }
                case APPEND_ROLLBACK_BLOCK: {
                    Closeable writer = null;
                    try {
                        writer = HoodieLogFormat.newWriterBuilder().onParentPath(FSUtils.getPartitionPath(this.metaClient.getBasePath(), rollbackRequest.getPartitionPath())).withFileId(rollbackRequest.getFileId().get()).overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(this.metaClient.getFs()).withFileExtension(".log").build();
                        if (doDelete) {
                            Map<HoodieLogBlock.HeaderMetadataType, String> header = this.generateHeader(instantToRollback.getTimestamp());
                            writer.appendBlock(new HoodieCommandBlock(header));
                        }
                    }
                    catch (IOException | InterruptedException io) {
                        throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
                    }
                    finally {
                        try {
                            if (writer != null) {
                                writer.close();
                            }
                        }
                        catch (IOException io) {
                            throw new HoodieIOException("Error appending rollback block..", io);
                        }
                    }
                    Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(this.metaClient.getFs().getFileStatus(((HoodieLogFormat.Writer)Objects.requireNonNull(writer)).getLogFile().getPath()), 1L);
                    return new ImmutablePair<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(), HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()).withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
                }
            }
            throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
        }, 0);
    }

    private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, String commit, String partitionPath, boolean doDelete) throws IOException {
        FileStatus[] toBeDeleted;
        LOG.info((Object)("Cleaning path " + partitionPath));
        String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
        SerializablePathFilter filter = path -> {
            if (path.toString().endsWith(basefileExtension)) {
                String fileCommitTime = FSUtils.getCommitTime(path.getName());
                return commit.equals(fileCommitTime);
            }
            if (FSUtils.isLogFile(path)) {
                String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
                return commit.equals(fileCommitTime);
            }
            return false;
        };
        HashMap<FileStatus, Boolean> results = new HashMap<FileStatus, Boolean>();
        HoodieWrapperFileSystem fs = metaClient.getFs();
        for (FileStatus file : toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter)) {
            if (doDelete) {
                boolean success = fs.delete(file.getPath(), false);
                results.put(file, success);
                LOG.info((Object)("Delete file " + file.getPath() + "\t" + success));
                continue;
            }
            results.put(file, true);
        }
        return results;
    }

    private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, String commit, String partitionPath, boolean doDelete) throws IOException {
        FileStatus[] toBeDeleted;
        HashMap<FileStatus, Boolean> results = new HashMap<FileStatus, Boolean>();
        LOG.info((Object)("Cleaning path " + partitionPath));
        HoodieWrapperFileSystem fs = metaClient.getFs();
        String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
        PathFilter filter = path -> {
            if (path.toString().contains(basefileExtension)) {
                String fileCommitTime = FSUtils.getCommitTime(path.getName());
                return commit.equals(fileCommitTime);
            }
            return false;
        };
        for (FileStatus file : toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter)) {
            if (doDelete) {
                boolean success = fs.delete(file.getPath(), false);
                results.put(file, success);
                LOG.info((Object)("Delete file " + file.getPath() + "\t" + success));
                continue;
            }
            results.put(file, true);
        }
        return results;
    }

    private Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String commit) {
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>(3);
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, this.metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        return header;
    }

    public static interface SerializablePathFilter
    extends PathFilter,
    Serializable {
    }
}

