/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.io.Serializable;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.utilities.config.DFSPathSelectorConfig;
import org.apache.hudi.utilities.config.DatePartitionPathSelectorConfig;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DFSPathSelector} for input laid out in date partitions below the root input path.
 *
 * <p>Partition directories are discovered {@code datePartitionDepth} levels below the root path.
 * Only those whose last path segment parses with {@code dateFormat} to a date inside
 * {@code [currentDate - numPrevDaysToList, currentDate]} (inclusive) are kept. For
 * {@code key=value} segments, the value part is parsed instead. Files are then listed only in the
 * kept partitions. Directory listing at each level and file listing are distributed over Spark
 * with {@code partitionsListParallelism} slices.
 */
public class DatePartitionPathSelector extends DFSPathSelector {
    private static final Logger LOG = LoggerFactory.getLogger(DatePartitionPathSelector.class);
    // Pattern (DateTimeFormatter syntax) used to parse partition directory names into dates.
    private final String dateFormat;
    // Number of directory levels below the root at which the date partitions live; <= 0 disables pruning.
    private final int datePartitionDepth;
    // Size of the lookback window, in days, ending at the current date.
    private final int numPrevDaysToList;
    // Number of Spark slices used for distributed listing.
    private final int partitionsListParallelism;

    /**
     * Creates the selector and reads its date-partition settings from {@code props}.
     *
     * @param props      source properties; {@link DatePartitionPathSelectorConfig} keys are read here
     * @param hadoopConf Hadoop configuration, passed through to {@link DFSPathSelector}
     */
    public DatePartitionPathSelector(TypedProperties props, Configuration hadoopConf) {
        super(props, hadoopConf);
        this.dateFormat = ConfigUtils.getStringWithAltKeys((Properties) props, DatePartitionPathSelectorConfig.DATE_FORMAT, true);
        this.datePartitionDepth = ConfigUtils.getIntWithAltKeys(props, DatePartitionPathSelectorConfig.DATE_PARTITION_DEPTH);
        this.numPrevDaysToList = ConfigUtils.getIntWithAltKeys(props, DatePartitionPathSelectorConfig.LOOKBACK_DAYS);
        this.partitionsListParallelism = ConfigUtils.getIntWithAltKeys(props, DatePartitionPathSelectorConfig.PARTITIONS_LIST_PARALLELISM);
    }

    /**
     * Selects the next batch of files to ingest from the date partitions in the lookback window.
     *
     * <p>Eligible files from all pruned partitions are sorted by modification time and accepted in
     * that order. Selection stops at the first file that meets both of these conditions:
     * <ul>
     *   <li>adding it would bring the running byte total to {@code sourceLimit} or more;</li>
     *   <li>its modification time is strictly newer than the last accepted one (initially the
     *       previous checkpoint time).</li>
     * </ul>
     * Files sharing the last accepted modification time are therefore always taken together. This
     * matters because the returned checkpoint is that modification time.
     *
     * @param sparkContext   Spark context used for distributed listing
     * @param lastCheckpoint previous checkpoint; its key is parsed as a {@code long} modification time.
     *                       When absent, {@link Long#MIN_VALUE} is used.
     * @param sourceLimit    byte budget for this batch (see the stop rule above)
     * @return the comma-separated paths of the selected files (empty if none were selected), paired
     *         with a checkpoint holding the modification time of the last selected file. If nothing
     *         was selected, the checkpoint holds the previous checkpoint time unchanged.
     */
    @Override
    public Pair<Option<String>, Checkpoint> getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext, Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        LocalDate currentDate = LocalDate.parse(ConfigUtils.getStringWithAltKeys((Properties) this.props, DatePartitionPathSelectorConfig.CURRENT_DATE, LocalDate.now().toString()));
        String rootPath = ConfigUtils.getStringWithAltKeys(this.props, DFSPathSelectorConfig.ROOT_INPUT_PATH);
        LOG.info("Root path => {} source limit => {} depth of day partition => {} num prev days to list => {} from current date => {}",
            rootPath, sourceLimit, this.datePartitionDepth, this.numPrevDaysToList, currentDate);

