/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class SparkHoodieBackedTableMetadataWriter
extends HoodieBackedTableMetadataWriter {
    private static final Logger LOG = LogManager.getLogger(SparkHoodieBackedTableMetadataWriter.class);

    public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
        return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context);
    }

    SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
        super(hadoopConf, writeConfig, engineContext);
    }

    @Override
    protected void initRegistry() {
        if (this.metadataWriteConfig.isMetricsOn()) {
            Registry registry = this.metadataWriteConfig.isExecutorMetricsEnabled() ? Registry.getRegistry("HoodieMetadata", DistributedRegistry.class.getName()) : Registry.getRegistry("HoodieMetadata");
            this.metrics = Option.of(new HoodieMetadataMetrics(registry));
        } else {
            this.metrics = Option.empty();
        }
    }

    @Override
    protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) {
        try {
            this.metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
                if (registry instanceof DistributedRegistry) {
                    HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)engineContext;
                    ((DistributedRegistry)registry).register(sparkEngineContext.getJavaSparkContext());
                }
            });
            if (this.enabled) {
                this.bootstrapIfNeeded(engineContext, datasetMetaClient);
            }
        }
        catch (IOException e) {
            LOG.error((Object)"Failed to initialize metadata table. Disabling the writer.", (Throwable)e);
            this.enabled = false;
        }
    }

    @Override
    protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
        ValidationUtils.checkState(this.enabled, "Metadata table cannot be committed to as it is not enabled");
        JavaRDD<HoodieRecord> recordRDD = this.prepRecords(records, partitionName);
        try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(this.engineContext, this.metadataWriteConfig, true);){
            writeClient.startCommitWithTime(instantTime);
            List statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime).collect();
            statuses.forEach(writeStatus -> {
                if (writeStatus.hasErrors()) {
                    throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
                }
            });
            if (writeClient.scheduleCompactionAtInstant(instantTime + "001", Option.empty())) {
                writeClient.compact(instantTime + "001");
            }
            writeClient.clean(instantTime + "002");
        }
        this.metrics.ifPresent(m -> {
            try {
                Map<String, String> stats = m.getStats(false, this.metaClient, (HoodieTableMetadata)this.metadata);
                m.updateMetrics(Long.parseLong(stats.get("totalBaseFileSizeInBytes")), Long.parseLong(stats.get("totalLogFileSizeInBytes")), Integer.parseInt(stats.get("baseFileCount")), Integer.parseInt(stats.get("logFileCount")));
            }
            catch (HoodieIOException e) {
                LOG.error((Object)"Could not publish metadata size metrics", (Throwable)e);
            }
        });
    }

    private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName) {
        String instantTime;
        String fileId;
        HoodieSparkTable table = HoodieSparkTable.create(this.metadataWriteConfig, this.engineContext);
        TableFileSystemView.SliceView fsView = table.getSliceView();
        List baseFiles = fsView.getLatestFileSlices(partitionName).map(FileSlice::getBaseFile).filter(Option::isPresent).map(Option::get).collect(Collectors.toList());
        if (partitionName.equals(MetadataPartitionType.FILES.partitionPath()) && baseFiles.size() > 1) {
            throw new HoodieMetadataException("Multiple base files found in metadata partition");
        }
        JavaSparkContext jsc = ((HoodieSparkEngineContext)this.engineContext).getJavaSparkContext();
        if (!baseFiles.isEmpty()) {
            fileId = ((HoodieBaseFile)baseFiles.get(0)).getFileId();
            instantTime = ((HoodieBaseFile)baseFiles.get(0)).getCommitTime();
        } else {
            List logFiles = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).map(FileSlice::getLatestLogFile).filter(Option::isPresent).map(Option::get).collect(Collectors.toList());
            if (logFiles.isEmpty()) {
                return jsc.parallelize(records, 1);
            }
            fileId = ((HoodieLogFile)logFiles.get(0)).getFileId();
            instantTime = ((HoodieLogFile)logFiles.get(0)).getBaseCommitTime();
        }
        return jsc.parallelize(records, 1).map((Function & Serializable)r -> r.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId)));
    }
}

