/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

public class SparkHoodieBackedTableMetadataWriter
extends HoodieBackedTableMetadataWriter {
    private static final Logger LOG = LogManager.getLogger(SparkHoodieBackedTableMetadataWriter.class);

    public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context, Option<T> actionMetadata, Option<String> inflightInstantTimestamp) {
        return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata, inflightInstantTimestamp);
    }

    public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
        return SparkHoodieBackedTableMetadataWriter.create(conf, writeConfig, context, Option.empty(), Option.empty());
    }

    <T extends SpecificRecordBase> SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext, Option<T> actionMetadata, Option<String> inflightInstantTimestamp) {
        super(hadoopConf, writeConfig, engineContext, actionMetadata, inflightInstantTimestamp);
    }

    @Override
    protected void initRegistry() {
        if (this.metadataWriteConfig.isMetricsOn()) {
            Registry registry = this.metadataWriteConfig.isExecutorMetricsEnabled() ? Registry.getRegistry("HoodieMetadata", DistributedRegistry.class.getName()) : Registry.getRegistry("HoodieMetadata");
            this.metrics = Option.of(new HoodieMetadataMetrics(registry));
        } else {
            this.metrics = Option.empty();
        }
    }

    @Override
    protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext, Option<T> actionMetadata, Option<String> inflightInstantTimestamp) {
        try {
            this.metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
                if (registry instanceof DistributedRegistry) {
                    HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)engineContext;
                    ((DistributedRegistry)registry).register(sparkEngineContext.getJavaSparkContext());
                }
            });
            if (this.enabled) {
                this.bootstrapIfNeeded(engineContext, this.dataMetaClient, actionMetadata, inflightInstantTimestamp);
            }
        }
        catch (IOException e) {
            LOG.error((Object)"Failed to initialize metadata table. Disabling the writer.", (Throwable)e);
            this.enabled = false;
        }
    }

    @Override
    protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partitionName, String instantTime, boolean canTriggerTableService) {
        ValidationUtils.checkState(this.metadataMetaClient != null, "Metadata table is not fully initialized yet.");
        ValidationUtils.checkState(this.enabled, "Metadata table cannot be committed to as it is not enabled");
        JavaRDD records = (JavaRDD)hoodieDataRecords.get();
        JavaRDD<HoodieRecord> recordRDD = this.prepRecords((JavaRDD<HoodieRecord>)records, partitionName, 1);
        try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(this.engineContext, this.metadataWriteConfig, true);){
            if (!this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
                writeClient.startCommitWithTime(instantTime);
            } else {
                HoodieInstant alreadyCompletedInstant = this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
                HoodieActiveTimeline.deleteInstantFile(this.metadataMetaClient.getFs(), this.metadataMetaClient.getMetaPath(), alreadyCompletedInstant);
                this.metadataMetaClient.reloadActiveTimeline();
            }
            List statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime).collect();
            statuses.forEach(writeStatus -> {
                if (writeStatus.hasErrors()) {
                    throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
                }
            });
            this.metadataMetaClient.reloadActiveTimeline();
            if (canTriggerTableService) {
                this.compactIfNecessary(writeClient, instantTime);
                this.doClean(writeClient, instantTime);
                writeClient.archive();
            }
        }
        this.metrics.ifPresent(m -> m.updateSizeMetrics(this.metadataMetaClient, this.metadata));
    }

    private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
        List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(this.metadataMetaClient, partitionName, false);
        ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
        return recordsRDD.map((Function & Serializable)r -> {
            FileSlice slice = (FileSlice)fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
            r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
            return r;
        });
    }
}

