/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.bulk.sort;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.hudi.adapter.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SortOperator
extends TableStreamOperator<RowData>
implements OneInputStreamOperator<RowData, RowData>,
BoundedOneInput {
    private static final Logger LOG = LoggerFactory.getLogger(SortOperator.class);
    private GeneratedNormalizedKeyComputer gComputer;
    private GeneratedRecordComparator gComparator;
    private Configuration conf;
    private transient BinaryExternalSorter sorter;
    private transient StreamRecordCollector<RowData> collector;
    private transient BinaryRowDataSerializer binarySerializer;

    public SortOperator(GeneratedNormalizedKeyComputer gComputer, GeneratedRecordComparator gComparator, Configuration conf) {
        this.gComputer = gComputer;
        this.gComparator = gComparator;
        this.conf = conf;
    }

    public void open() throws Exception {
        super.open();
        LOG.info("Opening SortOperator");
        ClassLoader cl = this.getContainingTask().getUserCodeClassLoader();
        AbstractRowDataSerializer inputSerializer = (AbstractRowDataSerializer)this.getOperatorConfig().getTypeSerializerIn1(this.getUserCodeClassloader());
        this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());
        NormalizedKeyComputer computer = (NormalizedKeyComputer)this.gComputer.newInstance(cl);
        RecordComparator comparator = (RecordComparator)this.gComparator.newInstance(cl);
        this.gComputer = null;
        this.gComparator = null;
        MemoryManager memManager = this.getContainingTask().getEnvironment().getMemoryManager();
        this.sorter = Utils.getBinaryExternalSorter((Object)this.getContainingTask(), (MemoryManager)memManager, (long)this.computeMemorySize(), (IOManager)this.getContainingTask().getEnvironment().getIOManager(), (AbstractRowDataSerializer)inputSerializer, (BinaryRowDataSerializer)this.binarySerializer, (NormalizedKeyComputer)computer, (RecordComparator)comparator, (Configuration)this.conf);
        this.sorter.startThreads();
        this.collector = new StreamRecordCollector(this.output);
        this.getMetricGroup().gauge("memoryUsedSizeInBytes", () -> ((BinaryExternalSorter)this.sorter).getUsedMemoryInBytes());
        this.getMetricGroup().gauge("numSpillFiles", () -> ((BinaryExternalSorter)this.sorter).getNumSpillFiles());
        this.getMetricGroup().gauge("spillInBytes", () -> ((BinaryExternalSorter)this.sorter).getSpillInBytes());
    }

    public void processElement(StreamRecord<RowData> element) throws Exception {
        this.sorter.write((RowData)element.getValue());
    }

    public void endInput() throws Exception {
        BinaryRowData row = this.binarySerializer.createInstance();
        MutableObjectIterator iterator2 = this.sorter.getIterator();
        while ((row = iterator2.next(row)) != null) {
            this.collector.collect((Object)row);
        }
    }

    public void close() throws Exception {
        LOG.info("Closing SortOperator");
        super.close();
        if (this.sorter != null) {
            this.sorter.close();
        }
    }
}

