/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.clustering.run.strategy;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveOptimizationSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

/**
 * Clustering execution strategy that submits one asynchronous task per clustering
 * input group and unions the per-group write statuses into a single RDD.
 *
 * <p>Subclasses supply the actual rewrite step through
 * {@link #performClusteringWithRecordsRDD}.
 *
 * @param <T> payload type of the records being clustered
 */
public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPayload<T>>
extends ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
    private static final Logger LOG = LogManager.getLogger(MultipleSparkJobExecutionStrategy.class);

    /**
     * Creates the strategy; all arguments are forwarded to the superclass constructor.
     */
    public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
        super(table, engineContext, writeConfig);
    }

    /**
     * Runs clustering for every input group of the plan and unions the resulting write statuses.
     *
     * <p>All per-group futures are created before any of them is joined. Joining inside the same
     * lazy stream that creates the futures would block on group N before group N+1 is even
     * submitted, which serializes the groups.
     *
     * @param clusteringPlan plan whose input groups are clustered
     * @param schema not used by this implementation; the reader schema is derived from the write config
     * @param instantTime instant time passed through to the readers and to the rewrite step
     * @return write metadata whose write statuses are the union of all per-group results
     */
    @Override
    public HoodieWriteMetadata<JavaRDD<WriteStatus>> performClustering(HoodieClusteringPlan clusteringPlan, Schema schema, String instantTime) {
        JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(this.getEngineContext());
        boolean preserveHoodieMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);

        // Terminal collect() forces every group to be submitted up front.
        List<CompletableFuture<JavaRDD<WriteStatus>>> pendingGroups = clusteringPlan.getInputGroups().stream()
                .map(inputGroup -> this.runClusteringForGroupAsync(inputGroup, clusteringPlan.getStrategy().getStrategyParams(), preserveHoodieMetadata, instantTime))
                .collect(Collectors.toList());

        // Only now wait for the results, in plan order.
        JavaRDD<WriteStatus>[] writeStatuses = this.convertStreamToArray(pendingGroups.stream().map(CompletableFuture::join));
        JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);

        HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = new HoodieWriteMetadata<JavaRDD<WriteStatus>>();
        writeMetadata.setWriteStatuses(writeStatusRDD);
        return writeMetadata;
    }

    /**
     * Rewrites the records of one clustering group and returns the resulting write statuses.
     *
     * @param inputRecords records read from the group's file slices
     * @param numOutputGroups number of output file groups requested for the clustering group
     * @param instantTime instant time of the clustering action
     * @param strategyParams strategy parameters taken from the clustering plan
     * @param schema reader schema (write schema with metadata fields added)
     * @param fileGroupIdList ids of the file groups the input records were read from
     * @param preserveHoodieMetadata value of the plan's preserve-metadata flag ({@code false} when unset)
     * @return write statuses produced by the rewrite
     */
    public abstract JavaRDD<WriteStatus> performClusteringWithRecordsRDD(JavaRDD<HoodieRecord<T>> inputRecords, int numOutputGroups, String instantTime, Map<String, String> strategyParams, Schema schema, List<HoodieFileGroupId> fileGroupIdList, boolean preserveHoodieMetadata);

    /**
     * Chooses the bulk-insert partitioner for the rewrite step.
     *
     * <p>Layout optimization, when enabled in the write config, takes precedence over the
     * sort-columns strategy parameter.
     *
     * @param strategyParams strategy parameters; may carry a comma-separated sort-column list
     * @param schema schema to which metadata fields are added before it is handed to the partitioner
     * @return the selected partitioner, or an empty option when neither mode applies
     */
    protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
        if (this.getWriteConfig().isLayoutOptimizationEnabled()) {
            return Option.of(new RDDSpatialCurveOptimizationSortPartitioner((HoodieSparkEngineContext)this.getEngineContext(), this.getWriteConfig(), HoodieAvroUtils.addMetadataFields(schema)));
        }
        String sortColumnsKey = HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key();
        if (strategyParams.containsKey(sortColumnsKey)) {
            return Option.of(new RDDCustomColumnsSortPartitioner(strategyParams.get(sortColumnsKey).split(","), HoodieAvroUtils.addMetadataFields(schema)));
        }
        return Option.empty();
    }

    /**
     * Submits the clustering of a single group via {@link CompletableFuture#supplyAsync}
     * (no explicit executor is supplied).
     *
     * @return a future completing with the write statuses of the group
     */
    private CompletableFuture<JavaRDD<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams, boolean preserveHoodieMetadata, String instantTime) {
        return CompletableFuture.supplyAsync(() -> {
            JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(this.getEngineContext());
            JavaRDD<HoodieRecord<T>> inputRecords = this.readRecordsForGroup(jsc, clusteringGroup, instantTime);
            Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(this.getWriteConfig().getSchema()));
            List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
                    .map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
                    .collect(Collectors.toList());
            return this.performClusteringWithRecordsRDD(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, preserveHoodieMetadata);
        });
    }

    /**
     * Reads all records of a clustering group, using the log-merging reader when at least one
     * slice has delta (log) files and the base-file-only reader otherwise.
     */
    private JavaRDD<HoodieRecord<T>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
        List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
        boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> !op.getDeltaFilePaths().isEmpty());
        if (hasLogFiles) {
            return this.readRecordsForGroupWithLogs(jsc, clusteringOps, instantTime);
        }
        return this.readRecordsForGroupBaseFiles(jsc, clusteringOps);
    }

    /**
     * Reads records for slices that have log files: each operation's log files are scanned with a
     * merged log record scanner and combined with the base file (when one is set) through a file
     * slice reader.
     *
     * <p>The RDD is created with one partition per clustering operation.
     *
     * @throws HoodieClusteringException (inside the Spark task) when an input file cannot be read
     */
    private JavaRDD<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext jsc, List<ClusteringOperation> clusteringOps, String instantTime) {
        // Copied to locals so the Spark closure references them rather than calling back through 'this'.
        HoodieWriteConfig config = this.getWriteConfig();
        HoodieTable table = this.getHoodieTable();
        return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
            List<Iterator<HoodieRecord<T>>> recordIterators = new ArrayList<>();
            clusteringOpsPartition.forEachRemaining(clusteringOp -> {
                long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config);
                LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
                try {
                    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
                    HoodieMergedLogRecordScanner scanner = ((HoodieMergedLogRecordScanner.Builder)HoodieMergedLogRecordScanner.newBuilder()
                            .withFileSystem(table.getMetaClient().getFs())
                            .withBasePath(table.getMetaClient().getBasePath())
                            .withLogFilePaths((List)clusteringOp.getDeltaFilePaths()))
                            .withReaderSchema(readerSchema)
                            .withLatestInstantTime(instantTime)
                            .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
                            .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
                            .withReverseReader(config.getCompactionReverseLogReadEnabled())
                            .withBufferSize(config.getMaxDFSStreamBufferSize())
                            .withSpillableMapBasePath(config.getSpillableMapBasePath())
                            .withPartition(clusteringOp.getPartitionPath())
                            .build();
                    // A slice may consist of log files only, in which case there is no base file to open.
                    Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
                            ? Option.empty()
                            : Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
                    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
                    // Record-key/partition field names are only passed when meta fields are not populated.
                    recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema, tableConfig.getPayloadClass(), tableConfig.getPreCombineField(), tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()))));
                }
                catch (IOException e) {
                    throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e);
                }
            });
            return new ConcatenatingIterator<>(recordIterators);
        });
    }

    /**
     * Reads records for slices that consist of base files only and converts each Avro record
     * into a {@link HoodieRecord} via {@link #transform(IndexedRecord)}.
     *
     * <p>The RDD is created with one partition per clustering operation.
     *
     * @throws HoodieClusteringException (inside the Spark task) when a base file cannot be read
     */
    private JavaRDD<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc, List<ClusteringOperation> clusteringOps) {
        return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
            List<Iterator<IndexedRecord>> iteratorsForPartition = new ArrayList<>();
            clusteringOpsPartition.forEachRemaining(clusteringOp -> {
                try {
                    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(this.getWriteConfig().getSchema()));
                    HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(this.getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
                    iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema));
                }
                catch (IOException e) {
                    throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e);
                }
            });
            return new ConcatenatingIterator<>(iteratorsForPartition);
        }).map(this::transform);
    }

    /**
     * Materializes the stream of per-group RDDs into an array suitable for
     * {@code JavaSparkContext.union}.
     */
    @SuppressWarnings("unchecked") // generic array creation is not expressible; elements are JavaRDD<WriteStatus> by construction
    private JavaRDD<WriteStatus>[] convertStreamToArray(Stream<JavaRDD<WriteStatus>> writeStatusRDDStream) {
        return writeStatusRDDStream.toArray(JavaRDD[]::new);
    }

    /**
     * Wraps an Avro record read from a base file into a {@link HoodieRecord} carrying a
     * {@link RewriteAvroPayload}.
     *
     * <p>When meta fields are not populated, a key generator is built from the write config
     * properties and passed to the key/partition lookups. Note that this happens on every call.
     *
     * @throws HoodieIOException when the key generator cannot be created
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // the payload is always RewriteAvroPayload, independent of T
    private HoodieRecord<T> transform(IndexedRecord indexedRecord) {
        GenericRecord record = (GenericRecord)indexedRecord;
        Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
        if (!this.getWriteConfig().populateMetaFields()) {
            try {
                keyGeneratorOpt = Option.of((BaseKeyGenerator)HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(this.getWriteConfig().getProps())));
            }
            catch (IOException e) {
                throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
            }
        }
        String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt);
        String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt);
        HoodieKey hoodieKey = new HoodieKey(key, partition);
        HoodieRecordPayload avroPayload = new RewriteAvroPayload(record);
        return new HoodieRecord(hoodieKey, avroPayload);
    }
}