        long lastCheckpointTime = lastCheckpoint.map(cp -> Long.parseLong(cp.getCheckpointKey())).orElse(Long.MIN_VALUE);
        HoodieSparkEngineContext context = new HoodieSparkEngineContext(sparkContext);
        HadoopStorageConfiguration storageConf = new HadoopStorageConfiguration(this.fs.getConf());
        List<String> prunedPartitionPaths = this.pruneDatePartitionPaths(context, this.fs, rootPath, currentDate);

        // listEligibleFiles is inherited from DFSPathSelector; presumably it only returns files newer
        // than lastCheckpointTime -- confirm there.
        List<FileStatus> eligibleFiles = context.flatMap(prunedPartitionPaths, path -> {
            FileSystem partitionFs = HadoopFSUtils.getFs(path, storageConf);
            return this.listEligibleFiles(partitionFs, new Path(path), lastCheckpointTime).stream();
        }, this.partitionsListParallelism);

        List<FileStatus> sortedEligibleFiles = eligibleFiles.stream()
            .sorted(Comparator.comparingLong(FileStatus::getModificationTime))
            .collect(Collectors.toList());

        long currentBytes = 0L;
        long newCheckpointTime = lastCheckpointTime;
        List<FileStatus> filteredFiles = new ArrayList<>();
        for (FileStatus file : sortedEligibleFiles) {
            // Stop only once the budget is reached AND we have moved past the current checkpoint
            // time. Files sharing the checkpoint's modification time must not be split across
            // batches, or the next run would skip them.
            // NOTE(review): if the very first candidate alone reaches sourceLimit and is newer than
            // the checkpoint, nothing is selected and the checkpoint does not advance.
            if (currentBytes + file.getLen() >= sourceLimit && file.getModificationTime() > newCheckpointTime) {
                break;
            }
            newCheckpointTime = file.getModificationTime();
            currentBytes += file.getLen();
            filteredFiles.add(file);
        }

