/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * Selects the next batch of input files found under a configured DFS root path.
 *
 * <p>Files are chosen by modification time relative to a checkpoint (a modification
 * timestamp rendered as a decimal string) and are accumulated until a byte budget is
 * reached. The concrete selector implementation can be swapped through
 * {@link Config#SOURCE_INPUT_SELECTOR}.
 *
 * <p>NOTE(review): {@link #fs} is {@code transient} while the class is
 * {@link Serializable}; nothing in this class re-creates it after deserialization —
 * confirm callers only use the selector where it was constructed.
 */
public class DFSPathSelector implements Serializable {
    protected static volatile Logger log = LogManager.getLogger(DFSPathSelector.class);

    /** File or directory names starting with any of these prefixes are skipped while listing. */
    protected static final List<String> IGNORE_FILEPREFIX_LIST = Arrays.asList(".", "_");

    /** File system resolved from the configured root path. */
    protected final transient FileSystem fs;

    /** Properties this selector was configured with. */
    protected final TypedProperties props;

    /**
     * Creates a selector for the root path configured under {@link Config#ROOT_INPUT_PATH_PROP}.
     *
     * @param props      configuration; must contain {@link Config#ROOT_INPUT_PATH_PROP}
     * @param hadoopConf Hadoop configuration used to resolve the file system of the root path
     */
    public DFSPathSelector(TypedProperties props, Configuration hadoopConf) {
        DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.ROOT_INPUT_PATH_PROP));
        this.props = props;
        this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), hadoopConf);
    }

    /**
     * Instantiates the selector class named by {@link Config#SOURCE_INPUT_SELECTOR}, falling
     * back to {@link DFSPathSelector} itself when the property is not set.
     *
     * @param props configuration handed to the selector's constructor
     * @param conf  Hadoop configuration handed to the selector's constructor
     * @return the freshly created selector
     * @throws HoodieException if the class cannot be loaded, instantiated or cast
     */
    public static DFSPathSelector createSourceSelector(TypedProperties props, Configuration conf) {
        String selectorClassName = props.getString(Config.SOURCE_INPUT_SELECTOR, DFSPathSelector.class.getName());
        try {
            Class<?>[] ctorArgTypes = {TypedProperties.class, Configuration.class};
            Object[] ctorArgs = {props, conf};
            // The cast stays inside the try so a wrong type is reported as a HoodieException too.
            DFSPathSelector loadedSelector =
                (DFSPathSelector) ReflectionUtils.loadClass(selectorClassName, ctorArgTypes, ctorArgs);
            log.info("Using path selector " + loadedSelector.getClass().getName());
            return loadedSelector;
        } catch (Exception cause) {
            throw new HoodieException("Could not load source selector class " + selectorClassName, cause);
        }
    }

    /**
     * Spark-aware entry point. This implementation does not use {@code sparkContext} and simply
     * delegates to {@link #getNextFilePathsAndMaxModificationTime(Option, long)}.
     *
     * @param sparkContext      unused by this implementation
     * @param lastCheckpointStr previous checkpoint (modification time as a decimal string), if any
     * @param sourceLimit       byte budget for the batch
     * @return pair of (comma-separated file paths, if any; new checkpoint string)
     */
    public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext, Option<String> lastCheckpointStr, long sourceLimit) {
        return getNextFilePathsAndMaxModificationTime(lastCheckpointStr, sourceLimit);
    }

    /**
     * Picks the files to read next, oldest modification time first, within a byte budget.
     *
     * <p>Selection stops at the first file that would bring the accumulated size to or past
     * {@code sourceLimit} <em>and</em> whose modification time is strictly newer than that of
     * the last accepted file. Files sharing a modification time with an already accepted file
     * are therefore still taken even when the budget is exceeded, so the returned checkpoint
     * never splits a group of files with equal timestamps.
     *
     * <p>NOTE(review): with the default {@link #listEligibleFiles}, if the oldest eligible file
     * alone reaches {@code sourceLimit}, nothing is selected and the checkpoint does not
     * advance — confirm this is intended.
     *
     * @param lastCheckpointStr previous checkpoint (modification time as a decimal string);
     *                          when absent, {@link Long#MIN_VALUE} is used
     * @param sourceLimit       byte budget for the batch
     * @return pair of (comma-separated file paths, empty when nothing was selected; new
     *         checkpoint string)
     * @throws HoodieIOException if listing the file system fails
     */
    @Deprecated
    public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Option<String> lastCheckpointStr, long sourceLimit) {
        try {
            String rootPath = props.getString(Config.ROOT_INPUT_PATH_PROP);
            log.info("Root path => " + rootPath + " source limit => " + sourceLimit);

            long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
            List<FileStatus> eligibleFiles = listEligibleFiles(fs, new Path(rootPath), lastCheckpointTime);

            Comparator<FileStatus> oldestFirst = Comparator.comparingLong(FileStatus::getModificationTime);
            eligibleFiles.sort(oldestFirst);

            long bytesSoFar = 0L;
            long checkpointTime = lastCheckpointTime;
            List<FileStatus> selected = new ArrayList<>();
            for (FileStatus candidate : eligibleFiles) {
                long candidateLen = candidate.getLen();
                long candidateModTime = candidate.getModificationTime();
                boolean limitReached = bytesSoFar + candidateLen >= sourceLimit;
                // Only stop on a strictly newer timestamp so equal-timestamp files stay together.
                if (limitReached && candidateModTime > checkpointTime) {
                    break;
                }
                checkpointTime = candidateModTime;
                bytesSoFar += candidateLen;
                selected.add(candidate);
            }

            String checkpointStr = String.valueOf(checkpointTime);
            if (!selected.isEmpty()) {
                String joinedPaths = selected.stream()
                    .map(FileStatus::getPath)
                    .map(Path::toString)
                    .collect(Collectors.joining(","));
                return new ImmutablePair<>(Option.ofNullable(joinedPaths), checkpointStr);
            }
            // Nothing to read.
            return new ImmutablePair<>(Option.empty(), checkpointStr);
        } catch (IOException ioe) {
            throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
        }
    }

    /**
     * Recursively collects the non-empty files under {@code path} whose modification time is
     * strictly greater than {@code lastCheckpointTime}.
     *
     * <p>Entries whose name starts with one of {@link #IGNORE_FILEPREFIX_LIST} are skipped, and
     * directories reported as symlinks are not descended into.
     *
     * @param fs                 file system to list
     * @param path               directory to start from
     * @param lastCheckpointTime exclusive lower bound on modification time
     * @return the matching files, in listing order
     * @throws IOException if a listing call fails
     */
    protected List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long lastCheckpointTime) throws IOException {
        FileStatus[] children = fs.listStatus(path, candidate -> {
            String name = candidate.getName();
            for (String ignoredPrefix : IGNORE_FILEPREFIX_LIST) {
                if (name.startsWith(ignoredPrefix)) {
                    return false;
                }
            }
            return true;
        });

        List<FileStatus> collected = new ArrayList<>();
        for (FileStatus child : children) {
            if (child.isDirectory()) {
                // Symlinked directories are not followed.
                if (!child.isSymlink()) {
                    collected.addAll(listEligibleFiles(fs, child.getPath(), lastCheckpointTime));
                }
            } else if (child.getModificationTime() > lastCheckpointTime && child.getLen() > 0L) {
                collected.add(child);
            }
        }
        return collected;
    }

    /** Property keys read by {@link DFSPathSelector}. */
    public static class Config {
        /** Root directory that is scanned for input files. */
        public static final String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.dfs.root";

        /** Fully qualified class name of the selector created by {@link #createSourceSelector}. */
        public static final String SOURCE_INPUT_SELECTOR = "hoodie.deltastreamer.source.input.selector";
    }
}

