/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkHoodieBackedTableMetadataWriter
extends HoodieBackedTableMetadataWriter<List<HoodieRecord>> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkHoodieBackedTableMetadataWriter.class);

    public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
        return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, context, Option.empty());
    }

    public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieEngineContext context, Option<String> inFlightInstantTimestamp) {
        return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, context, inFlightInstantTimestamp);
    }

    public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, HoodieEngineContext context, Option<String> inFlightInstantTimestamp) {
        return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, failedWritesCleaningPolicy, context, inFlightInstantTimestamp);
    }

    FlinkHoodieBackedTableMetadataWriter(StorageConfiguration<?> storageConf, HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, HoodieEngineContext engineContext, Option<String> inFlightInstantTimestamp) {
        super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inFlightInstantTimestamp);
    }

    @Override
    protected void initRegistry() {
        this.metrics = this.metadataWriteConfig.isMetricsOn() ? Option.of(new HoodieMetadataMetrics(this.metadataWriteConfig.getMetricsConfig(), this.dataMetaClient.getStorage())) : Option.empty();
    }

    @Override
    protected void updateColumnsToIndexWithColStats(List<String> columnsToIndex) {
    }

    @Override
    protected void commit(String instantTime, Map<String, HoodieData<HoodieRecord>> partitionRecordsMap) {
        this.commitInternal(instantTime, partitionRecordsMap, false, Option.empty());
    }

    @Override
    protected List<HoodieRecord> convertHoodieDataToEngineSpecificData(HoodieData<HoodieRecord> records) {
        return records.collectAsList();
    }

    @Override
    protected void bulkCommit(String instantTime, String partitionName, HoodieData<HoodieRecord> records, int fileGroupCount) {
        this.commitInternal(instantTime, Collections.singletonMap(partitionName, records), true, Option.empty());
    }

    @Override
    protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
        ValidationUtils.checkState(this.metadataMetaClient != null, "Metadata table is not fully initialized yet.");
        HoodieData<HoodieRecord> preppedRecords = this.prepRecords(partitionRecordsMap);
        List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();
        BaseHoodieWriteClient<?, List<HoodieRecord>, ?, ?> writeClient = this.getWriteClient();
        if (writeClient.rollbackFailedWrites(this.metadataMetaClient)) {
            this.metadataMetaClient = HoodieTableMetaClient.reload(this.metadataMetaClient);
        }
        this.compactIfNecessary(writeClient, Option.empty());
        if (!this.metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
            LOG.info("New commit at " + instantTime + " being applied to MDT.");
        } else {
            Option<HoodieInstant> alreadyCompletedInstant = this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.requestedTime().equals(instantTime)).lastInstant();
            LOG.info(String.format("%s completed commit at %s being applied to MDT.", alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime));
            if (!writeClient.rollback(instantTime)) {
                throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
            }
            this.metadataMetaClient.reloadActiveTimeline();
        }
        writeClient.startCommitWithTime(instantTime);
        this.preWrite(instantTime);
        List statuses = isInitializing ? (List)writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, bulkInsertPartitioner) : (List)writeClient.upsertPreppedRecords(preppedRecordList, instantTime);
        writeClient.commit(instantTime, statuses, Option.empty(), "deltacommit", Collections.emptyMap());
        this.metadataMetaClient.reloadActiveTimeline();
        this.cleanIfNecessary(writeClient, "");
        writeClient.archive();
        this.metrics.ifPresent(m -> m.updateSizeMetrics(this.metadataMetaClient, this.metadata, this.dataMetaClient.getTableConfig().getMetadataPartitions()));
    }

    @Override
    public void deletePartitions(String instantTime, List<MetadataPartitionType> partitions) {
        throw new HoodieNotSupportedException("Dropping metadata index not supported for Flink metadata table yet.");
    }

    @Override
    public BaseHoodieWriteClient<?, List<HoodieRecord>, ?, ?> initializeWriteClient() {
        return new HoodieFlinkWriteClient(this.engineContext, this.metadataWriteConfig);
    }

    @Override
    protected void preWrite(String instantTime) {
        this.metadataMetaClient.getActiveTimeline().transitionRequestedToInflight("deltacommit", instantTime);
    }

    @Override
    protected HoodieData<HoodieRecord> getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, StorageConfiguration<?> storageConf, String instantTime) {
        throw new HoodieNotSupportedException("Flink metadata table does not support expression index yet.");
    }

    @Override
    protected HoodieTable getTable(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) {
        return HoodieFlinkTable.create(writeConfig, this.engineContext, metaClient);
    }

    @Override
    protected EngineType getEngineType() {
        return EngineType.FLINK;
    }
}

