/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import com.codahale.metrics.Timer;
import java.io.IOException;
import java.text.ParseException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.client.AsyncCleanerService;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndexFactory;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.io.FlinkConcatAndReplaceHandle;
import org.apache.hudi.io.FlinkConcatHandle;
import org.apache.hudi.io.FlinkCreateHandle;
import org.apache.hudi.io.FlinkMergeAndReplaceHandle;
import org.apache.hudi.io.FlinkMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.MiniBatchHandle;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.BaseUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.util.FlinkClientUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieFlinkWriteClient<T extends HoodieRecordPayload>
extends AbstractHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkWriteClient.class);
    // Write handles created by getOrCreateWriteHandle, keyed by the file id of the record location.
    // Emptied by cleanHandles() / cleanHandlesGracefully().
    private final Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;
    // Metadata table writer; empty until initMetadataWriter() is called, consulted by writeTableMetadata().
    private Option<HoodieBackedTableMetadataWriter> metadataWriterOption = Option.empty();

    /**
     * Creates a write client on top of the given engine context and write config.
     *
     * @param context     engine context handed to the parent write client
     * @param writeConfig write configuration handed to the parent write client
     */
    public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
        super(context, writeConfig);
        // Diamond instead of a raw HashMap so the assignment is type-checked.
        this.bucketToHandles = new HashMap<>();
    }

    /**
     * Builds the Flink index through {@link FlinkHoodieIndexFactory}.
     *
     * <p>Note that the {@code writeConfig} argument is not consulted; the client's own
     * {@code config} field is passed to the factory.
     *
     * @param writeConfig unused by this implementation
     * @return the index created by the factory
     */
    protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
        HoodieFlinkEngineContext flinkContext = (HoodieFlinkEngineContext) this.context;
        return FlinkHoodieIndexFactory.createIndex(flinkContext, this.config);
    }

    /**
     * Commits the given instant by extracting the write stat of every write status and
     * delegating to {@code commitStats}.
     *
     * @param instantTime                instant being committed
     * @param writeStatuses              write statuses whose stats are committed
     * @param extraMetadata              optional extra metadata forwarded to {@code commitStats}
     * @param commitActionType           commit action type forwarded to {@code commitStats}
     * @param partitionToReplacedFileIds replaced file ids per partition, forwarded as-is
     * @return the result of {@code commitStats}
     */
    public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
        return this.commitStats(
            instantTime,
            writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList()),
            extraMetadata,
            commitActionType,
            partitionToReplacedFileIds);
    }

    /**
     * Creates a {@link HoodieFlinkTable} for the supplied write config.
     *
     * <p>{@code hadoopConf} and {@code refreshTimeline} are not used by this implementation.
     *
     * @param config          write config used to create the table
     * @param hadoopConf      unused
     * @param refreshTimeline unused
     * @return the newly created Flink table
     */
    protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline) {
        HoodieFlinkEngineContext flinkContext = (HoodieFlinkEngineContext) this.context;
        return HoodieFlinkTable.create(config, flinkContext);
    }

    /**
     * Tags the given records through the index and returns only those whose current
     * location is not known.
     *
     * <p>The elapsed index time is reported under the {@code "lookup"} metric; when no timer
     * context is available a duration of zero is reported.
     *
     * @param hoodieRecords records to look up
     * @return the records for which {@code isCurrentLocationKnown()} is false
     */
    public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
        HoodieFlinkTable<T> hoodieTable = this.getHoodieTable();
        Timer.Context lookupTimer = this.metrics.getIndexCtx();

        HoodieData<HoodieRecord<T>> tagged = this.getIndex().tagLocation(HoodieList.of(hoodieRecords), this.context, hoodieTable);
        List<HoodieRecord<T>> taggedRecords = HoodieList.getList(tagged);

        long elapsed = lookupTimer == null ? 0L : lookupTimer.stop();
        this.metrics.updateIndexMetrics("lookup", this.metrics.getDurationInMs(elapsed));

        return taggedRecords.stream()
            .filter(candidate -> !candidate.isCurrentLocationKnown())
            .collect(Collectors.toList());
    }

    /**
     * Bootstrap is not available on this client.
     *
     * @param extraMetadata ignored
     * @throws HoodieNotSupportedException always
     */
    public void bootstrap(Option<Map<String, String>> extraMetadata) {
        final String reason = "Bootstrap operation is not supported yet";
        throw new HoodieNotSupportedException(reason);
    }

    /**
     * Upserts the given records at {@code instantTime}.
     *
     * <p>The write handle is resolved from the first record of the batch, so the list must
     * contain at least one element ({@code records.get(0)} is called unconditionally).
     *
     * @param records     records to upsert; must be non-empty
     * @param instantTime instant the write belongs to
     * @return the write statuses returned by {@code postWrite}
     */
    public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
        final WriteOperationType operation = WriteOperationType.UPSERT;
        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable = this.getTableAndInitCtx(operation, instantTime);
        hoodieTable.validateUpsertSchema();
        this.preWrite(instantTime, operation, hoodieTable.getMetaClient());

        HoodieRecord<T> firstRecord = records.get(0);
        HoodieWriteHandle<?, ?, ?, ?> handle = this.getOrCreateWriteHandle(firstRecord, this.getConfig(), instantTime, hoodieTable, records.listIterator());
        HoodieWriteMetadata<List<WriteStatus>> writeMetadata = ((HoodieFlinkTable<T>) hoodieTable).upsert(this.context, handle, instantTime, records);

        // Report the index lookup time when the executor recorded one.
        Option<Duration> lookupDuration = writeMetadata.getIndexLookupDuration();
        if (lookupDuration.isPresent()) {
            this.metrics.updateIndexMetrics("lookup", lookupDuration.get().toMillis());
        }
        return this.postWrite(writeMetadata, instantTime, hoodieTable);
    }

    /**
     * Upserts records that are already prepared, at {@code instantTime}.
     *
     * <p>The table is initialised with {@code UPSERT} while the operation type recorded by
     * {@code preWrite} is {@code UPSERT_PREPPED}. The write handle is resolved from the first
     * record, so the list must be non-empty.
     *
     * @param preppedRecords prepared records; must be non-empty
     * @param instantTime    instant the write belongs to
     * @return the write statuses returned by {@code postWrite}
     */
    public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable = this.getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
        hoodieTable.validateUpsertSchema();
        this.preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, hoodieTable.getMetaClient());

        HoodieRecord<T> firstRecord = preppedRecords.get(0);
        HoodieWriteHandle<?, ?, ?, ?> handle = this.getOrCreateWriteHandle(firstRecord, this.getConfig(), instantTime, hoodieTable, preppedRecords.listIterator());
        HoodieWriteMetadata<List<WriteStatus>> writeMetadata = ((HoodieFlinkTable<T>) hoodieTable).upsertPrepped(this.context, handle, instantTime, preppedRecords);
        return this.postWrite(writeMetadata, instantTime, hoodieTable);
    }

    /**
     * Inserts the given records at {@code instantTime}.
     *
     * <p>The write handle is resolved from the first record, so the list must be non-empty.
     *
     * @param records     records to insert; must be non-empty
     * @param instantTime instant the write belongs to
     * @return the write statuses returned by {@code postWrite}
     */
    public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
        final WriteOperationType operation = WriteOperationType.INSERT;
        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable = this.getTableAndInitCtx(operation, instantTime);
        // NOTE(review): this uses the upsert schema check, whereas insertOverwrite and
        // insertOverwriteTable use validateInsertSchema() - confirm this is intended.
        hoodieTable.validateUpsertSchema();
        this.preWrite(instantTime, operation, hoodieTable.getMetaClient());

        HoodieRecord<T> firstRecord = records.get(0);
        HoodieWriteHandle<?, ?, ?, ?> handle = this.getOrCreateWriteHandle(firstRecord, this.getConfig(), instantTime, hoodieTable, records.listIterator());
        HoodieWriteMetadata<List<WriteStatus>> writeMetadata = ((HoodieFlinkTable<T>) hoodieTable).insert(this.context, handle, instantTime, records);

        // Report the index lookup time when the executor recorded one.
        Option<Duration> lookupDuration = writeMetadata.getIndexLookupDuration();
        if (lookupDuration.isPresent()) {
            this.metrics.updateIndexMetrics("lookup", lookupDuration.get().toMillis());
        }
        return this.postWrite(writeMetadata, instantTime, hoodieTable);
    }

    /**
     * Runs an insert-overwrite with the given records at {@code instantTime}.
     *
     * <p>The write handle is resolved from the first record, so the list must be non-empty.
     *
     * @param records     records to write; must be non-empty
     * @param instantTime instant the write belongs to
     * @return the write statuses returned by {@code postWrite}
     */
    public List<WriteStatus> insertOverwrite(List<HoodieRecord<T>> records, String instantTime) {
        final WriteOperationType operation = WriteOperationType.INSERT_OVERWRITE;
        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable = this.getTableAndInitCtx(operation, instantTime);
        hoodieTable.validateInsertSchema();
        this.preWrite(instantTime, operation, hoodieTable.getMetaClient());

        HoodieRecord<T> firstRecord = records.get(0);
        HoodieWriteHandle<?, ?, ?, ?> handle = this.getOrCreateWriteHandle(firstRecord, this.getConfig(), instantTime, hoodieTable, records.listIterator());
        HoodieWriteMetadata<List<WriteStatus>> writeMetadata = ((HoodieFlinkTable<T>) hoodieTable).insertOverwrite(this.context, handle, instantTime, records);
        return this.postWrite(writeMetadata, instantTime, hoodieTable);
    }

    /**
     * Runs an insert-overwrite-table with the given records at {@code instantTime}.
     *
     * <p>The write handle is resolved from the first record, so the list must be non-empty.
     *
     * @param records     records to write; must be non-empty
     * @param instantTime instant the write belongs to
     * @return the write statuses returned by {@code postWrite}
     */
    public List<WriteStatus> insertOverwriteTable(List<HoodieRecord<T>> records, String instantTime) {
        final WriteOperationType operation = WriteOperationType.INSERT_OVERWRITE_TABLE;
        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable = this.getTableAndInitCtx(operation, instantTime);
        hoodieTable.validateInsertSchema();
        this.preWrite(instantTime, operation, hoodieTable.getMetaClient());

        HoodieRecord<T> firstRecord = records.get(0);
        HoodieWriteHandle<?, ?, ?, ?> handle = this.getOrCreateWriteHandle(firstRecord, this.getConfig(), instantTime, hoodieTable, records.listIterator());
        HoodieWriteMetadata<List<WriteStatus>> writeMetadata = ((HoodieFlinkTable<T>) hoodieTable).insertOverwriteTable(this.context, handle, instantTime, records);
        return this.postWrite(writeMetadata, instantTime, hoodieTable);
    }

    /**
     * Insert of prepared records is not available on this client.
     *
     * @param preppedRecords ignored
     * @param instantTime    ignored
     * @throws HoodieNotSupportedException always
     */
    public List<WriteStatus> insertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
        final String reason = "InsertPrepped operation is not supported yet";
        throw new HoodieNotSupportedException(reason);
    }

    /**
     * Bulk insert is not available on this client.
     *
     * @param records     ignored
     * @param instantTime ignored
     * @throws HoodieNotSupportedException always
     */
    public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime) {
        final String reason = "BulkInsert operation is not supported yet";
        throw new HoodieNotSupportedException(reason);
    }

    /**
     * Bulk insert with a user-defined partitioner is not available on this client.
     *
     * @param records                          ignored
     * @param instantTime                      ignored
     * @param userDefinedBulkInsertPartitioner ignored
     * @throws HoodieNotSupportedException always
     */
    public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
        final String reason = "BulkInsert operation is not supported yet";
        throw new HoodieNotSupportedException(reason);
    }

    /**
     * Bulk insert of prepared records is not available on this client.
     *
     * @param preppedRecords        ignored
     * @param instantTime           ignored
     * @param bulkInsertPartitioner ignored
     * @throws HoodieNotSupportedException always
     */
    public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
        final String reason = "BulkInsertPrepped operation is not supported yet";
        throw new HoodieNotSupportedException(reason);
    }

    /**
     * Deletes the records identified by {@code keys} at {@code instantTime}.
     *
     * @param keys        keys of the records to delete
     * @param instantTime instant the delete belongs to
     * @return the write statuses returned by {@code postWrite}
     */
    public List<WriteStatus> delete(List<HoodieKey> keys, String instantTime) {
        final WriteOperationType operation = WriteOperationType.DELETE;
        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable = this.getTableAndInitCtx(operation, instantTime);
        this.preWrite(instantTime, operation, hoodieTable.getMetaClient());
        HoodieWriteMetadata<List<WriteStatus>> writeMetadata = hoodieTable.delete(this.context, instantTime, keys);
        return this.postWrite(writeMetadata, instantTime, hoodieTable);
    }

    /**
     * Pre-write hook: records the operation type on the client and does nothing else.
     *
     * @param instantTime        unused by this implementation
     * @param writeOperationType operation type to record
     * @param metaClient         unused by this implementation
     */
    protected void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
        setOperationType(writeOperationType);
    }

    /**
     * Applies the commit metadata to the metadata table when a metadata writer has been
     * initialised; otherwise does nothing.
     *
     * <p>The {@code table} argument is not consulted; whether the action is a table service
     * action is asked of a table obtained from {@code getHoodieTable()}.
     *
     * @param table       unused by this implementation
     * @param instantTime instant the metadata belongs to
     * @param actionType  action type checked with {@code isTableServiceAction}
     * @param metadata    commit metadata passed to the writer
     */
    protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
        if (!this.metadataWriterOption.isPresent()) {
            return;
        }
        HoodieBackedTableMetadataWriter writer = this.metadataWriterOption.get();
        writer.initTableMetadata();
        writer.update(metadata, instantTime, this.getHoodieTable().isTableServiceAction(actionType));
    }

    /**
     * Creates the Flink metadata table writer and stores it so that
     * {@code writeTableMetadata} can use it.
     */
    public void initMetadataWriter() {
        Configuration hadoopConfiguration = FlinkClientUtil.getHadoopConf();
        HoodieBackedTableMetadataWriter writer = (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(hadoopConfiguration, this.config, HoodieFlinkEngineContext.DEFAULT);
        this.metadataWriterOption = Option.of(writer);
    }

    /**
     * Asks {@link AsyncCleanerService} to start async cleaning if it is enabled and keeps
     * the returned service in {@code asyncCleanerService}.
     */
    public void startAsyncCleaning() {
        AbstractHoodieWriteClient self = this;
        this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(self);
    }

    /**
     * Blocks until the async cleaner has completed; a no-op when no cleaner service is set.
     */
    public void waitForCleaningFinish() {
        if (this.asyncCleanerService == null) {
            return;
        }
        LOG.info("Cleaner has been spawned already. Waiting for it to finish");
        AsyncCleanerService.waitForCompletion(this.asyncCleanerService);
        LOG.info("Cleaner has finished");
    }

    /**
     * Post-write hook: reports the index update duration (if one was recorded) under the
     * current operation type's name and returns the write statuses.
     *
     * <p>The guard checks the same Option that is dereferenced. Previously the lookup
     * duration was tested while the update duration was read, which could call
     * {@code get()} on an empty Option.
     *
     * @param result      write metadata produced by the table action
     * @param instantTime unused by this implementation
     * @param hoodieTable unused by this implementation
     * @return the write statuses carried by {@code result}
     */
    protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result, String instantTime, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
        if (result.getIndexUpdateDuration().isPresent()) {
            this.metrics.updateIndexMetrics(this.getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
        }
        return result.getWriteStatuses();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Post-commit hook: quietly deletes the marker directory of the instant, archives the
     * table when auto-archive is enabled, and always stops the heartbeat for the instant.
     *
     * <p>The marker cleanup runs against a table freshly created from the client's config,
     * not against the {@code table} argument; {@code table} is only used for archiving.
     *
     * @param table         table passed to {@code archive}
     * @param metadata      unused by this implementation
     * @param instantTime   instant whose markers are removed and whose heartbeat is stopped
     * @param extraMetadata unused by this implementation
     */
    protected void postCommit(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
        try {
            MarkerType markerType = this.config.getMarkersType();
            HoodieTable markerTable = this.createTable(this.config, this.hadoopConf);
            WriteMarkersFactory.get(markerType, markerTable, instantTime)
                .quietDeleteMarkerDir(this.context, this.config.getMarkersDeleteParallelism());

            boolean autoArchive = this.config.isAutoArchive();
            if (autoArchive) {
                this.archive(table);
            }
        } finally {
            // The heartbeat must be stopped even when cleanup or archiving fails.
            this.heartbeatClient.stop(instantTime);
        }
    }

    /**
     * Builds the compaction commit metadata from the write statuses, merges any extra
     * metadata into it and completes the compaction.
     *
     * @param compactionInstantTime instant of the compaction
     * @param writeStatuses         write statuses produced by the compaction
     * @param extraMetadata         optional key/value pairs added to the commit metadata
     * @throws IOException declared by the signature; propagated from the calls made here
     */
    public void commitCompaction(String compactionInstantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata) throws IOException {
        HoodieFlinkTable<T> hoodieTable = this.getHoodieTable();
        HoodieCommitMetadata commitMetadata = CompactHelpers.getInstance().createCompactionMetadata(hoodieTable, compactionInstantTime, HoodieList.of(writeStatuses), this.config.getSchema());
        extraMetadata.ifPresent(extras ->
            extras.forEach((key, value) -> commitMetadata.addMetadata(key, value)));
        this.completeCompaction(commitMetadata, writeStatuses, hoodieTable, compactionInstantTime);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Completes an inflight compaction inside a transaction and then reports commit metrics.
     *
     * <p>Within the transaction the write is finalized, the table's metadata writer (when
     * present) is updated with the commit metadata, and the inflight compaction is completed.
     * The transaction is always ended, whether or not those steps succeed.
     *
     * @param metadata             commit metadata of the compaction
     * @param writeStatuses        write statuses whose stats are passed to {@code finalizeWrite}
     * @param table                table the compaction ran on
     * @param compactionCommitTime instant time of the compaction
     * @throws HoodieCommitException if {@code compactionCommitTime} cannot be parsed while
     *                               updating the commit metrics
     */
    public void completeCompaction(HoodieCommitMetadata metadata, List<WriteStatus> writeStatuses, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String compactionCommitTime) {
        this.context.setJobStatus(((Object)((Object)this)).getClass().getSimpleName(), "Collect compaction write status and commit compaction");
        List writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
        try {
            HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, "compaction", compactionCommitTime);
            // The transaction is opened inside the try so the finally block below always ends it.
            this.txnManager.beginTransaction(Option.of((Object)compactionInstant), Option.empty());
            this.finalizeWrite(table, compactionCommitTime, writeStats);
            // Update the metadata table (if the table exposes a writer) before completing the instant.
            table.getMetadataWriter(compactionInstant.getTimestamp()).ifPresent(w -> w.update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));
            LOG.info("Committing Compaction {} finished with result {}.", (Object)compactionCommitTime, (Object)metadata);
            CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
        }
        finally {
            this.txnManager.endTransaction();
        }
        // Metrics are only reported when a compaction timer was started.
        if (this.compactionTimer != null) {
            long durationInMs = this.metrics.getDurationInMs(this.compactionTimer.stop());
            try {
                this.metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime((String)compactionCommitTime).getTime(), durationInMs, metadata, "compaction");
            }
            catch (ParseException e) {
                throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction " + this.config.getBasePath() + " at time " + compactionCommitTime, (Throwable)e);
            }
        }
        LOG.info("Compacted successfully on commit " + compactionCommitTime);
    }

    /**
     * Runs the compaction for the given instant and commits it.
     *
     * <p>{@code shouldComplete} is not consulted: the compaction is always committed.
     *
     * @param compactionInstantTime instant of the compaction to run
     * @param shouldComplete        unused by this implementation
     * @return the write statuses produced by the compaction
     * @throws HoodieException wrapping the underlying {@link IOException} when committing fails
     */
    protected List<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
        try {
            List<WriteStatus> writeStatuses = this.getHoodieTable().compact(this.context, compactionInstantTime).getWriteStatuses();
            this.commitCompaction(compactionInstantTime, writeStatuses, Option.empty());
            return writeStatuses;
        } catch (IOException e) {
            // Keep the original exception as the cause so the root failure is not lost.
            throw new HoodieException("Error while compacting instant: " + compactionInstantTime, e);
        }
    }

    /**
     * Clustering is not available on this client.
     *
     * @param clusteringInstant ignored
     * @param shouldComplete    ignored
     * @throws HoodieNotSupportedException always
     */
    public HoodieWriteMetadata<List<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
        final String reason = "Clustering is not supported yet";
        throw new HoodieNotSupportedException(reason);
    }

    /**
     * Creates a meta client, runs upgrade/downgrade to the current table version for the
     * given instant and then delegates to the meta-client based overload.
     *
     * @param operationType operation about to be executed
     * @param instantTime   instant passed to the upgrade/downgrade run
     * @return the table returned by the overload
     */
    protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
        HoodieTableMetaClient tableMetaClient = this.createMetaClient(true);
        UpgradeDowngrade upgrader = new UpgradeDowngrade(tableMetaClient, this.config, this.context, FlinkUpgradeDowngradeHelper.getInstance());
        upgrader.run(HoodieTableVersion.current(), instantTime);
        return this.getTableAndInitCtx(tableMetaClient, operationType);
    }

    /**
     * Creates a meta client and runs upgrade/downgrade to the current table version.
     *
     * @param instantTime instant passed to the upgrade/downgrade run
     */
    public void upgradeDowngrade(String instantTime) {
        HoodieTableMetaClient tableMetaClient = this.createMetaClient(true);
        UpgradeDowngrade upgrader = new UpgradeDowngrade(tableMetaClient, this.config, this.context, FlinkUpgradeDowngradeHelper.getInstance());
        upgrader.run(HoodieTableVersion.current(), instantTime);
    }

    /**
     * Drops every cached write handle without closing it.
     */
    public void cleanHandles() {
        bucketToHandles.clear();
    }

    /**
     * Calls {@code closeGracefully()} on every cached write handle and then drops them all.
     *
     * <p>Each handle is cast to {@link MiniBatchHandle}.
     */
    public void cleanHandlesGracefully() {
        for (HoodieWriteHandle<?, ?, ?, ?> cachedHandle : this.bucketToHandles.values()) {
            ((MiniBatchHandle) cachedHandle).closeGracefully();
        }
        this.bucketToHandles.clear();
    }

    /**
     * Creates the write handle for the file id of the given record's current location and
     * caches it in {@code bucketToHandles}, replacing any handle cached for that file id.
     *
     * <p>Selection order:
     * <ol>
     *   <li>a handle is already cached for the file id and reports {@code shouldReplace()}:
     *       a "concat and replace" handle when duplicate inserts are allowed, otherwise a
     *       "merge and replace" handle, both built on the previous handle's write path;</li>
     *   <li>the table type is {@code MERGE_ON_READ}: an append handle;</li>
     *   <li>the location's instant time equals {@code "I"}: a create handle;</li>
     *   <li>otherwise: a concat handle when duplicate inserts are allowed, else a merge handle.</li>
     * </ol>
     *
     * @param record      record whose current location and partition path select the handle;
     *                    its current location is dereferenced without a null check
     * @param config      write config passed to the handle
     * @param instantTime instant passed to the handle
     * @param table       table the handle writes to
     * @param recordItr   record iterator passed to the handles that accept one
     * @return the newly created handle
     */
    private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(HoodieRecord<T> record, HoodieWriteConfig config, String instantTime, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, Iterator<HoodieRecord<T>> recordItr) {
        MiniBatchHandle lastHandle;
        HoodieRecordLocation loc = record.getCurrentLocation();
        String fileID = loc.getFileId();
        String partitionPath = record.getPartitionPath();
        // True when the config allows duplicate inserts; selects the concat-style handles below.
        boolean insertClustering = config.allowDuplicateInserts();
        // lastHandle is assigned inside the condition and only used when a cached handle exists.
        if (this.bucketToHandles.containsKey(fileID) && (lastHandle = (MiniBatchHandle)this.bucketToHandles.get(fileID)).shouldReplace()) {
            FlinkMergeAndReplaceHandle writeHandle = insertClustering ? new FlinkConcatAndReplaceHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>(config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier(), lastHandle.getWritePath()) : new FlinkMergeAndReplaceHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>(config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier(), lastHandle.getWritePath());
            this.bucketToHandles.put(fileID, (HoodieWriteHandle<?, ?, ?, ?>)writeHandle);
            return writeHandle;
        }
        boolean isDelta = table.getMetaClient().getTableType().equals((Object)HoodieTableType.MERGE_ON_READ);
        // Nested ternary: append handle for MERGE_ON_READ, else create handle for instant time "I", else concat/merge.
        MiniBatchHandle writeHandle = isDelta ? new FlinkAppendHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>(config, instantTime, table, partitionPath, fileID, recordItr, table.getTaskContextSupplier()) : (loc.getInstantTime().equals("I") ? new FlinkCreateHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>(config, instantTime, table, partitionPath, fileID, table.getTaskContextSupplier()) : (insertClustering ? new FlinkConcatHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>(config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier()) : new FlinkMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>(config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier())));
        this.bucketToHandles.put(fileID, (HoodieWriteHandle<?, ?, ?, ?>)writeHandle);
        return writeHandle;
    }

    /**
     * Prepares the client for a write and returns the table to write to.
     *
     * <p>For {@code DELETE} the write schema for deletes is set from the meta client first.
     * The write timer is then started from the commit metrics context when the table's commit
     * action type is {@code "commit"}, and from the delta-commit context otherwise.
     *
     * @param metaClient    meta client used only for the delete schema setup
     * @param operationType operation about to be executed
     * @return a table obtained from {@code getHoodieTable()}
     */
    private HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) {
        if (operationType == WriteOperationType.DELETE) {
            this.setWriteSchemaForDeletes(metaClient);
        }
        HoodieFlinkTable<T> hoodieTable = this.getHoodieTable();
        boolean isCommitAction = hoodieTable.getMetaClient().getCommitActionType().equals("commit");
        if (isCommitAction) {
            this.writeTimer = this.metrics.getCommitCtx();
        } else {
            this.writeTimer = this.metrics.getDeltaCommitCtx();
        }
        return hoodieTable;
    }

    /**
     * Creates a {@link HoodieFlinkTable} from the client's config and engine context.
     *
     * @return the table returned by {@code HoodieFlinkTable.create}
     */
    public HoodieFlinkTable<T> getHoodieTable() {
        HoodieFlinkEngineContext flinkContext = (HoodieFlinkEngineContext) this.context;
        return HoodieFlinkTable.create(this.config, flinkContext);
    }

    /**
     * Computes, per partition, the ids of the existing files that the given overwrite
     * operation replaces.
     *
     * <ul>
     *   <li>{@code INSERT_OVERWRITE}: the distinct partitions of the write statuses, each
     *       mapped to its existing file ids;</li>
     *   <li>{@code INSERT_OVERWRITE_TABLE}: every partition path of the table, each mapped to
     *       its existing file ids (an empty map when the table has no partitions).</li>
     * </ul>
     *
     * @param writeOperationType operation to compute replaced files for
     * @param writeStatuses      write statuses of the operation (only read for {@code INSERT_OVERWRITE})
     * @return partition path to existing file ids
     * @throws AssertionError for any other operation type; the message names the operation
     */
    public Map<String, List<String>> getPartitionToReplacedFileIds(WriteOperationType writeOperationType, List<WriteStatus> writeStatuses) {
        HoodieFlinkTable<T> table = this.getHoodieTable();
        switch (writeOperationType) {
            case INSERT_OVERWRITE: {
                return writeStatuses.stream()
                    .map(status -> status.getStat().getPartitionPath())
                    .distinct()
                    .collect(Collectors.toMap(partition -> partition, partition -> this.getAllExistingFileIds(table, partition)));
            }
            case INSERT_OVERWRITE_TABLE: {
                Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
                List<String> partitionPaths = FSUtils.getAllPartitionPaths(this.context, this.config.getMetadataConfig(), table.getMetaClient().getBasePath());
                if (partitionPaths != null && !partitionPaths.isEmpty()) {
                    this.context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
                    partitionToExistingFileIds = partitionPaths.stream().parallel()
                        .collect(Collectors.toMap(partition -> partition, partition -> this.getAllExistingFileIds(table, partition)));
                }
                return partitionToExistingFileIds;
            }
            default:
                // Name the offending operation so a misuse can be diagnosed from the stack trace.
                throw new AssertionError("Replaced file ids are only computed for INSERT_OVERWRITE and INSERT_OVERWRITE_TABLE, but got: " + writeOperationType);
        }
    }

    /**
     * Lists the distinct file ids of the latest file slices in a partition.
     *
     * @param table         table whose slice view is queried
     * @param partitionPath partition to list
     * @return distinct file ids of the partition's latest file slices
     */
    private List<String> getAllExistingFileIds(HoodieFlinkTable<T> table, String partitionPath) {
        Stream<FileSlice> latestSlices = table.getSliceView().getLatestFileSlices(partitionPath);
        return latestSlices
            .map(slice -> slice.getFileId())
            .distinct()
            .collect(Collectors.toList());
    }
}

