/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.operator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.HoodieFlinkStreamer;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InstantGenerateOperator
extends AbstractStreamOperator<HoodieRecord>
implements OneInputStreamOperator<HoodieRecord, HoodieRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
    public static final String NAME = "InstantGenerateOperator";
    private HoodieFlinkStreamer.Config cfg;
    private HoodieFlinkWriteClient writeClient;
    private SerializableConfiguration serializableHadoopConf;
    private transient FileSystem fs;
    private String latestInstant = "";
    private List<String> latestInstantList = new ArrayList<String>(1);
    private transient ListState<String> latestInstantState;
    private List<StreamRecord> bufferedRecords = new LinkedList<StreamRecord>();
    private transient ListState<StreamRecord> recordsState;
    private Integer retryTimes;
    private Integer retryInterval;

    public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
        if (streamRecord.getValue() != null) {
            this.bufferedRecords.add(streamRecord);
            this.output.collect(streamRecord);
        }
    }

    public void open() throws Exception {
        super.open();
        this.cfg = (HoodieFlinkStreamer.Config)this.getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        this.retryTimes = Integer.valueOf(this.cfg.blockRetryTime);
        this.retryInterval = Integer.valueOf(this.cfg.blockRetryInterval);
        this.serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
        this.fs = FSUtils.getFs(this.cfg.targetBasePath, this.serializableHadoopConf.get());
        FlinkTaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
        this.writeClient = new HoodieFlinkWriteClient((HoodieEngineContext)new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(this.cfg), true);
        this.initTable();
    }

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        super.prepareSnapshotPreBarrier(checkpointId);
        if (!StringUtils.isNullOrEmpty(this.latestInstant)) {
            this.doCheck();
            this.latestInstant = "";
        }
        if (!this.bufferedRecords.isEmpty()) {
            this.latestInstant = this.startNewInstant(checkpointId);
        }
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        ListStateDescriptor latestInstantStateDescriptor = new ListStateDescriptor("latestInstant", String.class);
        this.latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor);
        ListStateDescriptor recordsStateDescriptor = new ListStateDescriptor("recordsState", StreamRecord.class);
        this.recordsState = context.getOperatorStateStore().getListState(recordsStateDescriptor);
        if (context.isRestored()) {
            Iterator latestInstantIterator = ((Iterable)this.latestInstantState.get()).iterator();
            latestInstantIterator.forEachRemaining(x -> {
                this.latestInstant = x;
            });
            LOG.info("InstantGenerateOperator initializeState get latestInstant [{}]", (Object)this.latestInstant);
            Iterator recordIterator = ((Iterable)this.recordsState.get()).iterator();
            this.bufferedRecords.clear();
            recordIterator.forEachRemaining(x -> this.bufferedRecords.add((StreamRecord)x));
        }
    }

    public void snapshotState(StateSnapshotContext functionSnapshotContext) throws Exception {
        if (this.latestInstantList.isEmpty()) {
            this.latestInstantList.add(this.latestInstant);
        } else {
            this.latestInstantList.set(0, this.latestInstant);
        }
        this.latestInstantState.update(this.latestInstantList);
        LOG.info("Update latest instant [{}]", (Object)this.latestInstant);
        this.recordsState.update(this.bufferedRecords);
        LOG.info("Update records state size = [{}]", (Object)this.bufferedRecords.size());
        this.bufferedRecords.clear();
    }

    private String startNewInstant(long checkpointId) {
        String newTime = this.writeClient.startCommit();
        LOG.info("create instant [{}], at checkpoint [{}]", (Object)newTime, (Object)checkpointId);
        return newTime;
    }

    private void doCheck() throws InterruptedException {
        String commitType = this.cfg.tableType.equals(HoodieTableType.COPY_ON_WRITE.name()) ? "commit" : "deltacommit";
        LOG.info("Query latest instant [{}]", (Object)this.latestInstant);
        List<String> rollbackPendingCommits = this.writeClient.getInflightsAndRequestedInstants(commitType);
        for (int tryTimes = 0; tryTimes < this.retryTimes; ++tryTimes) {
            StringBuffer sb = new StringBuffer();
            if (rollbackPendingCommits.contains(this.latestInstant)) {
                rollbackPendingCommits.forEach(x -> sb.append((String)x).append(","));
                LOG.warn("Latest transaction [{}] is not completed! unCompleted transaction:[{}],try times [{}]", new Object[]{this.latestInstant, sb.toString(), tryTimes});
                TimeUnit.SECONDS.sleep(this.retryInterval.intValue());
                rollbackPendingCommits = this.writeClient.getInflightsAndRequestedInstants(commitType);
                continue;
            }
            LOG.warn("Latest transaction [{}] is completed! Completed transaction, try times [{}]", (Object)this.latestInstant, (Object)tryTimes);
            return;
        }
        throw new InterruptedException(String.format("Last instant costs more than %s second, stop task now", this.retryTimes * this.retryInterval));
    }

    private void initTable() throws IOException {
        if (!this.fs.exists(new Path(this.cfg.targetBasePath))) {
            HoodieTableMetaClient.initTableType(new Configuration(this.serializableHadoopConf.get()), this.cfg.targetBasePath, HoodieTableType.valueOf(this.cfg.tableType), this.cfg.targetTableName, "archived", this.cfg.payloadClassName, 1);
            LOG.info("Table initialized");
        } else {
            LOG.info("Table already [{}/{}] exists, do nothing here", (Object)this.cfg.targetBasePath, (Object)this.cfg.targetTableName);
        }
    }

    public void close() throws Exception {
        if (this.writeClient != null) {
            this.writeClient.close();
        }
        if (this.fs != null) {
            this.fs.close();
        }
    }
}

