/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.clustering;

import io.hops.hudi.org.apache.avro.Schema;
import io.hops.hudi.org.apache.avro.generic.GenericRecord;
import io.hops.hudi.org.apache.avro.generic.IndexedRecord;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.hudi.adapter.MaskingOutputAdapter;
import org.apache.hudi.adapter.Utils;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.metrics.FlinkClusteringMetrics;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.FlinkWriteClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream operator that executes the clustering work described by a {@link ClusteringPlanEvent}.
 *
 * <p>For every incoming plan event the operator reads the records of the listed file slices,
 * optionally sorts them with a {@link BinaryExternalSorter}, rewrites them through a
 * {@link BulkInsertWriterHelper} and emits a {@link ClusteringCommitEvent} downstream.
 *
 * <p>When async clustering is enabled the work is handed to a {@link NonThrownExecutor};
 * otherwise it runs inline in {@link #processElement(StreamRecord)}.
 */
public class ClusteringOperator
extends TableStreamOperator<ClusteringCommitEvent>
implements OneInputStreamOperator<ClusteringPlanEvent, ClusteringCommitEvent>,
BoundedOneInput {
    private static final Logger LOG = LoggerFactory.getLogger(ClusteringOperator.class);

    /** Private copy of the job configuration, with the file-size overrides applied by the constructor. */
    private final Configuration conf;
    /** Row type of the table as returned by {@code BulkInsertWriterHelper.addMetadataFields}. */
    private final RowType rowType;
    /** Index of this subtask, resolved in {@link #open()}. */
    private int taskID;
    private transient HoodieWriteConfig writeConfig;
    private transient HoodieFlinkTable<?> table;
    /** Avro schema converted from {@link #rowType}. */
    private transient Schema schema;
    /** Nullable variant of {@link #schema}, used for reading the file slices. */
    private transient Schema readerSchema;
    // NOTE(review): computed in open() but never read inside this class.
    private transient int[] requiredPos;
    private transient AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
    private transient HoodieFlinkWriteClient writeClient;
    private transient StreamRecordCollector<ClusteringCommitEvent> collector;
    private transient BinaryRowDataSerializer binarySerializer;
    private final boolean asyncClustering;
    private final boolean sortClusteringEnabled;
    /** Executor for async clustering; only created in {@link #open()} when {@link #asyncClustering} is set. */
    private transient NonThrownExecutor executor;
    private transient FlinkClusteringMetrics clusteringMetrics;

    /**
     * Creates the operator.
     *
     * <p>The given configuration is copied. In the copy, the parquet max file size is set to the
     * clustering target file size, and the clustering small-file limit is capped at that target
     * size divided by 1024 * 1024.
     *
     * @param conf    the job configuration (not modified)
     * @param rowType the table row type, before metadata fields are added
     */
    public ClusteringOperator(Configuration conf, RowType rowType) {
        this.conf = new Configuration(conf);
        this.rowType = BulkInsertWriterHelper.addMetadataFields(rowType, false);
        this.asyncClustering = OptionsResolver.needsAsyncClustering(conf);
        this.sortClusteringEnabled = OptionsResolver.sortClusteringEnabled(conf);

        long targetFileMaxBytes = this.conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES);
        this.conf.setLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), targetFileMaxBytes);
        long smallFileLimit = Math.min(
            targetFileMaxBytes / 1024L / 1024L,
            this.conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT));
        this.conf.setLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT.key(), smallFileLimit);
    }

    /** Delegates to the parent setup with the task output wrapped in a {@link MaskingOutputAdapter}. */
    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<ClusteringCommitEvent>> output) {
        super.setup(containingTask, config, new MaskingOutputAdapter<ClusteringCommitEvent>(output));
    }

    /**
     * Initializes the write client, table handle, schemas, converters, the optional async
     * executor, the output collector and the clustering metrics.
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.taskID = this.getRuntimeContext().getIndexOfThisSubtask();
        this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf);
        this.writeClient = FlinkWriteClients.createWriteClient(this.conf, (RuntimeContext) this.getRuntimeContext());
        this.table = this.writeClient.getHoodieTable();
        this.schema = AvroSchemaConverter.convertToSchema((LogicalType) this.rowType);
        this.readerSchema = AvroSchemaUtils.asNullable(this.schema);
        this.requiredPos = this.getRequiredPositions();
        this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(this.rowType);
        this.binarySerializer = new BinaryRowDataSerializer(this.rowType.getFieldCount());
        if (this.asyncClustering) {
            this.executor = NonThrownExecutor.builder(LOG).build();
        }
        this.collector = new StreamRecordCollector<>(this.output);
        this.registerMetrics();
    }

    /**
     * Runs the clustering described by the incoming plan event.
     *
     * <p>In async mode the work is submitted to the executor; if it fails, the error hook emits a
     * {@link ClusteringCommitEvent} that carries no write statuses. In sync mode the work runs
     * inline and any exception propagates to the caller.
     */
    @Override
    public void processElement(StreamRecord<ClusteringPlanEvent> element) throws Exception {
        ClusteringPlanEvent event = element.getValue();
        String instantTime = event.getClusteringInstantTime();
        List<ClusteringOperation> clusteringOperations = event.getClusteringGroupInfo().getOperations();
        if (this.asyncClustering) {
            this.executor.execute(
                () -> this.doClustering(instantTime, clusteringOperations),
                (errMsg, t) -> this.collector.collect(
                    new ClusteringCommitEvent(instantTime, this.getFileIds(clusteringOperations), this.taskID)),
                "Execute clustering for instant %s from task %d", instantTime, this.taskID);
        } else {
            LOG.info("Execute clustering for instant {} from task {}", instantTime, this.taskID);
            this.doClustering(instantTime, clusteringOperations);
        }
    }

    /**
     * Closes the async executor (if any) and the write client.
     *
     * <p>The write client is closed even when closing the executor throws.
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.executor != null) {
                this.executor.close();
            }
        } finally {
            if (this.writeClient != null) {
                this.writeClient.close();
                this.writeClient = null;
            }
        }
    }

    /** No operation. */
    @Override
    public void endInput() {
    }

    /**
     * Reads all records of the given clustering operations, rewrites them (sorted if sort
     * clustering is enabled) and emits a {@link ClusteringCommitEvent} with the write statuses.
     *
     * <p>The writer helper is closed on both the success and the failure path.
     *
     * @param instantTime          the clustering instant time
     * @param clusteringOperations the file slices to rewrite
     */
    private void doClustering(String instantTime, List<ClusteringOperation> clusteringOperations) throws Exception {
        this.clusteringMetrics.startClustering();
        BulkInsertWriterHelper writerHelper = new BulkInsertWriterHelper(
            this.conf, this.table, this.writeConfig, instantTime, this.taskID,
            this.getRuntimeContext().getNumberOfParallelSubtasks(),
            this.getRuntimeContext().getAttemptNumber(),
            this.rowType, true);
        try {
            boolean hasLogFiles = clusteringOperations.stream()
                .anyMatch(operation -> CollectionUtils.nonEmpty(operation.getDeltaFilePaths()));
            Iterator<RowData> records = hasLogFiles
                ? this.readRecordsForGroupWithLogs(clusteringOperations, instantTime)
                : this.readRecordsForGroupBaseFiles(clusteringOperations);

            if (this.sortClusteringEnabled) {
                this.writeSorted(records, writerHelper);
            } else {
                while (records.hasNext()) {
                    writerHelper.write(records.next());
                }
            }

            List<WriteStatus> writeStatuses = writerHelper.getWriteStatuses(this.taskID);
            this.clusteringMetrics.endClustering();
            this.collector.collect(
                new ClusteringCommitEvent(instantTime, this.getFileIds(clusteringOperations), writeStatuses, this.taskID));
        } catch (Throwable failure) {
            // Release the writer on failure too, but keep the original failure as the primary error.
            try {
                writerHelper.close();
            } catch (Throwable closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
        writerHelper.close();
    }

    /**
     * Feeds all records into a fresh external sorter and writes the sorted output through the
     * writer helper. The sorter is always closed, also when reading or writing fails.
     */
    private void writeSorted(Iterator<RowData> records, BulkInsertWriterHelper writerHelper) throws Exception {
        RowDataSerializer rowDataSerializer = new RowDataSerializer(this.rowType);
        BinaryExternalSorter sorter = this.initSorter();
        try {
            while (records.hasNext()) {
                // Each binary row is copied before it is handed to the sorter.
                BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(records.next()).copy();
                sorter.write(binaryRowData);
            }
            BinaryRowData row = this.binarySerializer.createInstance();
            while ((row = sorter.getIterator().next(row)) != null) {
                writerHelper.write(row);
            }
        } finally {
            sorter.close();
        }
    }

    /**
     * Builds an iterator over the records of all given operations, merging each base file (if
     * present) with its log files through a {@link HoodieMergedLogRecordScanner}.
     *
     * @throws HoodieClusteringException if a file slice reader cannot be set up
     */
    private Iterator<RowData> readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOps, String instantTime) {
        List<Iterator<RowData>> recordIterators = new ArrayList<>();
        long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new FlinkTaskContextSupplier(null), this.writeConfig);
        LOG.info("MaxMemoryPerCompaction run as part of clustering => {}", maxMemoryPerCompaction);
        for (ClusteringOperation clusteringOp : clusteringOps) {
            try {
                Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
                    ? Option.empty()
                    : Option.of(HoodieIOFactory.getIOFactory(this.table.getStorage())
                        .getReaderFactory(this.table.getConfig().getRecordMerger().getRecordType())
                        .getFileReader(this.table.getConfig(), new StoragePath(clusteringOp.getDataFilePath())));
                HoodieMergedLogRecordScanner scanner = ((HoodieMergedLogRecordScanner.Builder) HoodieMergedLogRecordScanner.newBuilder()
                    .withStorage(this.table.getStorage())
                    .withBasePath(this.table.getMetaClient().getBasePath())
                    .withLogFilePaths(clusteringOp.getDeltaFilePaths()))
                    .withReaderSchema(this.readerSchema)
                    .withLatestInstantTime(instantTime)
                    .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
                    .withReverseReader(this.writeConfig.getCompactionReverseLogReadEnabled())
                    .withBufferSize(this.writeConfig.getMaxDFSStreamBufferSize())
                    .withSpillableMapBasePath(this.writeConfig.getSpillableMapBasePath())
                    .withDiskMapType(this.writeConfig.getCommonConfig().getSpillableDiskMapType())
                    .withBitCaskDiskMapCompressionEnabled(this.writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
                    .withRecordMerger(this.writeConfig.getRecordMerger())
                    .build();
                HoodieTableConfig tableConfig = this.table.getMetaClient().getTableConfig();
                Option<BaseKeyGenerator> keyGeneratorOpt = tableConfig.populateMetaFields()
                    ? Option.empty()
                    : Option.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory.createKeyGenerator(this.writeConfig.getProps()));
                HoodieFileSliceReader<?> fileSliceReader = new HoodieFileSliceReader(
                    baseFileReader, scanner, this.readerSchema, tableConfig.getPreCombineField(),
                    this.writeConfig.getRecordMerger(), tableConfig.getProps(),
                    tableConfig.populateMetaFields()
                        ? Option.empty()
                        : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp())),
                    keyGeneratorOpt);
                recordIterators.add(
                    StreamSupport.stream(
                            Spliterators.spliteratorUnknownSize(fileSliceReader, java.util.Spliterator.NONNULL), false)
                        .map(this::toRowData)
                        .iterator());
            }
            catch (IOException e) {
                throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e);
            }
        }
        return new ConcatenatingIterator<RowData>(recordIterators);
    }

    /**
     * Converts a merged Hoodie record into a Flink row.
     *
     * @throws HoodieIOException if the record cannot be turned into an indexed record
     */
    private RowData toRowData(HoodieRecord<?> hoodieRecord) {
        try {
            return this.transform((IndexedRecord) hoodieRecord.toIndexedRecord(this.readerSchema, new Properties()).get().getData());
        }
        catch (IOException e) {
            throw new HoodieIOException("Failed to read next record", e);
        }
    }

    /**
     * Builds an iterator over the records of all given operations by reading their base files
     * only. Each file reader is created inside the {@link Iterable}, i.e. when its iterator is
     * requested.
     *
     * @throws HoodieClusteringException (raised from the iterable) if a base file cannot be opened
     */
    private Iterator<RowData> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
        List<Iterator<RowData>> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> {
            Iterable<IndexedRecord> indexedRecords = () -> {
                try {
                    HoodieFileReaderFactory fileReaderFactory = HoodieIOFactory.getIOFactory(this.table.getStorage())
                        .getReaderFactory(this.table.getConfig().getRecordMerger().getRecordType());
                    HoodieAvroFileReader fileReader = (HoodieAvroFileReader) fileReaderFactory.getFileReader(
                        this.table.getConfig(), new StoragePath(clusteringOp.getDataFilePath()));
                    return new CloseableMappingIterator<HoodieRecord, IndexedRecord>(fileReader.getRecordIterator(this.readerSchema), HoodieRecord::getData);
                }
                catch (IOException e) {
                    throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e);
                }
            };
            return StreamSupport.stream(indexedRecords.spliterator(), false).map(this::transform).iterator();
        }).collect(Collectors.toList());
        return new ConcatenatingIterator<RowData>(iteratorsForPartition);
    }

    /** Converts an Avro record (which must be a {@link GenericRecord}) into a Flink row. */
    private RowData transform(IndexedRecord indexedRecord) {
        GenericRecord record = (GenericRecord) indexedRecord;
        return (RowData) this.avroToRowDataConverter.convert(record);
    }

    /**
     * Returns, for every field of {@link #schema}, the index of the same-named field in
     * {@link #readerSchema} ({@code -1} when there is none).
     */
    private int[] getRequiredPositions() {
        List<String> fieldNames = this.readerSchema.getFields().stream()
            .map(Schema.Field::name)
            .collect(Collectors.toList());
        return this.schema.getFields().stream()
            .mapToInt(field -> fieldNames.indexOf(field.name()))
            .toArray();
    }

    /**
     * Creates and starts a {@link BinaryExternalSorter} using generated sort code for the
     * configured sort columns, and registers three gauges on the operator metric group.
     *
     * <p>NOTE(review): this runs once per clustering execution, so the same gauge names are
     * registered repeatedly — confirm how the metric group handles duplicate names.
     */
    private BinaryExternalSorter initSorter() {
        ClassLoader cl = this.getContainingTask().getUserCodeClassLoader();
        SortCodeGenerator codeGenerator = this.createSortCodeGenerator();
        NormalizedKeyComputer computer = codeGenerator.generateNormalizedKeyComputer("SortComputer").newInstance(cl);
        RecordComparator comparator = codeGenerator.generateRecordComparator("SortComparator").newInstance(cl);
        MemoryManager memManager = this.getContainingTask().getEnvironment().getMemoryManager();
        // The adapter expects an AbstractRowDataSerializer<RowData>; the binary serializer is passed via a raw cast.
        @SuppressWarnings({"rawtypes", "unchecked"})
        AbstractRowDataSerializer<RowData> inputSerializer = (AbstractRowDataSerializer) this.binarySerializer;
        BinaryExternalSorter sorter = Utils.getBinaryExternalSorter(
            this.getContainingTask(),
            memManager,
            this.computeMemorySize(),
            this.getContainingTask().getEnvironment().getIOManager(),
            inputSerializer,
            this.binarySerializer,
            computer,
            comparator,
            this.conf);
        sorter.startThreads();
        this.getMetricGroup().gauge("memoryUsedSizeInBytes", (org.apache.flink.metrics.Gauge<Long>) sorter::getUsedMemoryInBytes);
        this.getMetricGroup().gauge("numSpillFiles", (org.apache.flink.metrics.Gauge<Long>) sorter::getNumSpillFiles);
        this.getMetricGroup().gauge("spillInBytes", (org.apache.flink.metrics.Gauge<Long>) sorter::getSpillInBytes);
        return sorter;
    }

    /** Creates a sort code generator for the comma-separated columns in {@code CLUSTERING_SORT_COLUMNS}. */
    private SortCodeGenerator createSortCodeGenerator() {
        String[] sortColumns = this.conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS).split(",");
        return new SortOperatorGen(this.rowType, sortColumns).createSortCodeGenerator();
    }

    /** Joins the file ids of the given operations with commas. */
    private String getFileIds(List<ClusteringOperation> clusteringOperations) {
        return clusteringOperations.stream()
            .map(ClusteringOperation::getFileId)
            .collect(Collectors.joining(","));
    }

    @VisibleForTesting
    public void setExecutor(NonThrownExecutor executor) {
        this.executor = executor;
    }

    @VisibleForTesting
    public void setOutput(Output<StreamRecord<ClusteringCommitEvent>> output) {
        this.output = output;
    }

    /** Creates the clustering metrics on the operator's metric group and registers them. */
    private void registerMetrics() {
        OperatorMetricGroup metrics = this.getRuntimeContext().getMetricGroup();
        this.clusteringMetrics = new FlinkClusteringMetrics((MetricGroup) metrics);
        this.clusteringMetrics.registerMetrics();
    }
}

