package org.apache.hudi.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.format.mor.InstantRange;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/source/StreamReadMonitoringFunction.class */
/**
 * Flink source function that polls the timeline of a Hoodie table and emits one
 * {@link MergeOnReadInputSplit} per merged file slice touched by newly completed,
 * non-compaction instants.
 *
 * <p>Each poll runs while holding the source checkpoint lock, so the emitted splits and
 * the value of {@code issuedInstant} captured by {@link #snapshotState} stay consistent.
 * Between polls the function sleeps for {@code interval} seconds, where the interval is
 * read from {@link FlinkOptions#READ_STREAMING_CHECK_INTERVAL} at construction time.
 *
 * <p>The timestamp of the latest instant already forwarded downstream is kept in
 * {@code issuedInstant} and persisted as a single-element operator list state.
 */
public class StreamReadMonitoringFunction extends RichSourceFunction<MergeOnReadInputSplit> implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(StreamReadMonitoringFunction.class);
    private static final long serialVersionUID = 1L;

    /** Base path of the monitored table. */
    private final Path path;

    /** Pause between two polls, in seconds (see {@link #run}). */
    private final long interval;

    /** Checkpoint lock handed out by the source context; {@code null} until {@link #run} is invoked. */
    private transient Object checkpointLock;

    private volatile boolean isRunning = true;

    /** Timestamp of the latest instant whose splits were emitted, or {@code null} if none yet. */
    private String issuedInstant;

    /** Operator state holding at most one element: the value of {@link #issuedInstant}. */
    private transient ListState<String> instantState;

    private final Configuration conf;
    private transient org.apache.hadoop.conf.Configuration hadoopConf;
    private final HoodieTableMetaClient metaClient;
    private final long maxCompactionMemoryInBytes;

    /**
     * Creates the monitoring function.
     *
     * @param conf                       Flink configuration carrying the {@link FlinkOptions} values
     * @param path                       base path of the table to monitor
     * @param metaClient                 meta client used to reload and query the table timeline
     * @param maxCompactionMemoryInBytes value passed through unchanged to every emitted split
     */
    public StreamReadMonitoringFunction(Configuration conf, Path path, HoodieTableMetaClient metaClient, long maxCompactionMemoryInBytes) {
        this.conf = conf;
        this.path = path;
        this.metaClient = metaClient;
        this.interval = conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
        this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
    }

    /**
     * Registers the list state and, when the job is restored, recovers the issued instant from it.
     *
     * @throws IllegalStateException    (via {@link ValidationUtils#checkState}) if called twice
     * @throws IllegalArgumentException if more than one instant was stored, or if an issued
     *                                  instant is already set while the state also holds one
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ValidationUtils.checkState(this.instantState == null, "The " + getClass().getSimpleName() + " has already been initialized.");
        this.instantState = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("file-monitoring-state", StringSerializer.INSTANCE));
        if (!context.isRestored()) {
            return;
        }

        LOG.info("Restoring state for the class {} with table {} and base path {}.",
            getClass().getSimpleName(), this.conf.getString(FlinkOptions.TABLE_NAME), this.path);

        final List<String> restoredInstants = new ArrayList<>();
        for (String restored : this.instantState.get()) {
            restoredInstants.add(restored);
        }
        ValidationUtils.checkArgument(restoredInstants.size() <= 1, getClass().getSimpleName() + " retrieved invalid state.");
        if (restoredInstants.isEmpty()) {
            return;
        }

        // Exactly one instant was restored from here on.
        if (this.issuedInstant != null) {
            throw new IllegalArgumentException("The " + getClass().getSimpleName() + " has already restored from a previous Flink version.");
        }
        this.issuedInstant = restoredInstants.get(0);
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} retrieved a issued instant of time {} for table {} with path {}.",
                getClass().getSimpleName(), this.issuedInstant, this.conf.get(FlinkOptions.TABLE_NAME), this.path);
        }
    }

    /** Initializes the Hadoop configuration used for file system access. */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.hadoopConf = StreamerUtil.getHadoopConf();
    }

    /**
     * Polls the table until the function is cancelled or closed. Every poll is executed under
     * the checkpoint lock; the sleep between polls happens outside of it.
     */
    @Override
    public void run(SourceFunction.SourceContext<MergeOnReadInputSplit> sourceContext) throws Exception {
        this.checkpointLock = sourceContext.getCheckpointLock();
        while (this.isRunning) {
            synchronized (this.checkpointLock) {
                monitorDirAndForwardSplits(sourceContext);
            }
            TimeUnit.SECONDS.sleep(this.interval);
        }
    }

    /**
     * Performs a single poll: reloads the timeline, determines the completed non-compaction
     * instants that were not issued yet, and emits one split per merged file slice of the
     * partitions those instants wrote to. On success {@code issuedInstant} advances to the
     * timestamp of the latest instant considered.
     *
     * <p>The poll is skipped (nothing emitted, {@code issuedInstant} unchanged) when the
     * completed timeline is empty, when there is no new instant, or when the new instants
     * reference no files.
     *
     * @param sourceContext context the splits are emitted to
     * @throws HoodieException if commit metadata or a file status cannot be read
     */
    @VisibleForTesting
    public void monitorDirAndForwardSplits(SourceFunction.SourceContext<MergeOnReadInputSplit> sourceContext) {
        this.metaClient.reloadActiveTimeline();
        final HoodieTimeline completedTimeline = this.metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
        if (completedTimeline.empty()) {
            LOG.warn("No splits found for the table under path {}", this.path);
            return;
        }

        final List<HoodieInstant> newInstants = getUncompactedInstants(completedTimeline, this.issuedInstant);
        if (newInstants.isEmpty()) {
            LOG.info("No new instant found for the table under path {}, skip reading", this.path);
            return;
        }
        final String commitToIssue = newInstants.get(newInstants.size() - 1).getTimestamp();
        final InstantRange instantRange = resolveInstantRange(commitToIssue);

        final List<HoodieCommitMetadata> metadataList = newInstants.stream()
            .map(instant -> getCommitMetadata(instant, completedTimeline))
            .collect(Collectors.toList());
        final Set<String> writePartitions = getWritePartitionPaths(metadataList);
        final FileStatus[] fileStatuses = getWritePathsOfInstants(metadataList);
        if (fileStatuses.length == 0) {
            // Instants that wrote no files must not bring down a long-running streaming source.
            // issuedInstant is deliberately left untouched, so the next poll covers them again.
            LOG.warn("No files found for reading in user provided path.");
            return;
        }

        final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(this.metaClient, completedTimeline, fileStatuses);
        final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
        final AtomicInteger splitNumber = new AtomicInteger(0);
        final List<MergeOnReadInputSplit> splits = writePartitions.stream()
            .flatMap(partition -> fsView.getLatestMergedFileSlicesBeforeOrOn(partition, commitToIssue))
            .map(fileSlice -> {
                final List<String> logPaths = fileSlice.getLogFiles()
                    .sorted(HoodieLogFile.getLogFileComparator())
                    .map(logFile -> logFile.getPath().toString())
                    .collect(Collectors.toList());
                // The second constructor argument is passed as null: only the log file paths,
                // the table base path and the instant range are forwarded with the split.
                return new MergeOnReadInputSplit(splitNumber.getAndAdd(1), null, Option.ofNullable(logPaths), commitToIssue,
                    this.metaClient.getBasePath(), this.maxCompactionMemoryInBytes, mergeType, instantRange);
            })
            .collect(Collectors.toList());

        // All splits are materialized before the first one is emitted.
        for (MergeOnReadInputSplit split : splits) {
            sourceContext.collect(split);
        }
        this.issuedInstant = commitToIssue;
    }

    /**
     * Stops the polling loop and clears the issued instant. The reset only happens if
     * {@link #run} has been invoked, i.e. the checkpoint lock is available.
     */
    @Override
    public void close() throws Exception {
        super.close();
        if (this.checkpointLock != null) {
            synchronized (this.checkpointLock) {
                resetAndStop();
            }
        }
        LOG.debug("Closed File Monitoring Source for path: {}.", this.path);
    }

    /**
     * Stops the polling loop and clears the issued instant, under the checkpoint lock when
     * it is already available.
     */
    @Override
    public void cancel() {
        if (this.checkpointLock == null) {
            resetAndStop();
            return;
        }
        synchronized (this.checkpointLock) {
            resetAndStop();
        }
    }

    /** Replaces the stored state with the current issued instant, if there is one. */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.instantState.clear();
        if (this.issuedInstant != null) {
            this.instantState.add(this.issuedInstant);
        }
    }

    /** Clears the issued instant and flags the polling loop to terminate. */
    private void resetAndStop() {
        this.issuedInstant = null;
        this.isRunning = false;
    }

    /**
     * Builds the instant range attached to the emitted splits.
     *
     * @param commitToIssue timestamp of the latest instant of the current poll (range end)
     * @return an OPEN_CLOSE range starting at the issued instant if one exists; otherwise a
     *         CLOSE_CLOSE range starting at the configured start commit if one is set;
     *         otherwise {@code null}
     */
    private InstantRange resolveInstantRange(String commitToIssue) {
        if (this.issuedInstant != null) {
            return InstantRange.getInstance(this.issuedInstant, commitToIssue, InstantRange.RangeType.OPEN_CLOSE);
        }
        if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
            return InstantRange.getInstance(this.conf.getString(FlinkOptions.READ_STREAMING_START_COMMIT), commitToIssue,
                InstantRange.RangeType.CLOSE_CLOSE);
        }
        return null;
    }

    /**
     * Selects the non-compaction instants of the timeline that still have to be read.
     *
     * @param timeline timeline to select from
     * @param issued   timestamp of the latest issued instant, or {@code null} if none
     * @return instants strictly after {@code issued} when it is set; otherwise instants at or
     *         after the configured start commit when one is set; otherwise all
     *         non-compaction instants
     */
    private List<HoodieInstant> getUncompactedInstants(HoodieTimeline timeline, String issued) {
        // The start commit only matters while nothing has been issued yet.
        final String startCommit = issued != null
            ? null
            : this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).orElse(null);
        return timeline.getInstants()
            .filter(instant -> !instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
            .filter(instant -> {
                if (issued != null) {
                    return HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, issued);
                }
                return startCommit == null
                    || HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, startCommit);
            })
            .collect(Collectors.toList());
    }

    /** Returns the union of the partition paths written by the given commits. */
    private Set<String> getWritePartitionPaths(List<HoodieCommitMetadata> metadataList) {
        return metadataList.stream()
            .map(HoodieCommitMetadata::getWritePartitionPaths)
            .flatMap(Set::stream)
            .collect(Collectors.toSet());
    }

    /**
     * Returns the file statuses of all files written by the given commits.
     *
     * @throws HoodieException if the status of a file cannot be fetched
     */
    private FileStatus[] getWritePathsOfInstants(List<HoodieCommitMetadata> metadataList) {
        final FileSystem fs = FSUtils.getFs(this.path.toString(), this.hadoopConf);
        return metadataList.stream()
            .map(metadata -> getWritePathsOfInstant(metadata, fs))
            .flatMap(List::stream)
            .toArray(FileStatus[]::new);
    }

    /**
     * Returns the file statuses of the files written by a single commit.
     *
     * @throws HoodieException wrapping the {@link IOException} if a status lookup fails
     */
    private List<FileStatus> getWritePathsOfInstant(HoodieCommitMetadata metadata, FileSystem fs) {
        return metadata.getFileIdAndFullPaths(this.path.toString()).values().stream()
            .map(fullPath -> {
                try {
                    return fs.getFileStatus(new org.apache.hadoop.fs.Path(fullPath));
                } catch (IOException e) {
                    LOG.error("Get write status of path: {} error", fullPath);
                    throw new HoodieException(e);
                }
            })
            .collect(Collectors.toList());
    }

    /**
     * Reads and deserializes the commit metadata of the given instant.
     *
     * @throws HoodieException wrapping the {@link IOException} if deserialization fails
     */
    private HoodieCommitMetadata getCommitMetadata(HoodieInstant instant, HoodieTimeline timeline) {
        try {
            return HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
        } catch (IOException e) {
            LOG.error("Get write metadata for table {} with instant {} and path: {} error",
                this.conf.getString(FlinkOptions.TABLE_NAME), instant.getTimestamp(), this.path);
            throw new HoodieException(e);
        }
    }
}
