/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.rollback;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.LogFileCreationCallback;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.storage.StorageSchemes;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.RollbackHelper;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.action.rollback.SerializableHoodieRollbackRequest;
import org.apache.hudi.table.marker.AppendMarkerHandler;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.util.CommonClientUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RollbackHelperV1
extends RollbackHelper {
    private static final Logger LOG = LoggerFactory.getLogger(RollbackHelperV1.class);

    public RollbackHelperV1(HoodieTable table, HoodieWriteConfig config) {
        super(table, config);
    }

    public static List<Option<StoragePathInfo>> getPathInfoUnderPartition(HoodieStorage storage2, StoragePath partitionPathIncludeBasePath, Set<String> filesNamesUnderThisPartition, boolean ignoreMissingFiles) {
        String storageScheme = storage2.getScheme();
        boolean useListStatus = StorageSchemes.isListStatusFriendly(storageScheme);
        ArrayList<Option<StoragePathInfo>> result2 = new ArrayList<Option<StoragePathInfo>>(filesNamesUnderThisPartition.size());
        try {
            if (useListStatus) {
                List<StoragePathInfo> storagePathInfos = storage2.listDirectEntries(partitionPathIncludeBasePath, path -> filesNamesUnderThisPartition.contains(path.getName()));
                Map<String, StoragePathInfo> filenameToPathInfoMap = storagePathInfos.stream().collect(Collectors.toMap(storagePathInfo -> storagePathInfo.getPath().getName(), storagePathInfo -> storagePathInfo));
                for (String fileName : filesNamesUnderThisPartition) {
                    if (filenameToPathInfoMap.containsKey(fileName)) {
                        result2.add(Option.of(filenameToPathInfoMap.get(fileName)));
                        continue;
                    }
                    if (!ignoreMissingFiles) {
                        throw new FileNotFoundException("File not found: " + new StoragePath(partitionPathIncludeBasePath.toString(), fileName));
                    }
                    result2.add(Option.empty());
                }
            } else {
                for (String fileName : filesNamesUnderThisPartition) {
                    StoragePath fullPath = new StoragePath(partitionPathIncludeBasePath.toString(), fileName);
                    try {
                        StoragePathInfo storagePathInfo2 = storage2.getPathInfo(fullPath);
                        result2.add(Option.of(storagePathInfo2));
                    }
                    catch (FileNotFoundException fileNotFoundException) {
                        if (ignoreMissingFiles) {
                            result2.add(Option.empty());
                            continue;
                        }
                        throw new FileNotFoundException("File not found: " + fullPath.toString());
                    }
                }
            }
        }
        catch (IOException e) {
            throw new HoodieIOException("List files under " + partitionPathIncludeBasePath + " failed", e);
        }
        return result2;
    }

    @Override
    public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback, List<HoodieRollbackRequest> rollbackRequests) {
        int parallelism = Math.max(Math.min(rollbackRequests.size(), this.config.getRollbackParallelism()), 1);
        context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions: " + this.config.getTableName());
        List<SerializableHoodieRollbackRequest> serializableRequests = rollbackRequests.stream().map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
        AppendMarkerHandler markerHandler = WriteMarkersFactory.getAppendMarkerHandler(this.config.getMarkersType(), this.table, instantTime);
        HashSet<String> logPaths = new HashSet();
        try {
            logPaths = markerHandler.getAppendedLogPaths(context, this.config.getFinalizeWriteParallelism());
        }
        catch (FileNotFoundException fnf) {
            LOG.warn("Rollback never failed and hence no marker dir was found. Safely moving on");
        }
        catch (IOException e) {
            throw new HoodieRollbackException("Failed to list log file markers for previous attempt of rollback ", e);
        }
        List getRollbackStats = this.maybeDeleteAndCollectStats(context, instantTime, instantToRollback, serializableRequests, true, parallelism);
        List<HoodieRollbackStat> mergedRollbackStatByPartitionPath = context.reduceByKey(getRollbackStats, RollbackUtils::mergeRollbackStat, parallelism);
        return this.addLogFilesFromPreviousFailedRollbacksToStat(context, mergedRollbackStatByPartitionPath, logPaths);
    }

    List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback, List<SerializableHoodieRollbackRequest> rollbackRequests, boolean doDelete, int numPartitions) {
        List<SerializableHoodieRollbackRequest> processedRollbackRequests = this.metaClient.getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT) ? rollbackRequests : RollbackUtils.groupSerializableRollbackRequestsBasedOnFileGroup(rollbackRequests);
        TaskContextSupplier taskContextSupplier = context.getTaskContextSupplier();
        return context.flatMap(processedRollbackRequests, rollbackRequest -> {
            List<String> filesToBeDeleted = rollbackRequest.getFilesToBeDeleted();
            if (!filesToBeDeleted.isEmpty()) {
                List<HoodieRollbackStat> rollbackStats = this.deleteFiles(this.metaClient, filesToBeDeleted, doDelete);
                ArrayList partitionToRollbackStats = new ArrayList();
                rollbackStats.forEach(entry -> partitionToRollbackStats.add(Pair.of(entry.getPartitionPath(), entry)));
                return partitionToRollbackStats.stream();
            }
            if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) {
                StoragePath filePath;
                Closeable writer = null;
                try {
                    String partitionPath = rollbackRequest.getPartitionPath();
                    String fileId = rollbackRequest.getFileId();
                    HoodieTableVersion tableVersion = this.metaClient.getTableConfig().getTableVersion();
                    WriteMarkers writeMarkers = WriteMarkersFactory.get(this.config.getMarkersType(), this.table, instantTime);
                    writer = HoodieLogFormat.newWriterBuilder().onParentPath(FSUtils.constructAbsolutePath(this.metaClient.getBasePath(), partitionPath)).withFileId(fileId).withLogWriteToken(CommonClientUtils.generateWriteToken(taskContextSupplier)).withInstantTime(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) ? instantToRollback.requestedTime() : rollbackRequest.getLatestBaseInstant()).withStorage(this.metaClient.getStorage()).withFileCreationCallback(this.getRollbackLogMarkerCallback(writeMarkers, partitionPath, fileId)).withTableVersion(tableVersion).withFileExtension(".log").build();
                    if (doDelete) {
                        Map<HoodieLogBlock.HeaderMetadataType, String> header = this.generateHeader(instantToRollback.requestedTime());
                        filePath = writer.appendBlock(new HoodieCommandBlock(header)).logFile().getPath();
                    } else {
                        filePath = writer.getLogFile().getPath();
                    }
                }
                catch (IOException | InterruptedException io) {
                    throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
                }
                finally {
                    try {
                        if (writer != null) {
                            writer.close();
                        }
                    }
                    catch (IOException io) {
                        throw new HoodieIOException("Error appending rollback block", io);
                    }
                }
                Map<StoragePathInfo, Long> filesToNumBlocksRollback = Collections.singletonMap(this.metaClient.getStorage().getPathInfo(Objects.requireNonNull(filePath)), 1L);
                String partitionFullPath = FSUtils.constructAbsolutePath(this.metaClient.getBasePath().toString(), rollbackRequest.getPartitionPath()).toString();
                HashMap<String, Long> validLogBlocksToDelete = new HashMap<String, Long>();
                rollbackRequest.getLogBlocksToBeDeleted().entrySet().stream().forEach(kv -> {
                    String logFileFullPath = (String)kv.getKey();
                    String logFileName = logFileFullPath.replace(partitionFullPath, "");
                    if (!StringUtils.isNullOrEmpty(logFileName)) {
                        validLogBlocksToDelete.put((String)kv.getKey(), (Long)kv.getValue());
                    }
                });
                return Collections.singletonList(Pair.of(rollbackRequest.getPartitionPath(), HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()).withRollbackBlockAppendResults(filesToNumBlocksRollback).withLogFilesFromFailedCommit(validLogBlocksToDelete).build())).stream();
            }
            return Collections.singletonList(Pair.of(rollbackRequest.getPartitionPath(), HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()).build())).stream();
        }, numPartitions);
    }

    private LogFileCreationCallback getRollbackLogMarkerCallback(final WriteMarkers writeMarkers, final String partitionPath, final String fileId) {
        return new LogFileCreationCallback(){

            @Override
            public boolean preFileCreation(HoodieLogFile logFileToCreate) {
                return this.createAppendMarker(logFileToCreate);
            }

            private boolean createAppendMarker(HoodieLogFile logFileToAppend) {
                return writeMarkers.createIfNotExists(partitionPath, logFileToAppend.getFileName(), IOType.APPEND, RollbackHelperV1.this.config, fileId, RollbackHelperV1.this.metaClient.getActiveTimeline()).isPresent();
            }
        };
    }

    private List<HoodieRollbackStat> addLogFilesFromPreviousFailedRollbacksToStat(HoodieEngineContext context, List<HoodieRollbackStat> originalRollbackStats, Set<String> logPaths) {
        if (logPaths.isEmpty()) {
            return originalRollbackStats;
        }
        String basePathStr = this.metaClient.getBasePath().toString();
        ArrayList<String> logFiles = new ArrayList<String>(logPaths);
        HoodiePairData<String, List<String>> partitionPathToLogFilesHoodieData = this.populatePartitionToLogFilesHoodieData(context, basePathStr, logFiles);
        HoodiePairData<String, HoodieRollbackStat> partitionPathToRollbackStatsHoodieData = context.parallelize(originalRollbackStats).mapToPair(t -> Pair.of(t.getPartitionPath(), t));
        List<HoodieRollbackStat> finalRollbackStats = this.addMissingLogFilesAndGetRollbackStats(partitionPathToRollbackStatsHoodieData, partitionPathToLogFilesHoodieData, basePathStr, context.getStorageConf());
        return finalRollbackStats;
    }

    private HoodiePairData<String, List<String>> populatePartitionToLogFilesHoodieData(HoodieEngineContext context, String basePathStr, List<String> logFiles) {
        return context.parallelize(logFiles).mapToPair(t -> {
            StoragePath logFilePath = new StoragePath(basePathStr, (String)t);
            String partitionPath = FSUtils.getRelativePartitionPath(new StoragePath(basePathStr), logFilePath.getParent());
            return Pair.of(partitionPath, logFilePath.getName());
        }).groupByKey().mapToPair(t -> {
            ArrayList allFiles = new ArrayList();
            ((Iterable)t.getRight()).forEach(entry -> allFiles.add(entry));
            return Pair.of(t.getKey(), allFiles);
        });
    }

    private List<HoodieRollbackStat> addMissingLogFilesAndGetRollbackStats(HoodiePairData<String, HoodieRollbackStat> partitionPathToRollbackStatsHoodieData, HoodiePairData<String, List<String>> partitionPathToLogFilesHoodieData, String basePathStr, StorageConfiguration storageConfiguration) {
        return partitionPathToRollbackStatsHoodieData.leftOuterJoin(partitionPathToLogFilesHoodieData).map(v1 -> {
            if (((Option)((Pair)v1.getValue()).getValue()).isPresent()) {
                String partition = (String)v1.getKey();
                HoodieRollbackStat rollbackStat = (HoodieRollbackStat)((Pair)v1.getValue()).getKey();
                List missingLogFiles = (List)((Option)((Pair)v1.getValue()).getRight()).get();
                StoragePath fullPartitionPath = StringUtils.isNullOrEmpty(partition) ? new StoragePath(basePathStr) : new StoragePath(basePathStr, partition);
                HoodieStorage storage2 = HoodieStorageUtils.getStorage(storageConfiguration);
                List<Option<StoragePathInfo>> storagePathInfoOpts = RollbackHelperV1.getPathInfoUnderPartition(storage2, fullPartitionPath, new HashSet<String>(missingLogFiles), true);
                List<StoragePathInfo> storagePathInfos = storagePathInfoOpts.stream().filter(storagePathInfoOpt -> storagePathInfoOpt.isPresent()).map(Option::get).collect(Collectors.toList());
                HashMap<StoragePathInfo, Long> commandBlocksCount = new HashMap<StoragePathInfo, Long>(rollbackStat.getCommandBlocksCount());
                storagePathInfos.forEach(storagePathInfo -> commandBlocksCount.put((StoragePathInfo)storagePathInfo, storagePathInfo.getLength()));
                return new HoodieRollbackStat(rollbackStat.getPartitionPath(), rollbackStat.getSuccessDeleteFiles(), rollbackStat.getFailedDeleteFiles(), commandBlocksCount, rollbackStat.getLogFilesFromFailedCommit());
            }
            return (HoodieRollbackStat)((Pair)v1.getValue()).getKey();
        }).collectAsList();
    }
}

