/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.utilities.config.DFSPathSelectorConfig;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Selects the next batch of input files under a configured root path, using file
 * modification time as the checkpoint.
 *
 * <p>Entries whose name starts with one of {@link #IGNORE_FILEPREFIX_LIST} are excluded
 * from listing, as are zero-length files and files whose modification time is not
 * strictly newer than the previous checkpoint.
 */
public class DFSPathSelector implements Serializable {
    /** Logger shared by this class and its subclasses. */
    protected static volatile Logger log = LoggerFactory.getLogger(DFSPathSelector.class);
    /** Name prefixes that cause a file or directory to be skipped while listing. */
    protected static final List<String> IGNORE_FILEPREFIX_LIST = Arrays.asList(".", "_");
    /** File system resolved from the root input path; transient, so it is not serialized. */
    protected final transient FileSystem fs;
    /** Properties this selector was configured with. */
    protected final TypedProperties props;

    /**
     * Creates a selector for the root input path found in {@code props}.
     *
     * @param props      configuration; must contain {@code DFSPathSelectorConfig.ROOT_INPUT_PATH}
     *                   (validated before anything else is initialised)
     * @param hadoopConf Hadoop configuration used to obtain the file system
     */
    public DFSPathSelector(TypedProperties props, Configuration hadoopConf) {
        ConfigUtils.checkRequiredConfigProperties(props, Collections.singletonList(DFSPathSelectorConfig.ROOT_INPUT_PATH));
        this.props = props;
        String rootInputPath = ConfigUtils.getStringWithAltKeys(props, DFSPathSelectorConfig.ROOT_INPUT_PATH);
        this.fs = HadoopFSUtils.getFs(rootInputPath, hadoopConf);
    }

    /**
     * Reflectively instantiates the selector class named by
     * {@code DFSPathSelectorConfig.SOURCE_INPUT_SELECTOR}, falling back to
     * {@link DFSPathSelector} itself when the property is not set.
     *
     * @param props configuration passed to the selector's constructor
     * @param conf  Hadoop configuration passed to the selector's constructor
     * @return the newly created selector
     * @throws HoodieException if the class cannot be loaded, instantiated or cast
     */
    public static DFSPathSelector createSourceSelector(TypedProperties props, Configuration conf) {
        // The explicit upcast is retained so the same getStringWithAltKeys overload is chosen.
        String sourceSelectorClass = ConfigUtils.getStringWithAltKeys((Properties) props, DFSPathSelectorConfig.SOURCE_INPUT_SELECTOR, DFSPathSelector.class.getName());
        try {
            Class<?>[] constructorArgTypes = new Class<?>[] {TypedProperties.class, Configuration.class};
            Object[] constructorArgs = new Object[] {props, conf};
            DFSPathSelector instance = (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass, constructorArgTypes, constructorArgs);
            log.info("Using path selector " + instance.getClass().getName());
            return instance;
        } catch (Exception e) {
            throw new HoodieException("Could not load source selector class " + sourceSelectorClass, e);
        }
    }

    /**
     * Returns the next batch of file paths together with the checkpoint to store afterwards.
     *
     * <p>This implementation does not use {@code sparkContext}; it simply delegates to
     * {@link #getNextFilePathsAndMaxModificationTime(Option, long)}.
     *
     * @param sparkContext   Spark context (unused here)
     * @param lastCheckpoint checkpoint produced by the previous run, if any
     * @param sourceLimit    byte budget for the batch
     * @return pair of (comma-separated paths, new checkpoint)
     */
    public Pair<Option<String>, Checkpoint> getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext, Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        return this.getNextFilePathsAndMaxModificationTime(lastCheckpoint, sourceLimit);
    }

    /**
     * Picks eligible files in ascending modification-time order until the byte budget is hit.
     *
     * <p>A file ends the batch only when adding it would make the accumulated size reach or
     * exceed {@code sourceLimit} <em>and</em> its modification time is strictly greater than
     * the running checkpoint time. Files sharing the modification time of the last accepted
     * file are therefore still accepted, even past the limit.
     *
     * @param lastCheckpointStr previous checkpoint whose key is parsed as a {@code long}
     *                          modification time; when absent, {@code Long.MIN_VALUE} is used
     * @param sourceLimit       byte budget for the batch
     * @return pair whose left side is the comma-joined selected paths (empty when nothing was
     *         selected) and whose right side is a {@link StreamerCheckpointV2} holding the
     *         modification time of the last selected file, or the previous checkpoint time
     *         when nothing was selected
     * @throws HoodieIOException if listing the files fails with an {@link IOException}
     */
    @Deprecated
    public Pair<Option<String>, Checkpoint> getNextFilePathsAndMaxModificationTime(Option<Checkpoint> lastCheckpointStr, long sourceLimit) {
        try {
            log.info("Root path => " + ConfigUtils.getStringWithAltKeys(this.props, DFSPathSelectorConfig.ROOT_INPUT_PATH) + " source limit => " + sourceLimit);
            long previousCheckpointTime = lastCheckpointStr.map(cp -> Long.parseLong(cp.getCheckpointKey())).orElse(Long.MIN_VALUE);

            Path rootPath = new Path(ConfigUtils.getStringWithAltKeys(this.props, DFSPathSelectorConfig.ROOT_INPUT_PATH));
            List<FileStatus> candidates = this.listEligibleFiles(this.fs, rootPath, previousCheckpointTime);

            // Oldest first, so the checkpoint only ever moves forward while scanning.
            Comparator<FileStatus> oldestFirst = Comparator.comparingLong(FileStatus::getModificationTime);
            candidates.sort(oldestFirst);

            List<FileStatus> selected = new ArrayList<>();
            long bytesSoFar = 0L;
            long checkpointTime = previousCheckpointTime;
            for (FileStatus candidate : candidates) {
                long modifiedAt = candidate.getModificationTime();
                long size = candidate.getLen();
                boolean limitReached = bytesSoFar + size >= sourceLimit;
                // Stop only on a file with a newer timestamp than the last accepted one.
                if (limitReached && modifiedAt > checkpointTime) {
                    break;
                }
                checkpointTime = modifiedAt;
                bytesSoFar += size;
                selected.add(candidate);
            }

            Checkpoint nextCheckpoint = new StreamerCheckpointV2(String.valueOf(checkpointTime));
            Option<String> joinedPaths;
            if (selected.isEmpty()) {
                joinedPaths = Option.empty();
            } else {
                String pathStr = selected.stream()
                        .map(status -> status.getPath().toString())
                        .collect(Collectors.joining(","));
                joinedPaths = Option.ofNullable(pathStr);
            }
            return new ImmutablePair<>(joinedPaths, nextCheckpoint);
        } catch (IOException ioe) {
            throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
        }
    }

    /**
     * Recursively collects the files under {@code path} that qualify for ingestion.
     *
     * <p>Entries whose name starts with a prefix in {@link #IGNORE_FILEPREFIX_LIST} are not
     * listed at all. Directories are descended into unless they are symlinks. A file
     * qualifies when its modification time is strictly greater than
     * {@code lastCheckpointTime} and its length is positive.
     *
     * @param fs                 file system to list from
     * @param path               directory to list
     * @param lastCheckpointTime exclusive lower bound on modification time
     * @return qualifying files, in listing order (not sorted)
     * @throws IOException if listing fails
     */
    protected List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long lastCheckpointTime) throws IOException {
        FileStatus[] children = fs.listStatus(path, entry -> {
            String entryName = entry.getName();
            for (String ignoredPrefix : IGNORE_FILEPREFIX_LIST) {
                if (entryName.startsWith(ignoredPrefix)) {
                    return false;
                }
            }
            return true;
        });

        List<FileStatus> eligible = new ArrayList<>();
        for (FileStatus child : children) {
            if (child.isDirectory()) {
                // Symlinked directories are not followed.
                if (!child.isSymlink()) {
                    eligible.addAll(this.listEligibleFiles(fs, child.getPath(), lastCheckpointTime));
                }
            } else if (child.getModificationTime() > lastCheckpointTime && child.getLen() > 0L) {
                eligible.add(child);
            }
        }
        return eligible;
    }

    /**
     * Legacy string constants mirroring the keys defined in {@link DFSPathSelectorConfig}.
     */
    public static class Config {
        /** Key of {@code DFSPathSelectorConfig.ROOT_INPUT_PATH}. */
        @Deprecated
        public static final String ROOT_INPUT_PATH_PROP = DFSPathSelectorConfig.ROOT_INPUT_PATH.key();
        /** Key of {@code DFSPathSelectorConfig.SOURCE_INPUT_SELECTOR}. */
        @Deprecated
        public static final String SOURCE_INPUT_SELECTOR = DFSPathSelectorConfig.SOURCE_INPUT_SELECTOR.key();
    }
}

