/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.clustering.run.strategy;

import java.util.Map;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;

public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
extends ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
    private static final Logger LOG = LogManager.getLogger(SparkSortAndSizeExecutionStrategy.class);

    public SparkSortAndSizeExecutionStrategy(HoodieSparkCopyOnWriteTable<T> table, HoodieSparkEngineContext engineContext, HoodieWriteConfig writeConfig) {
        super(table, engineContext, writeConfig);
    }

    public SparkSortAndSizeExecutionStrategy(HoodieSparkMergeOnReadTable<T> table, HoodieSparkEngineContext engineContext, HoodieWriteConfig writeConfig) {
        super(table, engineContext, writeConfig);
    }

    @Override
    public JavaRDD<WriteStatus> performClustering(JavaRDD<HoodieRecord<T>> inputRecords, int numOutputGroups, String instantTime, Map<String, String> strategyParams, Schema schema) {
        Properties props = this.getWriteConfig().getProps();
        props.put("hoodie.bulkinsert.shuffle.parallelism", String.valueOf(numOutputGroups));
        props.put("hoodie.auto.commit", Boolean.FALSE.toString());
        HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
        return SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, this.getHoodieTable(), newConfig, false, this.getPartitioner(strategyParams, schema), true, numOutputGroups);
    }

    protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
        if (strategyParams.containsKey("hoodie.clustering.plan.strategy.sort.columns")) {
            return Option.of(new RDDCustomColumnsSortPartitioner(strategyParams.get("hoodie.clustering.plan.strategy.sort.columns").split(","), HoodieAvroUtils.addMetadataFields(schema)));
        }
        return Option.empty();
    }
}

