/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.FlinkLazyInsertIterable;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.commit.FlinkMergeHelper;
import org.apache.hudi.table.action.commit.Partitioner;
import org.apache.hudi.table.action.commit.UpsertPartitioner;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import scala.Tuple2;

public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayload>
extends BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, HoodieWriteMetadata> {
    private static final Logger LOG = LogManager.getLogger(BaseFlinkCommitActionExecutor.class);

    public BaseFlinkCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType) {
        super(context, config, table, instantTime, operationType, Option.empty());
    }

    public BaseFlinkCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType, Option extraMetadata) {
        super(context, config, table, instantTime, operationType, extraMetadata);
    }

    public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
        HoodieWriteMetadata result = new HoodieWriteMetadata();
        WorkloadProfile profile = null;
        if (this.isWorkloadProfileNeeded()) {
            profile = new WorkloadProfile(this.buildProfile(inputRecords));
            LOG.info((Object)("Workload profile :" + profile));
            try {
                this.saveWorkloadProfileMetadataToInflight(profile, this.instantTime);
            }
            catch (Exception e) {
                HoodieTableMetaClient metaClient = this.table.getMetaClient();
                HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), this.instantTime);
                try {
                    if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
                        throw new HoodieCommitException("Failed to commit " + this.instantTime + " unable to save inflight metadata ", (Throwable)e);
                    }
                }
                catch (IOException ex) {
                    LOG.error((Object)"Check file exists failed");
                    throw new HoodieCommitException("Failed to commit " + this.instantTime + " unable to save inflight metadata ", (Throwable)ex);
                }
            }
        }
        Partitioner partitioner = this.getPartitioner(profile);
        Map<Integer, List<HoodieRecord<T>>> partitionedRecords = this.partition(inputRecords, partitioner);
        LinkedList<WriteStatus> writeStatuses = new LinkedList<WriteStatus>();
        partitionedRecords.forEach((partition, records) -> {
            if (WriteOperationType.isChangingRecords((WriteOperationType)this.operationType)) {
                this.handleUpsertPartition(this.instantTime, (Integer)partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
            } else {
                this.handleInsertPartition(this.instantTime, (Integer)partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
            }
        });
        this.updateIndex(writeStatuses, (HoodieWriteMetadata<List<WriteStatus>>)result);
        return result;
    }

    protected void updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) {
        Instant indexStartTime = Instant.now();
        List statuses = (List)this.table.getIndex().updateLocation(writeStatuses, this.context, this.table);
        result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
        result.setWriteStatuses((Object)statuses);
    }

    protected String getCommitActionType() {
        return this.table.getMetaClient().getCommitActionType();
    }

    private Partitioner getPartitioner(WorkloadProfile profile) {
        if (WriteOperationType.isChangingRecords((WriteOperationType)this.operationType)) {
            return this.getUpsertPartitioner(profile);
        }
        return this.getInsertPartitioner(profile);
    }

    private Map<Integer, List<HoodieRecord<T>>> partition(List<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
        Map<Integer, List<Tuple2>> partitionedMidRecords = dedupedRecords.stream().map(record -> new Tuple2((Object)new Tuple2((Object)record.getKey(), (Object)Option.ofNullable((Object)record.getCurrentLocation())), record)).collect(Collectors.groupingBy(x -> partitioner.getPartition(x._1)));
        LinkedHashMap results = new LinkedHashMap();
        partitionedMidRecords.forEach((key, value) -> results.put((Integer)key, value.stream().map(x -> (HoodieRecord)x._2).collect(Collectors.toList())));
        return results;
    }

    protected Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(List<HoodieRecord<T>> inputRecords) {
        HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<String, WorkloadStat>();
        WorkloadStat globalStat = new WorkloadStat();
        Map<Pair, Long> partitionLocationCounts = inputRecords.stream().map(record -> Pair.of((Object)Pair.of((Object)record.getPartitionPath(), (Object)Option.ofNullable((Object)record.getCurrentLocation())), (Object)record)).collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting()));
        for (Map.Entry<Pair, Long> e : partitionLocationCounts.entrySet()) {
            String partitionPath = (String)e.getKey().getLeft();
            Long count = e.getValue();
            Option locOption = (Option)e.getKey().getRight();
            if (!partitionPathStatMap.containsKey(partitionPath)) {
                partitionPathStatMap.put(partitionPath, new WorkloadStat());
            }
            if (locOption.isPresent()) {
                ((WorkloadStat)partitionPathStatMap.get(partitionPath)).addUpdates((HoodieRecordLocation)locOption.get(), count.longValue());
                globalStat.addUpdates((HoodieRecordLocation)locOption.get(), count.longValue());
                continue;
            }
            ((WorkloadStat)partitionPathStatMap.get(partitionPath)).addInserts(count.longValue());
            globalStat.addInserts(count.longValue());
        }
        return Pair.of(partitionPathStatMap, (Object)globalStat);
    }

    protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result) {
        this.commit(extraMetadata, result, ((List)result.getWriteStatuses()).stream().map(WriteStatus::getStat).collect(Collectors.toList()));
    }

    protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
        String actionType = this.getCommitActionType();
        LOG.info((Object)("Committing " + this.instantTime + ", action Type " + actionType));
        result.setCommitted(true);
        result.setWriteStats(writeStats);
        this.finalizeWrite(this.instantTime, writeStats, result);
        try {
            LOG.info((Object)("Committing " + this.instantTime + ", action Type " + this.getCommitActionType()));
            HoodieActiveTimeline activeTimeline = this.table.getActiveTimeline();
            HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, (Map)result.getPartitionToReplaceFileIds(), extraMetadata, (WriteOperationType)this.operationType, (String)this.getSchemaToStoreInCommit(), (String)this.getCommitActionType());
            activeTimeline.saveAsComplete(new HoodieInstant(true, this.getCommitActionType(), this.instantTime), Option.of((Object)metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
            LOG.info((Object)("Committed " + this.instantTime));
            result.setCommitMetadata(Option.of((Object)metadata));
        }
        catch (IOException e) {
            throw new HoodieCommitException("Failed to complete commit " + this.config.getBasePath() + " at time " + this.instantTime, (Throwable)e);
        }
    }

    protected Map<String, List<String>> getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
        return Collections.emptyMap();
    }

    protected boolean isWorkloadProfileNeeded() {
        return true;
    }

    protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
        UpsertPartitioner upsertPartitioner = (UpsertPartitioner)partitioner;
        BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
        BucketType btype = binfo.bucketType;
        try {
            if (btype.equals((Object)BucketType.INSERT)) {
                return this.handleInsert(binfo.fileIdPrefix, recordItr);
            }
            if (btype.equals((Object)BucketType.UPDATE)) {
                return this.handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
            }
            throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
        }
        catch (Throwable t) {
            String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
            LOG.error((Object)msg, t);
            throw new HoodieUpsertException(msg, t);
        }
    }

    protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
        return this.handleUpsertPartition(instantTime, partition, recordItr, partitioner);
    }

    public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) throws IOException {
        if (!recordItr.hasNext()) {
            LOG.info((Object)("Empty partition with fileId => " + fileId));
            return Collections.singletonList(Collections.EMPTY_LIST).iterator();
        }
        HoodieMergeHandle upsertHandle = this.getUpdateHandle(partitionPath, fileId, recordItr);
        return this.handleUpdateInternal(upsertHandle, fileId);
    }

    protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId) throws IOException {
        if (upsertHandle.getOldFilePath() == null) {
            throw new HoodieUpsertException("Error in finding the old file path at commit " + this.instantTime + " for fileId: " + fileId);
        }
        FlinkMergeHelper.newInstance().runMerge(this.table, upsertHandle);
        if (upsertHandle.getPartitionPath() == null) {
            LOG.info((Object)("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " + upsertHandle.writeStatuses()));
        }
        return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
    }

    protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
        if (this.table.requireSortedRecords()) {
            return new HoodieSortedMergeHandle(this.config, this.instantTime, this.table, recordItr, partitionPath, fileId, this.taskContextSupplier);
        }
        return new HoodieMergeHandle(this.config, this.instantTime, this.table, recordItr, partitionPath, fileId, this.taskContextSupplier);
    }

    protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
        return new HoodieMergeHandle(this.config, this.instantTime, this.table, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, this.taskContextSupplier);
    }

    public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) throws Exception {
        if (!recordItr.hasNext()) {
            LOG.info((Object)"Empty partition");
            return Collections.singletonList(Collections.EMPTY_LIST).iterator();
        }
        return new FlinkLazyInsertIterable<T>(recordItr, true, this.config, this.instantTime, this.table, idPfx, this.taskContextSupplier, (WriteHandleFactory)new CreateHandleFactory());
    }

    public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
        if (profile == null) {
            throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
        }
        return new UpsertPartitioner(profile, this.context, this.table, this.config);
    }

    public Partitioner getInsertPartitioner(WorkloadProfile profile) {
        return this.getUpsertPartitioner(profile);
    }
}

