package org.apache.hudi.sink.utils;

import java.util.Iterator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.hudi.sink.compact.CompactOperator;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;

/* loaded from: input_file:org/apache/hudi/sink/utils/CompactFunctionWrapper.class */
public class CompactFunctionWrapper {
    private final Configuration conf;
    private final IOManager ioManager = new IOManagerAsync();
    private final StreamingRuntimeContext runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, new MockEnvironmentBuilder().setTaskName("mockTask").setManagedMemorySize(131072).setIOManager(this.ioManager).build());
    private final StreamTask<?, ?> streamTask;
    private final StreamConfig streamConfig;
    private CompactionPlanOperator compactionPlanOperator;
    private CollectorOutput<CompactionPlanEvent> planEventOutput;
    private CollectorOutput<CompactionCommitEvent> commitEventOutput;
    private CompactOperator compactOperator;
    private CompactionCommitSink commitSink;

    public CompactFunctionWrapper(Configuration configuration, StreamTask<?, ?> streamTask, StreamConfig streamConfig) {
        this.conf = configuration;
        this.streamTask = streamTask;
        this.streamConfig = streamConfig;
    }

    public void openFunction() throws Exception {
        this.compactionPlanOperator = new CompactionPlanOperator(this.conf);
        this.planEventOutput = new CollectorOutput<>();
        this.compactionPlanOperator.setup(this.streamTask, this.streamConfig, this.planEventOutput);
        this.compactionPlanOperator.open();
        this.compactOperator = new CompactOperator(this.conf);
        this.compactOperator.setProcessingTimeService(new TestProcessingTimeService());
        this.commitEventOutput = new CollectorOutput<>();
        this.compactOperator.setup(this.streamTask, this.streamConfig, this.commitEventOutput);
        this.compactOperator.open();
        this.compactOperator.setExecutor(new MockCoordinatorExecutor(new MockOperatorCoordinatorContext(new OperatorID(), 1)));
        this.commitSink = new CompactionCommitSink(this.conf);
        this.commitSink.setRuntimeContext(this.runtimeContext);
        this.commitSink.open(this.conf);
    }

    public void compact(long j) throws Exception {
        this.compactionPlanOperator.setOutput(this.planEventOutput);
        this.compactionPlanOperator.notifyCheckpointComplete(j);
        Iterator<CompactionPlanEvent> it = this.planEventOutput.getRecords().iterator();
        while (it.hasNext()) {
            this.compactOperator.processElement(new StreamRecord(it.next()));
        }
        Iterator<CompactionCommitEvent> it2 = this.commitEventOutput.getRecords().iterator();
        while (it2.hasNext()) {
            this.commitSink.invoke(it2.next(), (SinkFunction.Context) null);
        }
    }

    public void close() throws Exception {
        this.ioManager.close();
    }
}
