/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.clustering.run.strategy;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.JavaTaskContextSupplier;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class JavaExecutionStrategy<T>
extends ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
    private static final Logger LOG = LoggerFactory.getLogger(JavaExecutionStrategy.class);

    public JavaExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
        super(table, engineContext, writeConfig);
    }

    @Override
    public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(HoodieClusteringPlan clusteringPlan, Schema schema, String instantTime) {
        ArrayList writeStatusList = new ArrayList();
        clusteringPlan.getInputGroups().forEach(inputGroup -> writeStatusList.addAll(this.runClusteringForGroup((HoodieClusteringGroup)((Object)inputGroup), clusteringPlan.getStrategy().getStrategyParams(), Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false), instantTime)));
        HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<HoodieData<WriteStatus>>();
        writeMetadata.setWriteStatuses(HoodieListData.eager(writeStatusList));
        return writeMetadata;
    }

    public abstract List<WriteStatus> performClusteringWithRecordList(List<HoodieRecord<T>> var1, int var2, String var3, Map<String, String> var4, Schema var5, List<HoodieFileGroupId> var6, boolean var7);

    protected BulkInsertPartitioner<List<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
        if (strategyParams.containsKey(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key())) {
            return new JavaCustomColumnsSortPartitioner(strategyParams.get(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key()).split(","), HoodieAvroUtils.addMetadataFields(schema), this.getWriteConfig());
        }
        return JavaBulkInsertInternalPartitionerFactory.get(this.getWriteConfig().getBulkInsertSortMode());
    }

    private List<WriteStatus> runClusteringForGroup(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams, boolean preserveHoodieMetadata, String instantTime) {
        List<HoodieRecord<T>> inputRecords = this.readRecordsForGroup(clusteringGroup, instantTime);
        Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(this.getWriteConfig().getSchema()));
        List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream().map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId())).collect(Collectors.toList());
        return this.performClusteringWithRecordList(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, preserveHoodieMetadata);
    }

    private List<HoodieRecord<T>> readRecordsForGroup(HoodieClusteringGroup clusteringGroup, String instantTime) {
        List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
        boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
        if (hasLogFiles) {
            return this.readRecordsForGroupWithLogs(clusteringOps, instantTime);
        }
        return this.readRecordsForGroupBaseFiles(clusteringOps);
    }

    private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOps, String instantTime) {
        HoodieWriteConfig config = this.getWriteConfig();
        HoodieTable table = this.getHoodieTable();
        ArrayList<HoodieRecord<T>> records = new ArrayList<HoodieRecord<T>>();
        clusteringOps.forEach(clusteringOp -> {
            long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new JavaTaskContextSupplier(), config);
            LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
            Option<HoodieFileReader> baseFileReader = Option.empty();
            HoodieMergedLogRecordScanner scanner = null;
            try {
                Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
                scanner = ((HoodieMergedLogRecordScanner.Builder)HoodieMergedLogRecordScanner.newBuilder().withFileSystem(table.getMetaClient().getFs()).withBasePath(table.getMetaClient().getBasePath()).withLogFilePaths((List)clusteringOp.getDeltaFilePaths())).withReaderSchema(readerSchema).withLatestInstantTime(instantTime).withMaxMemorySizeInBytes(maxMemoryPerCompaction).withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled()).withReverseReader(config.getCompactionReverseLogReadEnabled()).withBufferSize(config.getMaxDFSStreamBufferSize()).withSpillableMapBasePath(config.getSpillableMapBasePath()).withPartition(clusteringOp.getPartitionPath()).withDiskMapType(config.getCommonConfig().getSpillableDiskMapType()).withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()).withRecordMerger(config.getRecordMerger()).build();
                baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath()) ? Option.empty() : Option.of(HoodieFileReaderFactory.getReaderFactory(this.recordType).getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
                HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
                HoodieFileSliceReader fileSliceReader = HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema, tableConfig.getProps(), tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp())));
                fileSliceReader.forEachRemaining(records::add);
                if (scanner != null) {
                    scanner.close();
                }
                if (baseFileReader.isPresent()) {
                    baseFileReader.get().close();
                }
            }
            catch (IOException e) {
                try {
                    throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e);
                }
                catch (Throwable throwable) {
                    if (scanner != null) {
                        scanner.close();
                    }
                    if (baseFileReader.isPresent()) {
                        ((HoodieFileReader)baseFileReader.get()).close();
                    }
                    throw throwable;
                }
            }
        });
        return records;
    }

    private List<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
        ArrayList records = new ArrayList();
        clusteringOps.forEach(clusteringOp -> {
            try (HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(this.recordType).getFileReader(this.getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));){
                Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(this.getWriteConfig().getSchema()));
                ClosableIterator recordIterator = baseFileReader.getRecordIterator(readerSchema);
                recordIterator.forEachRemaining(record -> records.add(record.copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema, new Properties(), Option.empty())));
            }
            catch (IOException e) {
                throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e);
            }
        });
        return records;
    }
}

