/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.streamer;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkSampleWritesUtils {
    private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);

    public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig) {
        if (!writeConfig.getBoolean(HoodieStreamerConfig.SAMPLE_WRITES_ENABLED).booleanValue()) {
            LOG.debug("Skip overwriting record size estimate as it's disabled.");
            return Option.empty();
        }
        HoodieTableMetaClient metaClient = SparkSampleWritesUtils.getMetaClient(jsc, writeConfig.getBasePath());
        if (metaClient.isTimelineNonEmpty()) {
            LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
            return Option.empty();
        }
        try {
            String instantTime = HoodieInstantTimeGenerator.getInstantFromTemporalAccessor(Instant.now().atZone(ZoneId.systemDefault()));
            Pair<Boolean, String> result = SparkSampleWritesUtils.doSampleWrites(jsc, records, writeConfig, instantTime);
            if (result.getLeft().booleanValue()) {
                long avgSize = SparkSampleWritesUtils.getAvgSizeFromSampleWrites(jsc, result.getRight());
                LOG.info("Overwriting record size estimate to " + avgSize);
                TypedProperties props = writeConfig.getProps();
                props.put(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(avgSize));
                return Option.of(HoodieWriteConfig.newBuilder().withProperties(props).build());
            }
        }
        catch (IOException e) {
            LOG.error(String.format("Not overwriting record size estimate for table %s due to error when doing sample writes.", writeConfig.getTableName()), (Throwable)e);
        }
        return Option.empty();
    }

    private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
        String sampleWritesBasePath = SparkSampleWritesUtils.getSampleWritesBasePath(jsc, writeConfig, instantTime);
        HoodieTableMetaClient.withPropertyBuilder().setTableType(HoodieTableType.COPY_ON_WRITE).setTableName(String.format("%s_samples_%s", writeConfig.getTableName(), instantTime)).setCDCEnabled(false).initTable(jsc.hadoopConfiguration(), sampleWritesBasePath);
        TypedProperties props = writeConfig.getProps();
        props.put(HoodieStreamerConfig.SAMPLE_WRITES_ENABLED.key(), "false");
        HoodieWriteConfig sampleWriteConfig = HoodieWriteConfig.newBuilder().withProps(props).withTableServicesEnabled(false).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).withSchemaEvolutionEnable(false).withBulkInsertParallelism(1).withAutoCommit(true).withPath(sampleWritesBasePath).build();
        try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient((HoodieEngineContext)new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty());){
            int size = writeConfig.getIntOrDefault(HoodieStreamerConfig.SAMPLE_WRITES_SIZE);
            List samples = records.coalesce(1).take(size);
            sampleWriteClient.startCommitWithTime(instantTime);
            JavaRDD writeStatusRDD = sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
            if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0L) {
                LOG.error(String.format("sample writes for table %s failed with errors.", writeConfig.getTableName()));
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Printing out the top 100 errors");
                    writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
                        LOG.trace("Global error :", ws.getGlobalError());
                        ws.getErrors().forEach((key, throwable) -> LOG.trace(String.format("Error for key: %s", key), throwable));
                    });
                }
                Pair<Boolean, Object> pair = Pair.of(false, null);
                return pair;
            }
            Pair<Boolean, String> pair = Pair.of(true, sampleWritesBasePath);
            return pair;
        }
    }

    private static String getSampleWritesBasePath(JavaSparkContext jsc, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
        CachingPath basePath = new CachingPath(writeConfig.getBasePath(), ".hoodie/.aux/.sample_writes/" + instantTime);
        FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
        if (fs.exists((Path)basePath)) {
            fs.delete((Path)basePath, true);
        }
        return basePath.toString();
    }

    private static long getAvgSizeFromSampleWrites(JavaSparkContext jsc, String sampleWritesBasePath) throws IOException {
        HoodieTableMetaClient metaClient = SparkSampleWritesUtils.getMetaClient(jsc, sampleWritesBasePath);
        Option<HoodieInstant> lastInstantOpt = metaClient.getCommitTimeline().filterCompletedInstants().lastInstant();
        ValidationUtils.checkState(lastInstantOpt.isPresent(), "The only completed instant should be present in sample_writes table.");
        HoodieInstant instant = lastInstantOpt.get();
        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(metaClient.getCommitTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
        long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
        long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
        return (long)Math.ceil(1.0 * (double)totalBytesWritten / (double)totalRecordsWritten);
    }

    private static HoodieTableMetaClient getMetaClient(JavaSparkContext jsc, String basePath) {
        FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
        return HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
    }
}

