/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.operator;

import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.HoodieFlinkStreamer;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.exception.HoodieFlinkStreamerException;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KeyedWriteProcessFunction
extends KeyedProcessFunction<String, HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>>
implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(KeyedWriteProcessFunction.class);
    private List<HoodieRecord> bufferedRecords = new LinkedList<HoodieRecord>();
    private Collector<Tuple3<String, List<WriteStatus>, Integer>> output;
    private int indexOfThisSubtask;
    private String latestInstant;
    private boolean hasRecordsIn;
    private HoodieFlinkStreamer.Config cfg;
    private transient HoodieFlinkWriteClient writeClient;

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.indexOfThisSubtask = this.getRuntimeContext().getIndexOfThisSubtask();
        this.cfg = (HoodieFlinkStreamer.Config)this.getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(new SerializableConfiguration(new org.apache.hadoop.conf.Configuration()), (TaskContextSupplier)new FlinkTaskContextSupplier(this.getRuntimeContext()));
        this.writeClient = new HoodieFlinkWriteClient((HoodieEngineContext)context, StreamerUtil.getHoodieClientConfig(this.cfg));
    }

    public void snapshotState(FunctionSnapshotContext context) {
        String commitType = this.cfg.tableType.equals(HoodieTableType.COPY_ON_WRITE.name()) ? "commit" : "deltacommit";
        List latestInstants = this.writeClient.getInflightsAndRequestedInstants(commitType);
        String string = this.latestInstant = latestInstants.isEmpty() ? null : (String)latestInstants.get(0);
        if (this.bufferedRecords.size() > 0) {
            this.hasRecordsIn = true;
            if (this.output != null && this.latestInstant != null) {
                List writeStatus;
                String instantTimestamp = this.latestInstant;
                LOG.info("Write records, subtask id = [{}]  checkpoint_id = [{}}] instant = [{}], record size = [{}]", new Object[]{this.indexOfThisSubtask, context.getCheckpointId(), instantTimestamp, this.bufferedRecords.size()});
                switch (this.cfg.operation) {
                    case INSERT: {
                        writeStatus = this.writeClient.insert(this.bufferedRecords, instantTimestamp);
                        break;
                    }
                    case UPSERT: {
                        writeStatus = this.writeClient.upsert(this.bufferedRecords, instantTimestamp);
                        break;
                    }
                    default: {
                        throw new HoodieFlinkStreamerException("Unknown operation : " + this.cfg.operation);
                    }
                }
                this.output.collect((Object)new Tuple3((Object)instantTimestamp, (Object)writeStatus, (Object)this.indexOfThisSubtask));
                this.bufferedRecords.clear();
            }
        } else {
            LOG.info("No data in subtask [{}]", (Object)this.indexOfThisSubtask);
            this.hasRecordsIn = false;
        }
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) {
    }

    public void processElement(HoodieRecord hoodieRecord, KeyedProcessFunction.Context context, Collector<Tuple3<String, List<WriteStatus>, Integer>> collector) {
        if (this.output == null) {
            this.output = collector;
        }
        this.bufferedRecords.add(hoodieRecord);
    }

    public boolean hasRecordsIn() {
        return this.hasRecordsIn;
    }

    public String getLatestInstant() {
        return this.latestInstant;
    }

    public void close() {
        if (this.writeClient != null) {
            this.writeClient.close();
        }
    }
}

