package org.apache.hudi.table.action.cluster;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/* loaded from: input_file:org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.class */
public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseSparkCommitActionExecutor<T> {
    private static final Logger LOG = LogManager.getLogger(SparkExecuteClusteringCommitActionExecutor.class);
    private final HoodieClusteringPlan clusteringPlan;

    /**
     * Creates the executor for the given instant and eagerly loads the clustering plan that was
     * requested for it.
     *
     * @param hoodieEngineContext engine context handed to the base executor
     * @param hoodieWriteConfig   write configuration handed to the base executor
     * @param hoodieTable         table whose meta client is used to look up the clustering plan
     * @param str                 instant time of the requested replace commit
     * @throws HoodieClusteringException if no clustering plan can be read for the instant
     */
    public SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext hoodieEngineContext, HoodieWriteConfig hoodieWriteConfig, HoodieTable hoodieTable, String str) {
        super(hoodieEngineContext, hoodieWriteConfig, hoodieTable, str, WriteOperationType.CLUSTER);
        this.clusteringPlan = ClusteringUtils
                .getClusteringPlan(hoodieTable.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(str))
                .map(instantAndPlan -> instantAndPlan.getRight())
                .orElseThrow(() -> new HoodieClusteringException("Unable to read clustering plan for instant: " + str));
    }

    /**
     * Executes the clustering plan for this instant.
     *
     * <p>Moves the replace commit from requested to inflight, reloads the active timeline, runs
     * clustering for every input group of the plan, unions the per-group write statuses, and builds
     * the resulting write metadata. Commit metadata is filled in here only if
     * {@code updateIndexAndCommitIfNeeded} left it absent.
     *
     * @return write metadata describing the clustered output
     * @throws HoodieClusteringException if the plan produced no write statuses
     */
    /* renamed from: execute, reason: merged with bridge method [inline-methods] */
    public HoodieWriteMetadata<JavaRDD<WriteStatus>> m32execute() {
        this.table.getActiveTimeline().transitionReplaceRequestedToInflight(HoodieTimeline.getReplaceCommitRequestedInstant(this.instantTime), Option.empty());
        this.table.getMetaClient().reloadActiveTimeline();

        // Submit every group BEFORE joining any of them. A sequential stream is lazy and pushes one
        // element through the whole pipeline at a time, so chaining map(submit).map(join) directly
        // would wait for each group to finish before the next one is even started.
        List<CompletableFuture<JavaRDD<WriteStatus>>> pendingGroups = this.clusteringPlan.getInputGroups().stream()
                .map(inputGroup -> runClusteringForGroupAsync(inputGroup, this.clusteringPlan.getStrategy().getStrategyParams()))
                .collect(Collectors.toList());

        JavaRDD<WriteStatus> writeStatuses = pendingGroups.stream()
                .map(CompletableFuture::join)
                .reduce((left, right) -> left.union(right))
                .orElse(HoodieSparkEngineContext.getSparkContext(this.context).emptyRDD());
        if (writeStatuses.isEmpty()) {
            throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + this.instantTime + " #groups: " + this.clusteringPlan.getInputGroups().size());
        }

        HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = buildWriteMetadata(writeStatuses);
        updateIndexAndCommitIfNeeded(writeStatuses, writeMetadata);
        if (!writeMetadata.getCommitMetadata().isPresent()) {
            // Nothing upstream produced commit metadata, so derive it from the write stats here.
            writeMetadata.setCommitMetadata(Option.of(CommitUtils.buildMetadata(
                    writeStatuses.map(WriteStatus::getStat).collect(),
                    writeMetadata.getPartitionToReplaceFileIds(),
                    this.extraMetadata,
                    this.operationType,
                    getSchemaToStoreInCommit(),
                    getCommitActionType())));
        }
        return writeMetadata;
    }

    /**
     * Starts clustering of a single input group asynchronously.
     *
     * <p>The task instantiates the configured clustering execution strategy, reads the group's
     * records, and hands them to the strategy together with the group's number of output file
     * groups, the instant time, the strategy parameters and the write schema extended with
     * metadata fields.
     *
     * @param hoodieClusteringGroup the input group to cluster
     * @param map                   strategy parameters passed through to the execution strategy
     * @return a future that completes with the write statuses produced for the group
     */
    private CompletableFuture<JavaRDD<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup hoodieClusteringGroup, Map<String, String> map) {
        return CompletableFuture.supplyAsync(() -> {
            ClusteringExecutionStrategy strategy = (ClusteringExecutionStrategy) ReflectionUtils.loadClass(
                    this.config.getClusteringExecutionStrategyClass(),
                    new Object[]{this.table, this.context, this.config});
            JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> inputRecords =
                    readRecordsForGroup(HoodieSparkEngineContext.getSparkContext(this.context), hoodieClusteringGroup);
            int numOutputGroups = hoodieClusteringGroup.getNumOutputFileGroups().intValue();
            Schema schemaWithMetaFields = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(this.config.getSchema()));
            return (JavaRDD) strategy.performClustering(inputRecords, numOutputGroups, this.instantTime, map, schemaWithMetaFields);
        });
    }

    /**
     * Returns the commit action type used by this executor.
     *
     * @return the literal {@code "replacecommit"}
     */
    @Override // org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor
    protected String getCommitActionType() {
        return "replacecommit";
    }

    /**
     * Lists the file ids referenced by the clustering plan, grouped by partition path.
     *
     * @param javaRDD write statuses of the clustering run; not consulted by this implementation
     * @return partition path mapped to the file ids of the plan's file groups in that partition
     */
    @Override // org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor
    protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> javaRDD) {
        return ClusteringUtils.getFileGroupsFromClusteringPlan(this.clusteringPlan)
                .collect(Collectors.groupingBy(
                        fileGroupId -> fileGroupId.getPartitionPath(),
                        Collectors.mapping(fileGroupId -> fileGroupId.getFileId(), Collectors.toList())));
    }

    /**
     * Reads all records of a clustering group.
     *
     * <p>If at least one slice of the group has delta file paths, every slice is read through the
     * log-merging path; otherwise only the base files are read.
     *
     * @param javaSparkContext      Spark context used to parallelize the read
     * @param hoodieClusteringGroup group whose slices are read
     * @return the group's records
     */
    private JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> readRecordsForGroup(JavaSparkContext javaSparkContext, HoodieClusteringGroup hoodieClusteringGroup) {
        List<ClusteringOperation> operations = hoodieClusteringGroup.getSlices().stream()
                .map(ClusteringOperation::create)
                .collect(Collectors.toList());
        boolean anyDeltaFiles = operations.stream()
                .anyMatch(operation -> operation.getDeltaFilePaths().size() > 0);
        if (anyDeltaFiles) {
            return readRecordsForGroupWithLogs(javaSparkContext, operations);
        }
        return readRecordsForGroupBaseFiles(javaSparkContext, operations);
    }

    /**
     * Reads the given clustering operations by combining each base file with its delta (log) files.
     *
     * <p>The operations are spread over one Spark partition each. For every operation a file slice
     * reader is created over the base file reader and a merged log record scanner; the readers of a
     * partition are exposed as one concatenated iterator.
     *
     * @param javaSparkContext Spark context used to parallelize the operations
     * @param list             clustering operations to read
     * @return records read from base and log files
     * @throws HoodieClusteringException (inside the Spark task) if a reader cannot be opened
     */
    private JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> readRecordsForGroupWithLogs(JavaSparkContext javaSparkContext, List<ClusteringOperation> list) {
        return javaSparkContext.parallelize(list, list.size()).mapPartitions(operations -> {
            ArrayList sliceReaders = new ArrayList();
            while (operations.hasNext()) {
                ClusteringOperation operation = operations.next();
                long memoryBudget = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), this.config.getProps());
                LOG.info("MaxMemoryPerCompaction run as part of clustering => " + memoryBudget);
                try {
                    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(this.config.getSchema()));
                    sliceReaders.add(HoodieFileSliceReader.getFileSliceReader(
                            HoodieFileReaderFactory.getFileReader(this.table.getHadoopConf(), new Path(operation.getDataFilePath())),
                            new HoodieMergedLogRecordScanner(
                                    this.table.getMetaClient().getFs(),
                                    this.table.getMetaClient().getBasePath(),
                                    operation.getDeltaFilePaths(),
                                    readerSchema,
                                    this.instantTime,
                                    Long.valueOf(memoryBudget),
                                    this.config.getCompactionLazyBlockReadEnabled().booleanValue(),
                                    this.config.getCompactionReverseLogReadEnabled().booleanValue(),
                                    this.config.getMaxDFSStreamBufferSize(),
                                    this.config.getSpillableMapBasePath()),
                            readerSchema,
                            this.table.getMetaClient().getTableConfig().getPayloadClass()));
                } catch (IOException ioe) {
                    throw new HoodieClusteringException("Error reading input data for " + operation.getDataFilePath() + " and " + operation.getDeltaFilePaths(), ioe);
                }
            }
            return new ConcatenatingIterator(sliceReaders);
        });
    }

    /**
     * Reads the given clustering operations from their base files only.
     *
     * <p>The operations are spread over one Spark partition each. For every operation a record
     * iterator is opened on the base file using the write schema extended with metadata fields;
     * the iterators of a partition are concatenated and each record is converted via
     * {@link #transform(IndexedRecord)}.
     *
     * @param javaSparkContext Spark context used to parallelize the operations
     * @param list             clustering operations to read
     * @return records read from the base files
     * @throws HoodieClusteringException (inside the Spark task) if a base file cannot be read
     */
    private JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> readRecordsForGroupBaseFiles(JavaSparkContext javaSparkContext, List<ClusteringOperation> list) {
        return javaSparkContext.parallelize(list, list.size()).mapPartitions(operations -> {
            ArrayList baseFileIterators = new ArrayList();
            while (operations.hasNext()) {
                ClusteringOperation operation = operations.next();
                try {
                    baseFileIterators.add(
                            HoodieFileReaderFactory.getFileReader(this.table.getHadoopConf(), new Path(operation.getDataFilePath()))
                                    .getRecordIterator(HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(this.config.getSchema()))));
                } catch (IOException ioe) {
                    throw new HoodieClusteringException("Error reading input data for " + operation.getDataFilePath() + " and " + operation.getDeltaFilePaths(), ioe);
                }
            }
            return new ConcatenatingIterator(baseFileIterators);
        }).map(this::transform);
    }

    /**
     * Wraps an Avro record read from a base file into a {@link HoodieRecord}.
     *
     * <p>The key is built from the record's {@code _hoodie_record_key} and
     * {@code _hoodie_partition_path} fields; the payload is instantiated reflectively from the
     * table's configured payload class with the record as its single constructor argument.
     *
     * @param indexedRecord record to convert; cast to {@link GenericRecord}
     * @return the Hoodie record carrying the key and payload
     */
    private HoodieRecord<? extends HoodieRecordPayload> transform(IndexedRecord indexedRecord) {
        GenericRecord avroRecord = (GenericRecord) indexedRecord;
        String recordKey = avroRecord.get("_hoodie_record_key").toString();
        String partitionPath = avroRecord.get("_hoodie_partition_path").toString();
        HoodieKey hoodieKey = new HoodieKey(recordKey, partitionPath);
        return new HoodieRecord<>(
                hoodieKey,
                ReflectionUtils.loadPayload(
                        this.table.getMetaClient().getTableConfig().getPayloadClass(),
                        new Object[]{Option.of(avroRecord)},
                        new Class[]{Option.class}));
    }

    /**
     * Assembles the initial write metadata for a clustering run.
     *
     * <p>The result carries the replaced file ids per partition, the write statuses and their
     * collected stats. Commit metadata is left empty and the committed flag is {@code false}.
     *
     * @param javaRDD write statuses produced by clustering
     * @return the populated, not yet committed write metadata
     */
    private HoodieWriteMetadata<JavaRDD<WriteStatus>> buildWriteMetadata(JavaRDD<WriteStatus> javaRDD) {
        HoodieWriteMetadata<JavaRDD<WriteStatus>> metadata = new HoodieWriteMetadata<>();
        metadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(javaRDD));
        metadata.setWriteStatuses(javaRDD);
        metadata.setWriteStats(javaRDD.map(WriteStatus::getStat).collect());
        // Commit metadata is intentionally left empty here.
        metadata.setCommitMetadata(Option.empty());
        metadata.setCommitted(false);
        return metadata;
    }

    /**
     * Lambda deserialization hook: maps a {@link SerializedLambda} back to the matching lambda or
     * method reference of this class, selected by the implementation method name and then checked
     * against the functional interface, implementing class and signatures.
     *
     * NOTE(review): the method is marked synthetic, so this looks like decompiler output of a
     * compiler-generated method rather than hand-written code. As written it is not valid Java
     * ({@code boolean z = -1}, repeated {@code case true} labels, {@code this} used inside a static
     * method) -- confirm whether it should be dropped from the source tree.
     *
     * @param serializedLambda serialized form of the lambda to restore
     * @return the restored lambda or method reference
     * @throws IllegalArgumentException if the serialized lambda matches none of the known cases
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First switch: translate the implementation method name into a case selector.
        switch (implMethodName.hashCode()) {
            case -532528310:
                if (implMethodName.equals("lambda$readRecordsForGroupWithLogs$763f17fa$1")) {
                    z = 2;
                    break;
                }
                break;
            case -75141430:
                if (implMethodName.equals("getStat")) {
                    z = 3;
                    break;
                }
                break;
            case 912622979:
                if (implMethodName.equals("lambda$readRecordsForGroupBaseFiles$763f17fa$1")) {
                    z = true;
                    break;
                }
                break;
            case 1052666732:
                if (implMethodName.equals("transform")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: verify the full lambda shape and rebuild the corresponding lambda.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/avro/generic/IndexedRecord;)Lorg/apache/hudi/common/model/HoodieRecord;")) {
                    SparkExecuteClusteringCommitActionExecutor sparkExecuteClusteringCommitActionExecutor = (SparkExecuteClusteringCommitActionExecutor) serializedLambda.getCapturedArg(0);
                    return sparkExecuteClusteringCommitActionExecutor::transform;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Iterator;)Ljava/util/Iterator;")) {
                    SparkExecuteClusteringCommitActionExecutor sparkExecuteClusteringCommitActionExecutor2 = (SparkExecuteClusteringCommitActionExecutor) serializedLambda.getCapturedArg(0);
                    return it -> {
                        List arrayList = new ArrayList();
                        it.forEachRemaining(clusteringOperation -> {
                            try {
                                arrayList.add(HoodieFileReaderFactory.getFileReader(this.table.getHadoopConf(), new Path(clusteringOperation.getDataFilePath())).getRecordIterator(HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(this.config.getSchema()))));
                            } catch (IOException e) {
                                throw new HoodieClusteringException("Error reading input data for " + clusteringOperation.getDataFilePath() + " and " + clusteringOperation.getDeltaFilePaths(), e);
                            }
                        });
                        return new ConcatenatingIterator(arrayList);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Iterator;)Ljava/util/Iterator;")) {
                    SparkExecuteClusteringCommitActionExecutor sparkExecuteClusteringCommitActionExecutor3 = (SparkExecuteClusteringCommitActionExecutor) serializedLambda.getCapturedArg(0);
                    return it2 -> {
                        List arrayList = new ArrayList();
                        it2.forEachRemaining(clusteringOperation -> {
                            long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), this.config.getProps());
                            LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
                            try {
                                Schema addMetadataFields = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(this.config.getSchema()));
                                arrayList.add(HoodieFileSliceReader.getFileSliceReader(HoodieFileReaderFactory.getFileReader(this.table.getHadoopConf(), new Path(clusteringOperation.getDataFilePath())), new HoodieMergedLogRecordScanner(this.table.getMetaClient().getFs(), this.table.getMetaClient().getBasePath(), clusteringOperation.getDeltaFilePaths(), addMetadataFields, this.instantTime, Long.valueOf(maxMemoryPerCompaction), this.config.getCompactionLazyBlockReadEnabled().booleanValue(), this.config.getCompactionReverseLogReadEnabled().booleanValue(), this.config.getMaxDFSStreamBufferSize(), this.config.getSpillableMapBasePath()), addMetadataFields, this.table.getMetaClient().getTableConfig().getPayloadClass()));
                            } catch (IOException e) {
                                throw new HoodieClusteringException("Error reading input data for " + clusteringOperation.getDataFilePath() + " and " + clusteringOperation.getDeltaFilePaths(), e);
                            }
                        });
                        return new ConcatenatingIterator(arrayList);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/client/WriteStatus") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/hudi/common/model/HoodieWriteStat;")) {
                    return (v0) -> {
                        return v0.getStat();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/client/WriteStatus") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/hudi/common/model/HoodieWriteStat;")) {
                    return (v0) -> {
                        return v0.getStat();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