        Checkpoint newCheckpoint = new StreamerCheckpointV2(String.valueOf(newCheckpointTime));
        if (filteredFiles.isEmpty()) {
            return new ImmutablePair<Option<String>, Checkpoint>(Option.empty(), newCheckpoint);
        }
        String pathStr = filteredFiles.stream()
            .map(f -> f.getPath().toString())
            .collect(Collectors.joining(","));
        return new ImmutablePair<Option<String>, Checkpoint>(Option.of(pathStr), newCheckpoint);
    }

    /**
     * Lists the date partitions below {@code rootPath} and keeps those inside the lookback window.
     *
     * <p>Directory listing descends {@code datePartitionDepth} levels. At each level, entries whose
     * names start with a prefix from {@code IGNORE_FILEPREFIX_LIST} are skipped. The paths found at
     * the final level are then filtered by their parsed date (see {@link #parsePartitionDate}).
     *
     * @param context     engine context used for distributed listing and filtering
     * @param fs          file system whose configuration is used to resolve per-path file systems
     * @param rootPath    root input path
     * @param currentDate inclusive upper bound of the lookback window
     * @return the paths of partitions dated within
     *         {@code [currentDate - numPrevDaysToList, currentDate]}, or {@code [rootPath]} when
     *         {@code datePartitionDepth <= 0}
     */
    public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext context, FileSystem fs, String rootPath, LocalDate currentDate) {
        List<String> partitionPaths = new ArrayList<>();
        partitionPaths.add(rootPath);
        if (this.datePartitionDepth <= 0) {
            return partitionPaths;
        }

        HadoopStorageConfiguration storageConf = new HadoopStorageConfiguration(fs.getConf());
        for (int level = 0; level < this.datePartitionDepth; ++level) {
            partitionPaths = context.flatMap(partitionPaths, path -> {
                Path dir = new Path(path);
                FileSystem dirFs = HadoopFSUtils.getFs(dir, storageConf);
                FileStatus[] statuses = dirFs.listStatus(dir,
                    file -> IGNORE_FILEPREFIX_LIST.stream().noneMatch(prefix -> file.getName().startsWith(prefix)));
                List<String> children = new ArrayList<>(statuses.length);
                for (FileStatus status : statuses) {
                    children.add(status.getPath().toString());
                }
                return children.stream();
            }, this.partitionsListParallelism);
        }

        // Copy everything the Spark filter needs into serializable locals, so the closure does not
        // capture (and serialize) this selector instance. The window start is loop-invariant, so it
        // is computed once here rather than per element.
        final String partitionDateFormat = this.dateFormat;
        final LocalDate fromDate = currentDate.minusDays(this.numPrevDaysToList);
        return context.getJavaSparkContext()
            .parallelize(partitionPaths, this.partitionsListParallelism)
            .filter(partitionPath -> {
                LocalDate partitionDate = parsePartitionDate(partitionPath, partitionDateFormat);
                // Inclusive on both ends: fromDate <= partitionDate <= currentDate.
                return !partitionDate.isBefore(fromDate) && !partitionDate.isAfter(currentDate);
            })
            .collect();
    }

    /**
     * Parses the date encoded in the last {@code /}-separated segment of a partition path.
     *
     * <p>A {@code key=value} segment has its value parsed; any other segment is parsed whole.
     * The formatter is built here instead of being captured because {@link DateTimeFormatter} is
     * not serializable.
     *
     * @param partitionPath partition path string
     * @param dateFormat    {@link DateTimeFormatter} pattern
     * @return the parsed partition date
     * @throws IllegalArgumentException if a {@code key=value} segment does not split into exactly
     *         two parts (via {@code ValidationUtils.checkArgument}), or if the pattern is invalid
     * @throws java.time.format.DateTimeParseException if the segment does not match the pattern
     */
    private static LocalDate parsePartitionDate(String partitionPath, String dateFormat) {
        String[] segments = partitionPath.split("/");
        String datePartition = segments[segments.length - 1];
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(dateFormat);
        if (datePartition.contains("=")) {
            String[] keyValue = datePartition.split("=");
            ValidationUtils.checkArgument(keyValue.length == 2, "Partition Field (" + datePartition + ") not in expected format");
            return LocalDate.parse(keyValue[1], formatter);
        }
        return LocalDate.parse(datePartition, formatter);
    }

    /**
     * Legacy config-key and default-value constants, kept for backward compatibility.
     *
     * <p>Each field mirrors the key or default of the corresponding
     * {@link DatePartitionPathSelectorConfig} property and is deprecated in its favor.
     */
    public static class Config {
        @Deprecated
        public static final String DATE_FORMAT = DatePartitionPathSelectorConfig.DATE_FORMAT.key();
        @Deprecated
        public static final String DEFAULT_DATE_FORMAT = DatePartitionPathSelectorConfig.DATE_FORMAT.defaultValue();
        @Deprecated
        public static final String DATE_PARTITION_DEPTH = DatePartitionPathSelectorConfig.DATE_PARTITION_DEPTH.key();
        @Deprecated
        public static final int DEFAULT_DATE_PARTITION_DEPTH = DatePartitionPathSelectorConfig.DATE_PARTITION_DEPTH.defaultValue();
        @Deprecated
        public static final String LOOKBACK_DAYS = DatePartitionPathSelectorConfig.LOOKBACK_DAYS.key();
        @Deprecated
        public static final int DEFAULT_LOOKBACK_DAYS = DatePartitionPathSelectorConfig.LOOKBACK_DAYS.defaultValue();
        @Deprecated
        public static final String CURRENT_DATE = DatePartitionPathSelectorConfig.CURRENT_DATE.key();
        @Deprecated
        public static final String PARTITIONS_LIST_PARALLELISM = DatePartitionPathSelectorConfig.PARTITIONS_LIST_PARALLELISM.key();
        @Deprecated
        public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = DatePartitionPathSelectorConfig.PARTITIONS_LIST_PARALLELISM.defaultValue();
    }
}

