package org.apache.flink.streaming.api.functions.source;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.JobException;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tukaani.xz.common.Util;

/**
 * Source function that watches a path and emits a {@link FileInputSplit} for every split of
 * every eligible file found under it.
 *
 * <p>A file is eligible when the configured {@link FilePathFilter} does not filter it out and
 * its modification time is strictly greater than the largest modification time forwarded so
 * far ({@code globalModificationTime}). That threshold is the only state of this function; it
 * is handed out by the snapshot method and replaced by {@link #restoreState(Long)}.
 *
 * <p>Depending on the {@link FileProcessingMode}, the path is scanned exactly once
 * ({@code PROCESS_ONCE}) or repeatedly, sleeping {@code interval} milliseconds between scans
 * ({@code PROCESS_CONTINUOUSLY}). Every scan runs while holding the checkpoint lock obtained
 * from the source context.
 *
 * @param <OUT> element type produced by the wrapped {@link FileInputFormat}
 */
@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.class */
public class ContinuousFileMonitoringFunction<OUT> extends RichSourceFunction<FileInputSplit> implements Checkpointed<Long> {
    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class);

    /** Smallest monitoring interval (in milliseconds) accepted for continuous monitoring. */
    public static final long MIN_MONITORING_INTERVAL = 100;

    /** The path to monitor, as given to the constructor. */
    private final String path;

    /** Passed to {@link FileInputFormat#createInputSplits(int)}; always at least 1. */
    private final int readerParallelism;

    /** The input format used to create the splits. */
    private final FileInputFormat<OUT> format;

    /** Pause between two scans, in milliseconds. */
    private final long interval;

    /** Whether the path is scanned once or continuously. */
    private final FileProcessingMode watchType;

    /** Largest modification time forwarded so far; files at or below it are ignored. */
    private Long globalModificationTime;

    /** Filter deciding which paths are skipped. */
    private final FilePathFilter pathFilter;

    /** Checkpoint lock of the source context; {@code null} until {@link #run} has been called. */
    private transient Object checkpointLock;

    private volatile boolean isRunning = true;

    /**
     * Creates a monitoring function.
     *
     * @param format the input format used to create splits; must not be {@code null}
     * @param path the path to monitor; must not be {@code null}
     * @param filter the path filter; must not be {@code null}
     * @param watchType the processing mode
     * @param readerParallelism value handed to the format when creating splits; values below 1
     *     are raised to 1
     * @param interval the pause between scans in milliseconds
     * @throws IllegalArgumentException if the mode is not {@code PROCESS_ONCE} and
     *     {@code interval} is below {@link #MIN_MONITORING_INTERVAL}
     * @throws NullPointerException if {@code format}, {@code path} or {@code filter} is
     *     {@code null}
     */
    public ContinuousFileMonitoringFunction(
            FileInputFormat<OUT> format,
            String path,
            FilePathFilter filter,
            FileProcessingMode watchType,
            int readerParallelism,
            long interval) {
        if (watchType != FileProcessingMode.PROCESS_ONCE && interval < MIN_MONITORING_INTERVAL) {
            throw new IllegalArgumentException("The specified monitoring interval (" + interval
                    + " ms) is smaller than the minimum allowed one (" + MIN_MONITORING_INTERVAL + " ms).");
        }
        this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format.");
        this.path = Preconditions.checkNotNull(path, "Unspecified Path.");
        this.pathFilter = Preconditions.checkNotNull(filter, "Unspecified File Path Filter.");
        this.interval = interval;
        this.watchType = watchType;
        this.readerParallelism = Math.max(readerParallelism, 1);
        this.globalModificationTime = Long.MIN_VALUE;
    }

    /**
     * Opens the function and configures the wrapped input format with the given configuration.
     */
    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void open(Configuration configuration) throws Exception {
        LOG.info("Opening File Monitoring Source.");
        super.open(configuration);
        format.configure(configuration);
    }

    /**
     * Scans the monitored path according to the processing mode and forwards the discovered
     * splits to {@code sourceContext}.
     *
     * @throws RuntimeException if the processing mode is neither {@code PROCESS_ONCE} nor
     *     {@code PROCESS_CONTINUOUSLY}
     */
    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void run(SourceFunction.SourceContext<FileInputSplit> sourceContext) throws Exception {
        FileSystem fileSystem = FileSystem.get(new URI(path));
        checkpointLock = sourceContext.getCheckpointLock();
        switch (watchType) {
            case PROCESS_CONTINUOUSLY:
                while (isRunning) {
                    synchronized (checkpointLock) {
                        monitorDirAndForwardSplits(fileSystem, sourceContext);
                    }
                    // Sleep outside the lock so cancel()/close() are not blocked meanwhile.
                    Thread.sleep(interval);
                }
                break;
            case PROCESS_ONCE:
                synchronized (checkpointLock) {
                    monitorDirAndForwardSplits(fileSystem, sourceContext);
                    // Same end-of-life marker that cancel() and close() set.
                    globalModificationTime = Long.MAX_VALUE;
                    isRunning = false;
                }
                break;
            default:
                isRunning = false;
                throw new RuntimeException("Unknown WatchType" + watchType);
        }
    }

    /**
     * Performs one scan: collects the eligible splits grouped by modification time and forwards
     * the groups in ascending modification-time order. Must be called with the checkpoint lock
     * held.
     */
    private void monitorDirAndForwardSplits(FileSystem fileSystem, SourceFunction.SourceContext<FileInputSplit> sourceContext) throws IOException, JobException {
        assert Thread.holdsLock(checkpointLock);
        for (Tuple2<Long, List<FileInputSplit>> splitsForModTime : getInputSplitSortedOnModTime(fileSystem)) {
            forwardSplits(splitsForModTime, sourceContext);
        }
    }

    /**
     * Forwards all splits of one modification-time group and then advances
     * {@code globalModificationTime} to that group's modification time if it is not smaller.
     * Must be called with the checkpoint lock held.
     */
    private void forwardSplits(Tuple2<Long, List<FileInputSplit>> splitsForModTime, SourceFunction.SourceContext<FileInputSplit> sourceContext) {
        assert Thread.holdsLock(checkpointLock);
        Long modificationTime = splitsForModTime.f0;
        for (FileInputSplit split : splitsForModTime.f1) {
            processSplit(split, sourceContext);
        }
        // Only advance the threshold once every split of this group has been emitted.
        if (modificationTime.longValue() >= globalModificationTime.longValue()) {
            globalModificationTime = modificationTime;
        }
    }

    /** Logs and emits a single split. */
    private void processSplit(FileInputSplit split, SourceFunction.SourceContext<FileInputSplit> sourceContext) {
        LOG.info("Forwarding split: {}", split);
        sourceContext.collect(split);
    }

    /**
     * Returns the splits of all eligible files, grouped by file modification time and sorted by
     * that time in ascending order. The result is empty when there is no eligible file.
     */
    private List<Tuple2<Long, List<FileInputSplit>>> getInputSplitSortedOnModTime(FileSystem fileSystem) throws IOException {
        List<FileStatus> eligibleFiles = listEligibleFiles(fileSystem);
        if (eligibleFiles.isEmpty()) {
            return new ArrayList<Tuple2<Long, List<FileInputSplit>>>();
        }
        Map<Long, List<FileInputSplit>> splitsByModTime = getInputSplits(eligibleFiles);
        List<Tuple2<Long, List<FileInputSplit>>> sorted =
                new ArrayList<Tuple2<Long, List<FileInputSplit>>>(splitsByModTime.size());
        for (Map.Entry<Long, List<FileInputSplit>> entry : splitsByModTime.entrySet()) {
            sorted.add(new Tuple2<Long, List<FileInputSplit>>(entry.getKey(), entry.getValue()));
        }
        Collections.sort(sorted, new Comparator<Tuple2<Long, List<FileInputSplit>>>() {
            @Override // java.util.Comparator
            public int compare(Tuple2<Long, List<FileInputSplit>> left, Tuple2<Long, List<FileInputSplit>> right) {
                // Compare the longs directly: narrowing their difference to int can overflow
                // and yield a wrong (even non-transitive) ordering.
                return left.f0.compareTo(right.f0);
            }
        });
        return sorted;
    }

    /**
     * Asks the input format for its splits and keeps those whose path equals the path of one of
     * the given files, grouped by that file's modification time.
     *
     * @param eligibleFiles the files whose splits should be kept
     * @return modification time mapped to the splits of the files carrying that time; empty if
     *     {@code eligibleFiles} is empty
     */
    private Map<Long, List<FileInputSplit>> getInputSplits(List<FileStatus> eligibleFiles) throws IOException {
        Map<Long, List<FileInputSplit>> splitsByModTime = new HashMap<Long, List<FileInputSplit>>();
        if (eligibleFiles.isEmpty()) {
            return splitsByModTime;
        }
        FileInputSplit[] inputSplits = format.createInputSplits(readerParallelism);
        for (FileInputSplit split : inputSplits) {
            for (FileStatus file : eligibleFiles) {
                if (file.getPath().equals(split.getPath())) {
                    Long modificationTime = Long.valueOf(file.getModificationTime());
                    List<FileInputSplit> splits = splitsByModTime.get(modificationTime);
                    if (splits == null) {
                        splits = new ArrayList<FileInputSplit>();
                        splitsByModTime.put(modificationTime, splits);
                    }
                    splits.add(split);
                    // A split is assigned to the first matching file only, and the search
                    // must terminate once the file list is exhausted.
                    break;
                }
            }
        }
        return splitsByModTime;
    }

    /**
     * Lists the entries directly under the monitored path and returns those that
     * {@link #shouldIgnore(Path, long)} does not reject.
     *
     * <p>Listing is best effort: if the path does not exist or listing fails with an
     * {@link IOException}, a warning is logged and an empty list is returned, so the caller
     * simply finds nothing to forward in this scan.
     */
    private List<FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException {
        final FileStatus[] statuses;
        try {
            statuses = fileSystem.listStatus(new Path(path));
        } catch (IOException e) {
            // Deliberately not rethrown (keeps the original best-effort behavior), but no
            // longer silent so a persistently failing listing can be diagnosed.
            LOG.warn("Could not list the files under path: " + path + ". Skipping this scan.", e);
            return Collections.emptyList();
        }
        if (statuses == null) {
            LOG.warn("Path does not exist: {}", path);
            return Collections.emptyList();
        }
        List<FileStatus> eligibleFiles = new ArrayList<FileStatus>(statuses.length);
        for (FileStatus status : statuses) {
            if (!shouldIgnore(status.getPath(), status.getModificationTime())) {
                eligibleFiles.add(status);
            }
        }
        return eligibleFiles;
    }

    /**
     * Decides whether a file is skipped: it is ignored when the path filter rejects it or when
     * its modification time is not greater than {@code globalModificationTime}. Must be called
     * with the checkpoint lock held.
     *
     * @param filePath the path of the file
     * @param modificationTime the modification time reported for the file
     * @return {@code true} if the file must not be forwarded
     */
    private boolean shouldIgnore(Path filePath, long modificationTime) {
        assert Thread.holdsLock(checkpointLock);
        boolean ignore = (pathFilter != null && pathFilter.filterPath(filePath))
                || modificationTime <= globalModificationTime.longValue();
        if (ignore && LOG.isDebugEnabled()) {
            LOG.debug("Ignoring {}, with mod time= {} and global mod time= {}",
                    new Object[] {filePath, modificationTime, globalModificationTime});
        }
        return ignore;
    }

    /**
     * Closes the function and stops monitoring. Safe to call even if {@link #run} was never
     * invoked (i.e. the checkpoint lock is still {@code null}).
     */
    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void close() throws Exception {
        super.close();
        stopMonitoring();
        LOG.info("Closed File Monitoring Source.");
    }

    /** Stops monitoring; the continuous loop in {@link #run} exits after its current iteration. */
    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void cancel() {
        stopMonitoring();
    }

    /**
     * Marks the source as finished: raises the modification-time threshold to
     * {@link Long#MAX_VALUE} and clears the running flag. The update happens under the
     * checkpoint lock when one has been obtained, and unsynchronized otherwise.
     */
    private void stopMonitoring() {
        final Object lock = checkpointLock;
        if (lock == null) {
            // run() has not been called, so there is no lock to take.
            globalModificationTime = Long.MAX_VALUE;
            isRunning = false;
            return;
        }
        synchronized (lock) {
            globalModificationTime = Long.MAX_VALUE;
            isRunning = false;
        }
    }

    /**
     * Returns the current modification-time threshold as the state to checkpoint. Both
     * arguments are unused.
     *
     * <p>NOTE(review): the decompiler renamed this method from {@code snapshotState} because of
     * a name collision it could not resolve; the name is kept as found to leave the visible
     * interface untouched — confirm against the {@code Checkpointed} interface of this code base.
     */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.flink.streaming.api.checkpoint.Checkpointed
    /* renamed from: snapshotState */
    public Long mo3109snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        return globalModificationTime;
    }

    /**
     * Replaces the modification-time threshold with the restored value.
     *
     * @param state the threshold to continue from
     */
    @Override // org.apache.flink.streaming.api.checkpoint.Checkpointed
    public void restoreState(Long state) throws Exception {
        globalModificationTime = state;
    }
}
