package org.apache.hudi.utils.source;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.adapter.DataStreamScanProviderAdapter;
import org.apache.hudi.util.JsonDeserializationFunction;
import org.apache.hudi.utils.factory.ContinuousFileSourceFactory;

/* loaded from: input_file:org/apache/hudi/utils/source/ContinuousFileSource.class */
public class ContinuousFileSource implements ScanTableSource {
    private final ResolvedSchema tableSchema;
    private final Path path;
    private final Configuration conf;

    /* loaded from: input_file:org/apache/hudi/utils/source/ContinuousFileSource$BoundedSourceFunction.class */
    public static class BoundedSourceFunction implements SourceFunction<String>, CheckpointListener {
        private final Path path;
        private List<String> dataBuffer;
        private final int checkpoints;
        private final AtomicInteger currentCP = new AtomicInteger(0);
        private volatile boolean isRunning = true;

        public BoundedSourceFunction(Path path, int i) {
            this.path = path;
            this.checkpoints = i;
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
            if (this.dataBuffer == null) {
                loadDataBuffer();
            }
            int i = this.currentCP.get();
            boolean z = false;
            while (this.isRunning) {
                int size = this.dataBuffer.size() / this.checkpoints;
                int i2 = size * i;
                synchronized (sourceContext.getCheckpointLock()) {
                    int i3 = i2;
                    while (true) {
                        if (i3 >= i2 + size) {
                            break;
                        }
                        if (i3 >= this.dataBuffer.size()) {
                            z = true;
                            break;
                        } else {
                            sourceContext.collect(this.dataBuffer.get(i3));
                            i3++;
                        }
                    }
                }
                i++;
                while (this.currentCP.get() < i) {
                    synchronized (sourceContext.getCheckpointLock()) {
                        sourceContext.getCheckpointLock().wait(10L);
                    }
                }
                if (z || !this.isRunning) {
                    return;
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        private void loadDataBuffer() {
            try {
                this.dataBuffer = Files.readAllLines(Paths.get(this.path.toUri()));
            } catch (IOException e) {
                throw new RuntimeException("Read file " + this.path + " error", e);
            }
        }

        public void notifyCheckpointComplete(long j) {
            this.currentCP.incrementAndGet();
        }
    }

    public ContinuousFileSource(ResolvedSchema resolvedSchema, Path path, Configuration configuration) {
        this.tableSchema = resolvedSchema;
        this.path = path;
        this.conf = configuration;
    }

    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        return new DataStreamScanProviderAdapter() { // from class: org.apache.hudi.utils.source.ContinuousFileSource.1
            public boolean isBounded() {
                return false;
            }

            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment streamExecutionEnvironment) {
                RowType logicalType = ContinuousFileSource.this.tableSchema.toSourceRowDataType().getLogicalType();
                return streamExecutionEnvironment.addSource(new BoundedSourceFunction(ContinuousFileSource.this.path, ContinuousFileSource.this.conf.getInteger(ContinuousFileSourceFactory.CHECKPOINTS))).name("continuous_file_source").setParallelism(1).map(JsonDeserializationFunction.getInstance(logicalType), InternalTypeInfo.of(logicalType));
            }
        };
    }

    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    public DynamicTableSource copy() {
        return new ContinuousFileSource(this.tableSchema, this.path, this.conf);
    }

    public String asSummaryString() {
        return "ContinuousFileSource";
    }
}
