package org.apache.hudi.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/InstantGenerateOperator.class */
public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord> implements OneInputStreamOperator<HoodieRecord, HoodieRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
    public static final String NAME = "InstantGenerateOperator";
    private FlinkStreamerConfig cfg;
    private HoodieFlinkWriteClient writeClient;
    private SerializableConfiguration serializableHadoopConf;
    private transient FileSystem fs;
    private transient ListState<String> latestInstantState;
    private Integer retryTimes;
    private Integer retryInterval;
    private static final String DELIMITER = "_";
    private static final String INSTANT_MARKER_FOLDER_NAME = ".instant_marker";
    private StreamingRuntimeContext runtimeContext;
    private int indexOfThisSubtask;
    private String latestInstant = "";
    private List<String> latestInstantList = new ArrayList(1);
    private transient boolean isMain = false;
    private AtomicLong recordCounter = new AtomicLong(0);

    public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
        if (streamRecord.getValue() != null) {
            this.output.collect(streamRecord);
            this.recordCounter.incrementAndGet();
        }
    }

    public void open() throws Exception {
        super.open();
        this.cfg = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        this.retryTimes = Integer.valueOf(this.cfg.instantRetryTimes);
        this.retryInterval = Integer.valueOf(this.cfg.instantRetryInterval);
        this.serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
        this.fs = FSUtils.getFs(this.cfg.targetBasePath, this.serializableHadoopConf.get());
        if (this.isMain) {
            this.writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(new FlinkTaskContextSupplier((RuntimeContext) null)), StreamerUtil.getHoodieClientConfig(this.cfg));
            StreamerUtil.initTableIfNotExists(FlinkOptions.fromStreamerConfig(this.cfg));
            createInstantMarkerDir();
        }
    }

    public void prepareSnapshotPreBarrier(long j) throws Exception {
        super.prepareSnapshotPreBarrier(j);
        String format = String.format("%d%s%d%s%d", Integer.valueOf(this.indexOfThisSubtask), DELIMITER, Long.valueOf(j), DELIMITER, Long.valueOf(this.recordCounter.get()));
        this.fs.create(generateCurrentMakerFilePath(format), true);
        LOG.info("Subtask [{}] at checkpoint [{}] created marker file [{}]", new Object[]{Integer.valueOf(this.indexOfThisSubtask), Long.valueOf(j), format});
        if (this.isMain) {
            if (!StringUtils.isNullOrEmpty(this.latestInstant)) {
                doCheck();
                this.latestInstant = "";
            }
            if (checkReceivedData(j)) {
                this.latestInstant = startNewInstant(j);
            }
        }
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        this.runtimeContext = getRuntimeContext();
        this.indexOfThisSubtask = this.runtimeContext.getIndexOfThisSubtask();
        this.isMain = this.indexOfThisSubtask == 0;
        if (this.isMain) {
            this.latestInstantState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("latestInstant", String.class));
            if (stateInitializationContext.isRestored()) {
                ((Iterable) this.latestInstantState.get()).iterator().forEachRemaining(str -> {
                    this.latestInstant = str;
                });
                LOG.info("Restoring the latest instant [{}] from the state", this.latestInstant);
            }
        }
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        long checkpointId = stateSnapshotContext.getCheckpointId();
        long j = this.recordCounter.get();
        if (this.isMain) {
            LOG.info("Update latest instant [{}] records size [{}] checkpointId [{}]", new Object[]{this.latestInstant, Long.valueOf(j), Long.valueOf(checkpointId)});
            if (this.latestInstantList.isEmpty()) {
                this.latestInstantList.add(this.latestInstant);
            } else {
                this.latestInstantList.set(0, this.latestInstant);
            }
            this.latestInstantState.update(this.latestInstantList);
        } else {
            LOG.info("Task instance {} received {} records in checkpoint [{}]", new Object[]{Integer.valueOf(this.indexOfThisSubtask), Long.valueOf(j), Long.valueOf(checkpointId)});
        }
        this.recordCounter.set(0L);
    }

    private String startNewInstant(long j) {
        String startCommit = this.writeClient.startCommit();
        this.writeClient.transitionRequestedToInflight(this.cfg.tableType, startCommit);
        LOG.info("create instant [{}], at checkpoint [{}]", startCommit, Long.valueOf(j));
        return startCommit;
    }

    private void doCheck() throws InterruptedException {
        String str = this.cfg.tableType.equals(HoodieTableType.COPY_ON_WRITE.name()) ? "commit" : "deltacommit";
        LOG.info("Query latest instant [{}]", this.latestInstant);
        List inflightsAndRequestedInstants = this.writeClient.getInflightsAndRequestedInstants(str);
        int i = 0;
        while (i < this.retryTimes.intValue()) {
            i++;
            StringBuilder sb = new StringBuilder();
            if (!inflightsAndRequestedInstants.contains(this.latestInstant)) {
                LOG.warn("Latest transaction [{}] is completed! Completed transaction, try times [{}]", this.latestInstant, Integer.valueOf(i));
                return;
            }
            inflightsAndRequestedInstants.forEach(str2 -> {
                sb.append(str2).append(",");
            });
            LOG.warn("Latest transaction [{}] is not completed! unCompleted transaction:[{}],try times [{}]", new Object[]{this.latestInstant, sb, Integer.valueOf(i)});
            TimeUnit.SECONDS.sleep(this.retryInterval.intValue());
            inflightsAndRequestedInstants = this.writeClient.getInflightsAndRequestedInstants(str);
        }
        throw new InterruptedException(String.format("Last instant costs more than %s second, stop task now", Integer.valueOf(this.retryTimes.intValue() * this.retryInterval.intValue())));
    }

    public void close() throws Exception {
        if (this.writeClient != null) {
            this.writeClient.close();
        }
        if (this.fs != null) {
            this.fs.close();
        }
    }

    private boolean checkReceivedData(final long j) throws InterruptedException, IOException {
        FileStatus[] listStatus;
        int numberOfParallelSubtasks = this.runtimeContext.getNumberOfParallelSubtasks();
        Path generateCurrentMakerDirPath = generateCurrentMakerDirPath();
        while (true) {
            Thread.sleep(500L);
            listStatus = this.fs.listStatus(generateCurrentMakerDirPath, new PathFilter() { // from class: org.apache.hudi.sink.InstantGenerateOperator.1
                public boolean accept(Path path) {
                    return path.getName().contains(String.format("%s%d%s", InstantGenerateOperator.DELIMITER, Long.valueOf(j), InstantGenerateOperator.DELIMITER));
                }
            });
            if (listStatus != null && listStatus.length == numberOfParallelSubtasks) {
                break;
            }
        }
        boolean z = false;
        int length = listStatus.length;
        int i = 0;
        while (true) {
            if (i >= length) {
                break;
            }
            if (Long.parseLong(listStatus[i].getPath().getName().split(DELIMITER)[2]) > 0) {
                z = true;
                break;
            }
            i++;
        }
        cleanMarkerDir(generateCurrentMakerDirPath);
        return z;
    }

    private void createInstantMarkerDir() throws IOException {
        Path path = new Path(new Path(this.cfg.targetBasePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME), INSTANT_MARKER_FOLDER_NAME);
        if (this.fs.exists(path)) {
            cleanMarkerDir(path);
        } else {
            this.fs.mkdirs(path);
        }
    }

    private void cleanMarkerDir(Path path) throws IOException {
        for (FileStatus fileStatus : this.fs.listStatus(path)) {
            this.fs.delete(fileStatus.getPath(), true);
        }
    }

    private Path generateCurrentMakerDirPath() {
        return new Path(new Path(this.cfg.targetBasePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME), INSTANT_MARKER_FOLDER_NAME);
    }

    private Path generateCurrentMakerFilePath(String str) {
        return new Path(generateCurrentMakerDirPath(), str);
    }
}
