package org.apache.hudi.sink.utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.utils.TestConfigurations;

/* loaded from: input_file:org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.class */
/**
 * Test harness that chains a {@link RowDataToHoodieFunction}, a {@link BucketAssignFunction} and a
 * {@link StreamWriteFunction} together with a {@link StreamWriteOperatorCoordinator}, all backed by
 * a mock runtime context and a mock operator event gateway, so that each step can be driven by hand.
 *
 * <p>The three functions are only created by {@link #openFunction()}; calling
 * {@link #invoke(Object)}, the checkpoint methods or the state accessors before that dereferences
 * {@code null} fields.
 *
 * @param <I> input record type; {@link #invoke(Object)} casts every record to {@link RowData}
 */
public class StreamWriteFunctionWrapper<I> {
    /** Managed memory size handed to the mock environment (131072 = 128 * 1024). */
    private static final long MANAGED_MEMORY_SIZE = 131072L;

    /** Configuration shared by the coordinator, all functions and the compaction wrapper. */
    private final Configuration conf;
    /** I/O manager owned by this wrapper; released in {@link #close()}. */
    private final IOManager ioManager;
    /** Mock runtime context injected into every function in {@link #openFunction()}. */
    private final StreamingRuntimeContext runtimeContext;
    /** Mock gateway installed on the write function; see {@link #getNextEvent()}. */
    private final MockOperatorEventGateway gateway;
    /** Coordinator that is started, checkpointed and closed alongside the functions. */
    private final StreamWriteOperatorCoordinator coordinator;
    /** Mock initialization context whose operator state store tracks checkpoints. */
    private final MockFunctionInitializationContext functionInitializationContext;
    /** Converts incoming {@link RowData} to {@link HoodieRecord}; created in {@link #openFunction()}. */
    private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
    /** Second stage of the chain; created in {@link #openFunction()}. */
    private BucketAssignFunction<String, HoodieRecord<?>, HoodieRecord<?>> bucketAssignerFunction;
    /** Final stage of the chain; created in {@link #openFunction()}. */
    private StreamWriteFunction<Object, HoodieRecord<?>, Object> writeFunction;
    /** Compaction wrapper; only opened and used when async compaction is enabled in {@link #conf}. */
    private final CompactFunctionWrapper compactFunctionWrapper;

    /**
     * Creates a wrapper using {@code TestConfigurations.getDefaultConf(str)} as configuration.
     *
     * @param str value forwarded to {@code TestConfigurations.getDefaultConf}
     *            (presumably the table path - TODO confirm)
     * @throws Exception if any of the mock components cannot be created
     */
    public StreamWriteFunctionWrapper(String str) throws Exception {
        this(str, TestConfigurations.getDefaultConf(str));
    }

    /**
     * Creates a wrapper with an explicit configuration.
     *
     * @param str           not read by this constructor; kept for signature compatibility
     * @param configuration configuration passed to the coordinator, the functions and the
     *                      compaction wrapper
     * @throws Exception if any of the mock components cannot be created
     */
    public StreamWriteFunctionWrapper(String str, Configuration configuration) throws Exception {
        this.ioManager = new IOManagerAsync();
        this.runtimeContext = new MockStreamingRuntimeContext(
                false,
                1,
                0,
                new MockEnvironmentBuilder()
                        .setTaskName("mockTask")
                        .setManagedMemorySize(MANAGED_MEMORY_SIZE)
                        .setIOManager(this.ioManager)
                        .build());
        this.gateway = new MockOperatorEventGateway();
        this.conf = configuration;
        // NOTE(review): the second argument is presumably the write task parallelism - confirm.
        this.coordinator = new StreamWriteOperatorCoordinator(configuration, 1);
        this.functionInitializationContext = new MockFunctionInitializationContext();
        this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
    }

    /**
     * Starts the coordinator, then creates and opens the three functions in pipeline order.
     * The compaction wrapper is opened as well when async compaction is enabled.
     *
     * @throws Exception if the coordinator or any function fails to start
     */
    public void openFunction() throws Exception {
        this.coordinator.start();

        this.toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, this.conf);
        this.toHoodieFunction.setRuntimeContext(this.runtimeContext);
        this.toHoodieFunction.open(this.conf);

        this.bucketAssignerFunction = new BucketAssignFunction<>(this.conf);
        this.bucketAssignerFunction.setRuntimeContext(this.runtimeContext);
        this.bucketAssignerFunction.open(this.conf);
        this.bucketAssignerFunction.initializeState(this.functionInitializationContext);

        this.writeFunction = new StreamWriteFunction<>(this.conf);
        this.writeFunction.setRuntimeContext(this.runtimeContext);
        this.writeFunction.setOperatorEventGateway(this.gateway);
        this.writeFunction.open(this.conf);

