/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.table.cdc;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;

public class HoodieCDCExtractor {
    private final HoodieTableMetaClient metaClient;
    private final Path basePath;
    private final FileSystem fs;
    private final HoodieCDCSupplementalLoggingMode supplementalLoggingMode;
    private final InstantRange instantRange;
    private Map<HoodieInstant, HoodieCommitMetadata> commits;
    private HoodieTableFileSystemView fsView;

    public HoodieCDCExtractor(HoodieTableMetaClient metaClient, InstantRange range) {
        this.metaClient = metaClient;
        this.basePath = metaClient.getBasePathV2();
        this.fs = metaClient.getFs().getFileSystem();
        this.supplementalLoggingMode = metaClient.getTableConfig().cdcSupplementalLoggingMode();
        this.instantRange = range;
        this.init();
    }

    private void init() {
        this.initInstantAndCommitMetadata();
    }

    public Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> extractCDCFileSplits() {
        ValidationUtils.checkState(this.commits != null, "Empty commits");
        HashMap<HoodieFileGroupId, List<HoodieCDCFileSplit>> fgToCommitChanges = new HashMap<HoodieFileGroupId, List<HoodieCDCFileSplit>>();
        for (HoodieInstant instant : this.commits.keySet()) {
            HoodieCommitMetadata commitMetadata = this.commits.get(instant);
            Map<String, List<HoodieWriteStat>> ptToWriteStats = commitMetadata.getPartitionToWriteStats();
            for (String partition : ptToWriteStats.keySet()) {
                List<HoodieWriteStat> hoodieWriteStats = ptToWriteStats.get(partition);
                hoodieWriteStats.forEach(writeStat -> {
                    HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partition, writeStat.getFileId());
                    HoodieCDCFileSplit changeFile = this.parseWriteStat(fileGroupId, instant, (HoodieWriteStat)writeStat, commitMetadata.getOperationType());
                    fgToCommitChanges.computeIfAbsent(fileGroupId, k -> new ArrayList());
                    ((List)fgToCommitChanges.get(fileGroupId)).add(changeFile);
                });
            }
            if (!(commitMetadata instanceof HoodieReplaceCommitMetadata)) continue;
            HoodieReplaceCommitMetadata replaceCommitMetadata = (HoodieReplaceCommitMetadata)commitMetadata;
            Map<String, List<String>> ptToReplacedFileId = replaceCommitMetadata.getPartitionToReplaceFileIds();
            for (String partition : ptToReplacedFileId.keySet()) {
                List<String> fileIds = ptToReplacedFileId.get(partition);
                fileIds.forEach(fileId -> {
                    Option<FileSlice> latestFileSliceOpt = this.getOrCreateFsView().fetchLatestFileSlice(partition, (String)fileId);
                    if (latestFileSliceOpt.isPresent()) {
                        HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partition, (String)fileId);
                        HoodieCDCFileSplit changeFile = new HoodieCDCFileSplit(instant.getTimestamp(), HoodieCDCInferenceCase.REPLACE_COMMIT, new ArrayList<String>(), latestFileSliceOpt, Option.empty());
                        if (!fgToCommitChanges.containsKey(fileGroupId)) {
                            fgToCommitChanges.put(fileGroupId, new ArrayList());
                        }
                        ((List)fgToCommitChanges.get(fileGroupId)).add(changeFile);
                    }
                });
            }
        }
        return fgToCommitChanges;
    }

    private HoodieTableFileSystemView getOrCreateFsView() {
        if (this.fsView == null) {
            this.fsView = this.initFSView();
        }
        return this.fsView;
    }

    private HoodieTableFileSystemView initFSView() {
        HashSet<String> touchedPartitions = new HashSet<String>();
        for (Map.Entry<HoodieInstant, HoodieCommitMetadata> entry : this.commits.entrySet()) {
            HoodieCommitMetadata commitMetadata = entry.getValue();
            touchedPartitions.addAll(commitMetadata.getPartitionToWriteStats().keySet());
            if (!(commitMetadata instanceof HoodieReplaceCommitMetadata)) continue;
            touchedPartitions.addAll(((HoodieReplaceCommitMetadata)commitMetadata).getPartitionToReplaceFileIds().keySet());
        }
        try {
            ArrayList<FileStatus> touchedFiles = new ArrayList<FileStatus>();
            for (String touchedPartition : touchedPartitions) {
                Path partitionPath = FSUtils.getPartitionPath(this.basePath, touchedPartition);
                touchedFiles.addAll(Arrays.asList(this.fs.listStatus(partitionPath)));
            }
            return new HoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitsTimeline().filterCompletedInstants(), touchedFiles.toArray(new FileStatus[0]));
        }
        catch (Exception e) {
            throw new HoodieException("Fail to init FileSystem View for CDC", e);
        }
    }

    private void initInstantAndCommitMetadata() {
        try {
            HashSet<String> requiredActions = new HashSet<String>(Arrays.asList("commit", "deltacommit", "replacecommit"));
            HoodieActiveTimeline activeTimeLine = this.metaClient.getActiveTimeline();
            this.commits = activeTimeLine.getInstantsAsStream().filter(instant -> instant.isCompleted() && this.instantRange.isInRange(instant.getTimestamp()) && requiredActions.contains(instant.getAction().toLowerCase(Locale.ROOT))).map(instant -> {
                HoodieCommitMetadata commitMetadata;
                try {
                    commitMetadata = TimelineUtils.getCommitMetadata(instant, activeTimeLine);
                }
                catch (IOException e) {
                    throw new HoodieIOException(e.getMessage());
                }
                return Pair.of(instant, commitMetadata);
            }).filter(pair -> WriteOperationType.isDataChange(((HoodieCommitMetadata)pair.getRight()).getOperationType())).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
        }
        catch (Exception e) {
            throw new HoodieIOException("Fail to get the commit metadata for CDC");
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private HoodieCDCFileSplit parseWriteStat(HoodieFileGroupId fileGroupId, HoodieInstant instant, HoodieWriteStat writeStat, WriteOperationType operation) {
        Path basePath = this.metaClient.getBasePathV2();
        FileSystem fs = this.metaClient.getFs().getFileSystem();
        String instantTs = instant.getTimestamp();
        if (CollectionUtils.isNullOrEmpty(writeStat.getCdcStats())) {
            String path = writeStat.getPath();
            if (FSUtils.isBaseFile(new Path(path))) {
                if (WriteOperationType.isDelete(operation) && writeStat.getNumWrites() == 0L && writeStat.getNumDeletes() != 0L) {
                    HoodieBaseFile beforeBaseFile = this.getOrCreateFsView().getBaseFileOn(fileGroupId.getPartitionPath(), writeStat.getPrevCommit(), fileGroupId.getFileId()).orElseThrow(() -> new HoodieIOException("Can not get the previous version of the base file"));
                    FileSlice beforeFileSlice = new FileSlice(fileGroupId, writeStat.getPrevCommit(), beforeBaseFile, Collections.emptyList());
                    return new HoodieCDCFileSplit(instantTs, HoodieCDCInferenceCase.BASE_FILE_DELETE, new ArrayList<String>(), Option.of(beforeFileSlice), Option.empty());
                }
                if (writeStat.getNumUpdateWrites() != 0L) throw new HoodieException("There should be a cdc log file.");
                if (writeStat.getNumDeletes() != 0L) throw new HoodieException("There should be a cdc log file.");
                if (writeStat.getNumWrites() != writeStat.getNumInserts()) throw new HoodieException("There should be a cdc log file.");
                return new HoodieCDCFileSplit(instantTs, HoodieCDCInferenceCase.BASE_FILE_INSERT, path);
            }
            Option<FileSlice> beforeFileSliceOpt = this.getDependentFileSliceForLogFile(fileGroupId, instant, path);
            return new HoodieCDCFileSplit(instantTs, HoodieCDCInferenceCase.LOG_FILE, path, beforeFileSliceOpt, Option.empty());
        }
        if (this.supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.DATA_BEFORE_AFTER) {
            return new HoodieCDCFileSplit(instantTs, HoodieCDCInferenceCase.AS_IS, writeStat.getCdcStats().keySet());
        }
        try {
            HoodieBaseFile beforeBaseFile = this.getOrCreateFsView().getBaseFileOn(fileGroupId.getPartitionPath(), writeStat.getPrevCommit(), fileGroupId.getFileId()).orElseThrow(() -> new HoodieIOException("Can not get the previous version of the base file"));
            FileSlice beforeFileSlice = null;
            FileSlice currentFileSlice = new FileSlice(fileGroupId, instant.getTimestamp(), new HoodieBaseFile(fs.getFileStatus(new Path(basePath, writeStat.getPath()))), new ArrayList<HoodieLogFile>());
            if (this.supplementalLoggingMode != HoodieCDCSupplementalLoggingMode.OP_KEY_ONLY) return new HoodieCDCFileSplit(instantTs, HoodieCDCInferenceCase.AS_IS, writeStat.getCdcStats().keySet(), Option.ofNullable(beforeFileSlice), Option.ofNullable(currentFileSlice));
            beforeFileSlice = new FileSlice(fileGroupId, writeStat.getPrevCommit(), beforeBaseFile, new ArrayList<HoodieLogFile>());
            return new HoodieCDCFileSplit(instantTs, HoodieCDCInferenceCase.AS_IS, writeStat.getCdcStats().keySet(), Option.ofNullable(beforeFileSlice), Option.ofNullable(currentFileSlice));
        }
        catch (Exception e) {
            throw new HoodieException("Fail to parse HoodieWriteStat", e);
        }
    }

    private Option<FileSlice> getDependentFileSliceForLogFile(HoodieFileGroupId fgId, HoodieInstant instant, String currentLogFile) {
        Path partitionPath = FSUtils.getPartitionPath(this.basePath, fgId.getPartitionPath());
        if (instant.getAction().equals("deltacommit")) {
            String currentLogFileName = new Path(currentLogFile).getName();
            Option<Pair<String, List<String>>> fileSliceOpt = HoodieCommitMetadata.getFileSliceForFileGroupFromDeltaCommit(this.metaClient.getActiveTimeline().getInstantDetails(instant).get(), fgId);
            if (fileSliceOpt.isPresent()) {
                Pair<String, List<String>> fileSlice = fileSliceOpt.get();
                try {
                    HoodieBaseFile baseFile = new HoodieBaseFile(this.fs.getFileStatus(new Path(partitionPath, fileSlice.getLeft())));
                    Path[] logFilePaths = (Path[])fileSlice.getRight().stream().filter(logFile -> !logFile.equals(currentLogFileName)).map(logFile -> new Path(partitionPath, logFile)).toArray(Path[]::new);
                    List<HoodieLogFile> logFiles = Arrays.stream(this.fs.listStatus(logFilePaths)).map(HoodieLogFile::new).collect(Collectors.toList());
                    return Option.of(new FileSlice(fgId, instant.getTimestamp(), baseFile, logFiles));
                }
                catch (Exception e) {
                    throw new HoodieException("Fail to get the dependent file slice for a log file", e);
                }
            }
            return Option.empty();
        }
        return Option.empty();
    }
}

