package org.apache.hudi.sink.utils;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
import org.apache.flink.streaming.util.MockStreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.utils.TestConfigurations;

/* loaded from: input_file:org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.class */
public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
    private final Configuration conf;
    private final IOManager ioManager;
    private final StreamingRuntimeContext runtimeContext;
    private final MockOperatorEventGateway gateway;
    private final MockOperatorCoordinatorContext coordinatorContext;
    private final StreamWriteOperatorCoordinator coordinator;
    private final MockStateInitializationContext stateInitializationContext;
    private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
    private BootstrapOperator<HoodieRecord<?>, HoodieRecord<?>> bootstrapOperator;
    private BucketAssignFunction<String, HoodieRecord<?>, HoodieRecord<?>> bucketAssignerFunction;
    private final MockBucketAssignFunctionContext bucketAssignFunctionContext;
    private StreamWriteFunction<HoodieRecord<?>> writeFunction;
    private CompactFunctionWrapper compactFunctionWrapper;
    private final MockStreamTask streamTask;
    private final StreamConfig streamConfig;
    private final boolean asyncCompaction;

    /* loaded from: input_file:org/apache/hudi/sink/utils/StreamWriteFunctionWrapper$MockBucketAssignFunctionContext.class */
    private static class MockBucketAssignFunctionContext {
        private final Set<Object> updateKeys;

        private MockBucketAssignFunctionContext() {
            this.updateKeys = new HashSet();
        }

        public void setCurrentKey(Object obj) {
            this.updateKeys.add(obj);
        }

        public boolean isKeyInState(String str) {
            return this.updateKeys.contains(str);
        }
    }

    /* loaded from: input_file:org/apache/hudi/sink/utils/StreamWriteFunctionWrapper$ScalaCollector.class */
    private static class ScalaCollector<T> implements Collector<T> {
        private T val;

        private ScalaCollector() {
        }

        public static <T> ScalaCollector<T> getInstance() {
            return new ScalaCollector<>();
        }

        public void collect(T t) {
            this.val = t;
        }

        public void close() {
            this.val = null;
        }

        public T getVal() {
            return this.val;
        }
    }

    public StreamWriteFunctionWrapper(String str) throws Exception {
        this(str, TestConfigurations.getDefaultConf(str));
    }

    public StreamWriteFunctionWrapper(String str, Configuration configuration) throws Exception {
        this.ioManager = new IOManagerAsync();
        MockEnvironment build = new MockEnvironmentBuilder().setTaskName("mockTask").setManagedMemorySize(131072L).setIOManager(this.ioManager).build();
        this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, build);
        this.gateway = new MockOperatorEventGateway();
        this.conf = configuration;
        this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        this.coordinator = new StreamWriteOperatorCoordinator(configuration, this.coordinatorContext);
        this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext();
        this.stateInitializationContext = new MockStateInitializationContext();
        this.asyncCompaction = OptionsResolver.needsAsyncCompaction(configuration);
        this.streamConfig = new StreamConfig(configuration);
        this.streamConfig.setOperatorID(new OperatorID());
        this.streamTask = new MockStreamTaskBuilder(build).setConfig(new StreamConfig(configuration)).setExecutionConfig(new ExecutionConfig().enableObjectReuse()).build();
        this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf, this.streamTask, this.streamConfig);
    }

    @Override // org.apache.hudi.sink.utils.TestFunctionWrapper
    public void openFunction() throws Exception {
        this.coordinator.start();
        this.coordinator.setExecutor(new MockCoordinatorExecutor(this.coordinatorContext));
        this.toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, this.conf);
        this.toHoodieFunction.setRuntimeContext(this.runtimeContext);
        this.toHoodieFunction.open(this.conf);
        this.bucketAssignerFunction = new BucketAssignFunction<>(this.conf);
        this.bucketAssignerFunction.setRuntimeContext(this.runtimeContext);
        this.bucketAssignerFunction.open(this.conf);
        this.bucketAssignerFunction.initializeState(this.stateInitializationContext);
        if (this.conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
            this.bootstrapOperator = new BootstrapOperator<>(this.conf);
            CollectorOutput collectorOutput = new CollectorOutput();
            this.bootstrapOperator.setup(this.streamTask, this.streamConfig, collectorOutput);
            this.bootstrapOperator.initializeState(this.stateInitializationContext);
            ScalaCollector scalaCollector = ScalaCollector.getInstance();
            for (HoodieRecord hoodieRecord : collectorOutput.getRecords()) {
                this.bucketAssignerFunction.processElement(hoodieRecord, (KeyedProcessFunction.Context) null, scalaCollector);
                this.bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey());
            }
        }
        setupWriteFunction();
        if (this.asyncCompaction) {
            this.compactFunctionWrapper.openFunction();
        }
    }

    @Override // org.apache.hudi.sink.utils.TestFunctionWrapper
    public void invoke(I i) throws Exception {
        HoodieRecord map = this.toHoodieFunction.map((RowData) i);
        ScalaCollector scalaCollector = ScalaCollector.getInstance();
        this.bucketAssignerFunction.processElement(map, (KeyedProcessFunction.Context) null, scalaCollector);
        this.bucketAssignFunctionContext.setCurrentKey(map.getRecordKey());
        this.writeFunction.processElement(scalaCollector.getVal(), (ProcessFunction.Context) null, (Collector) null);
    }

    @Override // org.apache.hudi.sink.utils.TestFunctionWrapper
    public WriteMetadataEvent[] getEventBuffer() {
        return this.coordinator.getEventBuffer();
    }

    @Override // org.apache.hudi.sink.utils.TestFunctionWrapper
    public OperatorEvent getNextEvent() {
        return this.gateway.getNextEvent();
    }

    @Override // org.apache.hudi.sink.utils.TestFunctionWrapper
    public Map<String, List<HoodieRecord>> getDataBuffer() {
        return this.writeFunction.getDataBuffer();
    }

    @Override // org.apache.hudi.sink.utils.TestFunctionWrapper
    public void checkpointFunction(long j) throws Exception {
        this.coordinator.checkpointCoordinator(j, new CompletableFuture());
        if (this.conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
            this.bootstrapOperator.snapshotState((StateSnapshotContext) null);
        }
        this.bucketAssignerFunction.snapshotState((FunctionSnapshotContext) null);
        this.writeFunction.snapshotState(new MockFunctionSnapshotContext(j));
        this.stateInitializationContext.m6getOperatorStateStore().checkpointBegin(j);
    }

    public void endInput() {
        this.writeFunction.endInput();
    }

    @Override // org.apache.hudi.sink.utils.TestFunctionWrapper
    public void checkpointComplete(long j) {
        this.stateInitializationContext.m6getOperatorStateStore().checkpointSuccess(j);
        this.coordinator.notifyCheckpointComplete(j);
        this.bucketAssignerFunction.notifyCheckpointComplete(j);
        if (this.asyncCompaction) {
            try {
                this.compactFunctionWrapper.compact(j);
            } catch (Exception e) {
                throw new HoodieException(e);
            }
        }
    }

    @Override // org.apache.hudi.sink.utils.TestFunctionWrapper
    public void checkpointFails(long j) {
        this.coordinator.notifyCheckpointAborted(j);
    }

    @Override // org.apache.hudi.sink.utils.TestFunctionWrapper
    public void subTaskFails(int i) throws Exception {
        this.coordinator.subtaskFailed(i, new RuntimeException("Dummy exception"));
        setupWriteFunction();
    }

    @Override // org.apache.hudi.sink.utils.TestFunctionWrapper
    public void close() throws Exception {
        this.coordinator.close();
        this.ioManager.close();
        this.bucketAssignerFunction.close();
        this.writeFunction.close();
        if (this.compactFunctionWrapper != null) {
            this.compactFunctionWrapper.close();
        }
    }

    @Override // org.apache.hudi.sink.utils.TestFunctionWrapper
    public StreamWriteOperatorCoordinator getCoordinator() {
        return this.coordinator;
    }

    @Override // org.apache.hudi.sink.utils.TestFunctionWrapper
    public MockOperatorCoordinatorContext getCoordinatorContext() {
        return this.coordinatorContext;
    }

    @Override // org.apache.hudi.sink.utils.TestFunctionWrapper
    public boolean isKeyInState(HoodieKey hoodieKey) {
        return this.bucketAssignFunctionContext.isKeyInState(hoodieKey.getRecordKey());
    }

    @Override // org.apache.hudi.sink.utils.TestFunctionWrapper
    public boolean isConforming() {
        return this.writeFunction.isConfirming();
    }

    @Override // org.apache.hudi.sink.utils.TestFunctionWrapper
    public boolean isAlreadyBootstrap() throws Exception {
        return this.bootstrapOperator.isAlreadyBootstrap();
    }

    private void setupWriteFunction() throws Exception {
        this.writeFunction = new StreamWriteFunction<>(this.conf);
        this.writeFunction.setRuntimeContext(this.runtimeContext);
        this.writeFunction.setOperatorEventGateway(this.gateway);
        this.writeFunction.initializeState(this.stateInitializationContext);
        this.writeFunction.open(this.conf);
        this.coordinator.handleEventFromOperator(0, getNextEvent());
    }
}