        if (isAsyncCompactionEnabled()) {
            this.compactFunctionWrapper.openFunction();
        }
    }

    /**
     * Pushes one record through the whole chain: row conversion, bucket assignment, write.
     *
     * @param i the input record; must be a {@link RowData} instance
     * @throws Exception if any function in the chain fails
     */
    public void invoke(I i) throws Exception {
        // Single-slot holder: keeps the last record the bucket assigner emitted.
        final HoodieRecord<?>[] assigned = new HoodieRecord<?>[1];
        Collector<HoodieRecord<?>> capture = new Collector<HoodieRecord<?>>() {
            public void collect(HoodieRecord<?> hoodieRecord) {
                assigned[0] = hoodieRecord;
            }

            public void close() {
                // Nothing to release.
            }
        };

        HoodieRecord<?> converted = this.toHoodieFunction.map((RowData) i);
        // The process-function context (and, for the writer, the collector) is passed as null.
        this.bucketAssignerFunction.processElement(converted, (KeyedProcessFunction.Context) null, capture);
        this.writeFunction.processElement(assigned[0], (KeyedProcessFunction.Context) null, (Collector) null);
    }

    /**
     * @return the event buffer held by the coordinator
     */
    public BatchWriteSuccessEvent[] getEventBuffer() {
        return this.coordinator.getEventBuffer();
    }

    /**
     * @return the next event reported by the mock operator event gateway
     */
    public OperatorEvent getNextEvent() {
        return this.gateway.getNextEvent();
    }

    /**
     * @return the data buffer held by the write function
     */
    public Map<String, List<HoodieRecord>> getDataBuffer() {
        return this.writeFunction.getDataBuffer();
    }

    /**
     * @return the write client held by the write function
     */
    public HoodieFlinkWriteClient getWriteClient() {
        return this.writeFunction.getWriteClient();
    }

    /**
     * Triggers a checkpoint: the coordinator first, then the state snapshots of the bucket assigner
     * and the write function, and finally the mock operator state store.
     *
     * @param j the checkpoint id
     * @throws Exception if the coordinator or a function fails to snapshot
     */
    public void checkpointFunction(long j) throws Exception {
        this.coordinator.checkpointCoordinator(j, new CompletableFuture<>());
        // The snapshot context is passed as null.
        this.bucketAssignerFunction.snapshotState((FunctionSnapshotContext) null);
        this.writeFunction.snapshotState((FunctionSnapshotContext) null);
        this.functionInitializationContext.m3getOperatorStateStore().checkpointBegin(j);
    }

    /**
     * Marks the checkpoint as successful on the state store, the coordinator and both functions,
     * then runs compaction when async compaction is enabled.
     *
     * @param j the checkpoint id
     * @throws HoodieException wrapping any exception thrown by the compaction
     */
    public void checkpointComplete(long j) {
        this.functionInitializationContext.m3getOperatorStateStore().checkpointSuccess(j);
        this.coordinator.checkpointComplete(j);
        this.bucketAssignerFunction.notifyCheckpointComplete(j);
        this.writeFunction.notifyCheckpointComplete(j);
        if (!isAsyncCompactionEnabled()) {
            return;
        }
        try {
            this.compactFunctionWrapper.compact(j);
        } catch (Exception e) {
            throw new HoodieException(e);
        }
    }

    /**
     * Notifies the coordinator that the checkpoint was aborted.
     *
     * @param j the checkpoint id
     */
    public void checkpointFails(long j) {
        this.coordinator.notifyCheckpointAborted(j);
    }

    /**
     * Closes the coordinator and then the I/O manager. The I/O manager is closed even when closing
     * the coordinator throws, so its resources are not leaked on a failed shutdown.
     *
     * <p>NOTE(review): the functions created in {@link #openFunction()} and the compaction wrapper
     * are not closed here.
     *
     * @throws Exception if closing the coordinator or the I/O manager fails
     */
    public void close() throws Exception {
        try {
            this.coordinator.close();
        } finally {
            this.ioManager.close();
        }
    }

    /**
     * @return the coordinator driven by this wrapper
     */
    public StreamWriteOperatorCoordinator getCoordinator() {
        return this.coordinator;
    }

    /** Delegates to {@code BucketAssignFunction#clearIndexState()}. */
    public void clearIndexState() {
        this.bucketAssignerFunction.clearIndexState();
    }

    /**
     * Delegates to {@code BucketAssignFunction#isKeyInState(HoodieKey)}.
     *
     * @param hoodieKey the key to look up
     * @return the result reported by the bucket assigner
     */
    public boolean isKeyInState(HoodieKey hoodieKey) {
        return this.bucketAssignerFunction.isKeyInState(hoodieKey);
    }

    /**
     * Delegates to {@code BucketAssignFunction#isAllPartitionsLoaded()}.
     *
     * @return the result reported by the bucket assigner
     */
    public boolean isAllPartitionsLoaded() {
        return this.bucketAssignerFunction.isAllPartitionsLoaded();
    }

    /** Reads the {@code FlinkOptions.COMPACTION_ASYNC_ENABLED} flag from the configuration. */
    private boolean isAsyncCompactionEnabled() {
        return this.conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED);
    }
}
