/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.com.codahale.metrics.Timer;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndex;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade;

public class HoodieFlinkWriteClient<T extends HoodieRecordPayload>
extends AbstractHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
    public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
        super(context, clientConfig);
    }

    public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
        super(context, writeConfig, rollbackPending);
    }

    public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending, Option<EmbeddedTimelineService> timelineService) {
        super(context, writeConfig, rollbackPending, timelineService);
    }

    @Override
    protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createIndex(HoodieWriteConfig writeConfig) {
        return FlinkHoodieIndex.createIndex((HoodieFlinkEngineContext)this.context, this.config);
    }

    @Override
    public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
        List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
        return this.commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
    }

    @Override
    protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
        return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext)this.context);
    }

    @Override
    public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
        HoodieFlinkTable table = HoodieFlinkTable.create(this.config, (HoodieFlinkEngineContext)this.context);
        Timer.Context indexTimer = this.metrics.getIndexCtx();
        List<HoodieRecord<T>> recordsWithLocation = this.getIndex().tagLocation(hoodieRecords, this.context, table);
        this.metrics.updateIndexMetrics("lookup", this.metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
        return recordsWithLocation.stream().filter(v1 -> !v1.isCurrentLocationKnown()).collect(Collectors.toList());
    }

    @Override
    public void bootstrap(Option<Map<String, String>> extraMetadata) {
        throw new HoodieNotSupportedException("Bootstrap operation is not supported yet");
    }

    @Override
    public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table = this.getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
        table.validateUpsertSchema();
        this.preWrite(instantTime, WriteOperationType.UPSERT);
        HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(this.context, instantTime, records);
        if (result.getIndexLookupDuration().isPresent()) {
            this.metrics.updateIndexMetrics("lookup", result.getIndexLookupDuration().get().toMillis());
        }
        return this.postWrite((HoodieWriteMetadata)result, instantTime, (HoodieTable)table);
    }

    @Override
    public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
        throw new HoodieNotSupportedException("UpsertPrepped operation is not supported yet");
    }

    @Override
    public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table = this.getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
        table.validateUpsertSchema();
        this.preWrite(instantTime, WriteOperationType.INSERT);
        HoodieWriteMetadata<List<WriteStatus>> result = table.insert(this.context, instantTime, records);
        if (result.getIndexLookupDuration().isPresent()) {
            this.metrics.updateIndexMetrics("lookup", result.getIndexLookupDuration().get().toMillis());
        }
        return this.postWrite((HoodieWriteMetadata)result, instantTime, (HoodieTable)table);
    }

    @Override
    public List<WriteStatus> insertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
        throw new HoodieNotSupportedException("InsertPrepped operation is not supported yet");
    }

    @Override
    public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime) {
        throw new HoodieNotSupportedException("BulkInsert operation is not supported yet");
    }

    @Override
    public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
        throw new HoodieNotSupportedException("BulkInsert operation is not supported yet");
    }

    @Override
    public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
        throw new HoodieNotSupportedException("BulkInsertPrepped operation is not supported yet");
    }

    @Override
    public List<WriteStatus> delete(List<HoodieKey> keys2, String instantTime) {
        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table = this.getTableAndInitCtx(WriteOperationType.DELETE, instantTime);
        this.preWrite(instantTime, WriteOperationType.DELETE);
        HoodieWriteMetadata<List<WriteStatus>> result = table.delete(this.context, instantTime, keys2);
        return this.postWrite((HoodieWriteMetadata)result, instantTime, (HoodieTable)table);
    }

    @Override
    protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result, String instantTime, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
        if (result.getIndexLookupDuration().isPresent()) {
            this.metrics.updateIndexMetrics(this.getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
        }
        return result.getWriteStatuses();
    }

    @Override
    public void commitCompaction(String compactionInstantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata) throws IOException {
        throw new HoodieNotSupportedException("Compaction is not supported yet");
    }

    @Override
    protected void completeCompaction(HoodieCommitMetadata metadata, List<WriteStatus> writeStatuses, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String compactionCommitTime) {
        throw new HoodieNotSupportedException("Compaction is not supported yet");
    }

    @Override
    protected List<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
        throw new HoodieNotSupportedException("Compaction is not supported yet");
    }

    @Override
    public HoodieWriteMetadata<List<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
        throw new HoodieNotSupportedException("Clustering is not supported yet");
    }

    @Override
    protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
        HoodieTableMetaClient metaClient = this.createMetaClient(true);
        new FlinkUpgradeDowngrade(metaClient, this.config, this.context).run(metaClient, HoodieTableVersion.current(), this.config, this.context, instantTime);
        return this.getTableAndInitCtx(metaClient, operationType);
    }

    private HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) {
        HoodieFlinkTable table;
        if (operationType == WriteOperationType.DELETE) {
            this.setWriteSchemaForDeletes(metaClient);
        }
        this.writeTimer = (table = HoodieFlinkTable.create(this.config, (HoodieFlinkEngineContext)this.context, metaClient)).getMetaClient().getCommitActionType().equals("commit") ? this.metrics.getCommitCtx() : this.metrics.getDeltaCommitCtx();
        return table;
    }

    public List<String> getInflightsAndRequestedInstants(String commitType) {
        HoodieFlinkTable table = HoodieFlinkTable.create(this.config, (HoodieFlinkEngineContext)this.context);
        HoodieTimeline unCompletedTimeline = table.getMetaClient().getCommitsTimeline().filterInflightsAndRequested();
        return unCompletedTimeline.getInstants().filter(x -> x.getAction().equals(commitType)).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
    }
}

