/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.CommitMetadataResolverFactory;
import org.apache.hudi.client.HoodieColumnStatsIndexUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseCommitActionExecutor<T, I, K, O, R>
extends BaseActionExecutor<T, I, K, O, R> {
    private static final Logger LOG = LoggerFactory.getLogger(BaseCommitActionExecutor.class);
    protected final Option<Map<String, String>> extraMetadata;
    protected final WriteOperationType operationType;
    protected final TaskContextSupplier taskContextSupplier;
    protected final Option<TransactionManager> txnManagerOption;
    protected final Option<Pair<HoodieInstant, Map<String, String>>> lastCompletedTxn;
    protected final Set<String> pendingInflightAndRequestedInstants;

    public BaseCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime, WriteOperationType operationType, Option<Map<String, String>> extraMetadata) {
        super(context, config, table, instantTime);
        this.operationType = operationType;
        this.extraMetadata = extraMetadata;
        this.taskContextSupplier = context.getTaskContextSupplier();
        Option<Object> option = this.txnManagerOption = config.shouldAutoCommit() != false ? Option.of(new TransactionManager(config, table.getStorage())) : Option.empty();
        if (this.txnManagerOption.isPresent() && this.txnManagerOption.get().isLockRequired()) {
            this.lastCompletedTxn = TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient());
            this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(table.getMetaClient());
            this.pendingInflightAndRequestedInstants.remove(instantTime);
        } else {
            this.lastCompletedTxn = Option.empty();
            this.pendingInflightAndRequestedInstants = Collections.emptySet();
        }
        if (!table.getStorageLayout().writeOperationSupported(operationType)) {
            throw new UnsupportedOperationException("Executor " + this.getClass().getSimpleName() + " is not compatible with table layout " + table.getStorageLayout().getClass().getSimpleName());
        }
    }

    public abstract HoodieWriteMetadata<O> execute(I var1);

    public HoodieWriteMetadata<O> execute(I inputRecords, Option<HoodieTimer> sourceReadAndIndexTimer) {
        return this.execute(inputRecords);
    }

    void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String instantTime) throws HoodieCommitException {
        try {
            HoodieCommitMetadata metadata2 = new HoodieCommitMetadata();
            profile.getOutputPartitionPaths().forEach(path -> {
                WorkloadStat partitionStat = profile.getOutputWorkloadStat((String)path);
                HoodieWriteStat insertStat = new HoodieWriteStat();
                insertStat.setNumInserts(partitionStat.getNumInserts());
                insertStat.setFileId("");
                insertStat.setPrevCommit("null");
                metadata2.addWriteStat((String)path, insertStat);
                HashMap<String, Pair<String, Long>> updateLocationMap = partitionStat.getUpdateLocationToCount();
                HashMap<String, Pair<String, Long>> insertLocationMap = partitionStat.getInsertLocationToCount();
                Stream.concat(updateLocationMap.keySet().stream(), insertLocationMap.keySet().stream()).distinct().forEach(fileId -> {
                    HoodieWriteStat writeStat = new HoodieWriteStat();
                    writeStat.setFileId((String)fileId);
                    Pair updateLocation = (Pair)updateLocationMap.get(fileId);
                    Pair insertLocation = (Pair)insertLocationMap.get(fileId);
                    writeStat.setPrevCommit(updateLocation != null ? (String)updateLocation.getKey() : (String)insertLocation.getKey());
                    if (updateLocation != null) {
                        writeStat.setNumUpdateWrites((Long)updateLocation.getValue());
                    }
                    if (insertLocation != null) {
                        writeStat.setNumInserts((Long)insertLocation.getValue());
                    }
                    metadata2.addWriteStat((String)path, writeStat);
                });
            });
            metadata2.setOperationType(this.operationType);
            HoodieActiveTimeline activeTimeline = this.table.getActiveTimeline();
            String commitActionType = this.getCommitActionType();
            HoodieInstant requested = this.table.getMetaClient().createNewInstant(HoodieInstant.State.REQUESTED, commitActionType, instantTime);
            activeTimeline.transitionRequestedToInflight(requested, Option.of(metadata2), this.config.shouldAllowMultiWriteOnSameInstant());
        }
        catch (HoodieIOException io) {
            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
        }
    }

    protected String getCommitActionType() {
        return this.table.getMetaClient().getCommitActionType();
    }

    protected void runPrecommitValidators(HoodieWriteMetadata<O> writeMetadata) {
        if (StringUtils.isNullOrEmpty(this.config.getPreCommitValidators())) {
            return;
        }
        throw new HoodieIOException("Precommit validation not implemented for all engines yet");
    }

    protected void commitOnAutoCommit(HoodieWriteMetadata result2) {
        this.runPrecommitValidators(result2);
        if (this.config.shouldAutoCommit().booleanValue()) {
            LOG.info("Auto commit enabled: Committing " + this.instantTime);
            this.autoCommit(result2);
        } else {
            LOG.info("Auto commit disabled for " + this.instantTime);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void autoCommit(HoodieWriteMetadata<O> result2) {
        InstantGenerator factory = this.table.getMetaClient().getInstantGenerator();
        Option<HoodieInstant> inflightInstant = Option.of(factory.createNewInstant(HoodieInstant.State.INFLIGHT, this.getCommitActionType(), this.instantTime));
        ValidationUtils.checkState(this.txnManagerOption.isPresent(), "The transaction manager has not been initialized");
        TransactionManager txnManager = this.txnManagerOption.get();
        txnManager.beginTransaction(inflightInstant, this.lastCompletedTxn.isPresent() ? Option.of(this.lastCompletedTxn.get().getLeft()) : Option.empty());
        try {
            this.setCommitMetadata(result2);
            TransactionUtils.resolveWriteConflictIfAny(this.table, txnManager.getCurrentTransactionOwner(), result2.getCommitMetadata(), this.config, txnManager.getLastCompletedTransactionOwner(), false, this.pendingInflightAndRequestedInstants);
            this.commit(result2);
        }
        finally {
            txnManager.endTransaction(inflightInstant);
        }
    }

    protected abstract void setCommitMetadata(HoodieWriteMetadata<O> var1);

    protected abstract void commit(HoodieWriteMetadata<O> var1);

    protected void commit(HoodieWriteMetadata<O> result2, List<HoodieWriteStat> writeStats) {
        String actionType = this.getCommitActionType();
        LOG.info("Committing " + this.instantTime + ", action Type " + actionType + ", operation Type " + (Object)((Object)this.operationType));
        result2.setCommitted(true);
        result2.setWriteStats(writeStats);
        this.finalizeWrite(this.instantTime, writeStats, result2);
        try {
            HoodieActiveTimeline activeTimeline = this.table.getActiveTimeline();
            HoodieCommitMetadata metadata2 = CommitMetadataResolverFactory.get(this.table.getMetaClient().getTableConfig().getTableVersion(), this.config.getEngineType(), this.table.getMetaClient().getTableType(), this.getCommitActionType()).reconcileMetadataForMissingFiles(this.config, this.context, this.table, this.instantTime, result2.getCommitMetadata().get());
            this.writeTableMetadata(metadata2, actionType);
            metadata2.getExtraMetadata().entrySet().removeIf(entry -> entry.getValue() == null);
            activeTimeline.saveAsComplete(false, this.table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, actionType, this.instantTime), Option.of(metadata2));
            LOG.info("Committed " + this.instantTime);
            result2.setCommitMetadata(Option.of(metadata2));
            HoodieColumnStatsIndexUtils.updateColsToIndex(this.table, this.config, metadata2, actionType, (metaClient, columnsToIndex) -> {
                this.updateColumnsToIndexForColumnStats((HoodieTableMetaClient)metaClient, (List<String>)columnsToIndex);
                return null;
            });
        }
        catch (HoodieIOException e) {
            throw new HoodieCommitException("Failed to complete commit " + this.config.getBasePath() + " at time " + this.instantTime, e);
        }
    }

    protected abstract void updateColumnsToIndexForColumnStats(HoodieTableMetaClient var1, List<String> var2);

    protected void finalizeWrite(String instantTime, List<HoodieWriteStat> stats, HoodieWriteMetadata<O> result2) {
        try {
            Instant start2 = Instant.now();
            this.table.finalizeWrite(this.context, instantTime, stats);
            result2.setFinalizeDuration(Duration.between(start2, Instant.now()));
        }
        catch (HoodieIOException ioe) {
            throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
        }
    }

    protected String getSchemaToStoreInCommit() {
        return this.config.getSchema();
    }

    protected abstract Iterator<List<WriteStatus>> handleInsert(String var1, Iterator<HoodieRecord<T>> var2) throws Exception;

    protected abstract Iterator<List<WriteStatus>> handleUpdate(String var1, String var2, Iterator<HoodieRecord<T>> var3) throws IOException;

    protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieClusteringPlan clusteringPlan) {
        Option<Schema> schema;
        this.context.setJobStatus(this.getClass().getSimpleName(), "Clustering records for " + this.config.getTableName());
        HoodieInstant instant = ClusteringUtils.getRequestedClusteringInstant(this.instantTime, this.table.getActiveTimeline(), this.table.getMetaClient().getInstantGenerator()).get();
        ClusteringUtils.transitionClusteringOrReplaceRequestedToInflight(instant, Option.empty(), this.table.getActiveTimeline());
        this.table.getMetaClient().reloadActiveTimeline();
        this.config.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE, Boolean.FALSE.toString());
        try {
            schema = new TableSchemaResolver(this.table.getMetaClient()).getTableAvroSchemaIfPresent(false);
        }
        catch (Exception ex) {
            throw new HoodieSchemaException(ex);
        }
        HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = ((ClusteringExecutionStrategy)ReflectionUtils.loadClass(this.config.getClusteringExecutionStrategyClass(), new Class[]{HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, new Object[]{this.table, this.context, this.config})).performClustering(clusteringPlan, schema.get(), this.instantTime);
        HoodieData writeStatusList = (HoodieData)writeMetadata.getWriteStatuses();
        HoodieData<WriteStatus> statuses = this.updateIndex(writeStatusList, writeMetadata);
        statuses.persist(this.config.getString(HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE), this.context, HoodieData.HoodieDataCacheKey.of(this.config.getBasePath(), this.instantTime));
        writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
        writeMetadata.setPartitionToReplaceFileIds(this.getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
        this.commitOnAutoCommit(writeMetadata);
        if (!writeMetadata.getCommitMetadata().isPresent()) {
            LOG.info("Found empty commit metadata for clustering with instant time " + this.instantTime);
            HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(), this.extraMetadata, this.operationType, schema.get().toString(), this.getCommitActionType());
            writeMetadata.setCommitMetadata(Option.of(commitMetadata));
        }
        return writeMetadata;
    }

    private HoodieData<WriteStatus> updateIndex(HoodieData<WriteStatus> writeStatuses, HoodieWriteMetadata<HoodieData<WriteStatus>> result2) {
        Instant indexStartTime = Instant.now();
        HoodieData<WriteStatus> statuses = this.table.getIndex().updateLocation(writeStatuses, this.context, this.table, this.instantTime);
        result2.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
        result2.setWriteStatuses(statuses);
        return statuses;
    }

    private Map<String, List<String>> getPartitionToReplacedFileIds(HoodieClusteringPlan clusteringPlan, HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
        Set newFilesWritten = writeMetadata.getWriteStats().get().stream().map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
        return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan).filter(fg -> "org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy".equals(this.config.getClusteringExecutionStrategyClass()) || !newFilesWritten.contains(fg)).collect(Collectors.groupingBy(HoodieFileGroupId::getPartitionPath, Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList())));
    }

    private void validateWriteResult(HoodieClusteringPlan clusteringPlan, HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
        if (writeMetadata.getWriteStatuses().isEmpty()) {
            throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + this.instantTime + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least " + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum() + " write statuses");
        }
    }
}

