/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Incremental source that turns S3 event rows stored in a Hudi table into a
 * dataset of the files those events point at.
 *
 * <p>Each fetch reads the rows of the table at
 * {@code hoodie.deltastreamer.source.hoodieincr.path} that fall inside the instant
 * range computed by {@link IncrSourceHelper#calculateBeginAndEndInstants}, keeps the
 * distinct ({@code s3.bucket.name}, {@code s3.object.key}) pairs that pass the
 * configured key filters, converts them into {@code <fs-prefix>://<bucket>/<key>}
 * paths and loads those paths with a Spark {@link DataFrameReader} of the configured
 * file format.
 */
public class S3EventsHoodieIncrSource extends HoodieIncrSource {
    private static final Logger LOG = LogManager.getLogger(S3EventsHoodieIncrSource.class);

    // Keys in the "hoodieincr" namespace that this source reads directly.
    private static final String SRC_PATH_KEY = "hoodie.deltastreamer.source.hoodieincr.path";
    private static final String NUM_INSTANTS_KEY = "hoodie.deltastreamer.source.hoodieincr.num_instants";
    private static final String READ_LATEST_ON_MISSING_CKPT_KEY = "hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt";
    private static final String MISSING_CKPT_STRATEGY_KEY = "hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy";
    private static final String FILE_FORMAT_KEY = "hoodie.deltastreamer.source.hoodieincr.file.format";
    private static final String DEFAULT_FILE_FORMAT = "parquet";

    private static final String HUDI_FORMAT = "org.apache.hudi";
    private static final String COMMIT_TIME_COLUMN = "_hoodie_commit_time";

    public S3EventsHoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(props, sparkContext, sparkSession, schemaProvider);
    }

    /**
     * Builds the reader used to load the resolved cloud files.
     *
     * <p>When {@code hoodie.deltastreamer.source.s3incr.spark.datasource.options} is set,
     * its value is parsed as a JSON object and applied as reader options. The JSON is
     * bound as a String-to-String map so that the value types are settled during
     * parsing instead of an untyped map being handed to Spark.
     *
     * @param fileFormat Spark data source format name passed to {@code format(...)}
     * @return a reader for {@code fileFormat}, with the configured options applied if any
     * @throws HoodieException if the configured options cannot be parsed
     */
    private DataFrameReader getDataFrameReader(String fileFormat) {
        DataFrameReader dataFrameReader = this.sparkSession.read().format(fileFormat);
        String sparkOptionsJson = this.props.getString(Config.SPARK_DATASOURCE_OPTIONS, null);
        if (StringUtils.isNullOrEmpty(sparkOptionsJson)) {
            return dataFrameReader;
        }
        ObjectMapper mapper = new ObjectMapper();
        Map<String, String> sparkOptionsMap;
        try {
            sparkOptionsMap = mapper.readValue(sparkOptionsJson,
                    mapper.getTypeFactory().constructMapType(Map.class, String.class, String.class));
        } catch (IOException e) {
            throw new HoodieException(String.format("Failed to parse sparkOptions: %s", sparkOptionsJson), e);
        }
        LOG.info(String.format("sparkOptions loaded: %s", sparkOptionsMap));
        return dataFrameReader.options(sparkOptionsMap);
    }

    /**
     * Fetches the next batch of data referenced by S3 event rows.
     *
     * @param lastCkptStr last checkpoint; an absent or empty value is treated as "no checkpoint"
     * @param sourceLimit not used by this implementation
     * @return the loaded dataset (empty when already caught up, when no event rows fall in
     *         the range, or when no file survives filtering) paired with the next checkpoint.
     *         The checkpoint is the end instant of the range, or the begin instant when
     *         begin and end are equal.
     * @throws HoodieException if the reader options cannot be parsed; failures while
     *         resolving individual files are raised inside the Spark job
     */
    @Override
    public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
        DataSourceUtils.checkRequiredProperties(this.props, Collections.singletonList(SRC_PATH_KEY));
        String srcPath = this.props.getString(SRC_PATH_KEY);
        int numInstantsPerFetch = this.props.getInteger(NUM_INSTANTS_KEY, HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH);
        boolean readLatestOnMissingCkpt = this.props.getBoolean(READ_LATEST_ON_MISSING_CKPT_KEY, HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);

        // The explicit strategy is parsed first (so an invalid value still fails fast),
        // then the read-latest flag, when set, takes precedence over it.
        IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy = this.props.containsKey(MISSING_CKPT_STRATEGY_KEY)
                ? IncrSourceHelper.MissingCheckpointStrategy.valueOf(this.props.getString(MISSING_CKPT_STRATEGY_KEY))
                : null;
        if (readLatestOnMissingCkpt) {
            missingCheckpointStrategy = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
        }
        String fileFormat = this.props.getString(FILE_FORMAT_KEY, DEFAULT_FILE_FORMAT);

        Option<String> beginInstant = lastCkptStr.isPresent() && !lastCkptStr.get().isEmpty() ? lastCkptStr : Option.empty();
        Pair<String, Pair<String, String>> queryTypeAndInstantEndpts = IncrSourceHelper.calculateBeginAndEndInstants(this.sparkContext, srcPath, numInstantsPerFetch, beginInstant, missingCheckpointStrategy);
        String queryType = queryTypeAndInstantEndpts.getLeft();
        String beginInstantTime = queryTypeAndInstantEndpts.getRight().getLeft();
        String endInstantTime = queryTypeAndInstantEndpts.getRight().getRight();

        if (beginInstantTime.equals(endInstantTime)) {
            LOG.warn("Already caught up. Begin Checkpoint was :" + beginInstantTime);
            return Pair.of(Option.empty(), beginInstantTime);
        }

        Dataset<Row> source = readEventRows(srcPath, queryType, beginInstantTime, endInstantTime);
        if (source.isEmpty()) {
            return Pair.of(Option.empty(), endInstantTime);
        }

        String s3Prefix = this.props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase() + "://";
        boolean checkExists = this.props.getBoolean(Config.ENABLE_EXISTS_CHECK, Config.DEFAULT_ENABLE_EXISTS_CHECK);
        SerializableConfiguration serializableConfiguration = new SerializableConfiguration(this.sparkContext.hadoopConfiguration());
        List<String> cloudFiles = listCloudFiles(source, buildObjectKeyFilter(fileFormat), s3Prefix, checkExists, serializableConfiguration);

        Option<Dataset<Row>> dataset = Option.empty();
        if (!cloudFiles.isEmpty()) {
            dataset = Option.of(getDataFrameReader(fileFormat).load(cloudFiles.toArray(new String[0])));
        }
        // Guarded so the sample list is not built when debug logging is off.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Extracted distinct files " + cloudFiles.size() + " and some samples " + cloudFiles.stream().limit(10L).collect(Collectors.toList()));
        }
        return Pair.of(dataset, endInstantTime);
    }

    /**
     * Reads the event rows with commit time in {@code (beginInstantTime, endInstantTime]}.
     * Uses a Hudi incremental query when {@code queryType} is the incremental query type;
     * otherwise runs a snapshot query and filters on {@code _hoodie_commit_time}.
     */
    private Dataset<Row> readEventRows(String srcPath, String queryType, String beginInstantTime, String endInstantTime) {
        if (queryType.equals(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())) {
            return this.sparkSession.read().format(HUDI_FORMAT)
                    .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
                    .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), beginInstantTime)
                    .option(DataSourceReadOptions.END_INSTANTTIME().key(), endInstantTime)
                    .load(srcPath);
        }
        return this.sparkSession.read().format(HUDI_FORMAT)
                .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
                .load(srcPath)
                .filter(String.format("%s > '%s'", COMMIT_TIME_COLUMN, beginInstantTime))
                .filter(String.format("%s <= '%s'", COMMIT_TIME_COLUMN, endInstantTime));
    }

    /**
     * Builds the SQL filter applied to the event rows: non-empty objects only, plus the
     * optional key-prefix / ignored-prefix / ignored-substring conditions from the
     * configuration, plus a condition that the key contains {@code fileFormat}.
     */
    private String buildObjectKeyFilter(String fileFormat) {
        StringBuilder filter = new StringBuilder("s3.object.size > 0");
        String keyPrefix = this.props.getString(Config.S3_KEY_PREFIX, null);
        if (!StringUtils.isNullOrEmpty(keyPrefix)) {
            filter.append(" and s3.object.key like '").append(keyPrefix).append("%'");
        }
        String ignoredKeyPrefix = this.props.getString(Config.S3_IGNORE_KEY_PREFIX, null);
        if (!StringUtils.isNullOrEmpty(ignoredKeyPrefix)) {
            filter.append(" and s3.object.key not like '").append(ignoredKeyPrefix).append("%'");
        }
        String ignoredKeySubstring = this.props.getString(Config.S3_IGNORE_KEY_SUBSTRING, null);
        if (!StringUtils.isNullOrEmpty(ignoredKeySubstring)) {
            filter.append(" and s3.object.key not like '%").append(ignoredKeySubstring).append("%'");
        }
        // Matches the format name anywhere in the key, not only as a suffix.
        filter.append(" and s3.object.key like '%").append(fileFormat).append("%'");
        return filter.toString();
    }

    /**
     * Collects the distinct file paths referenced by the filtered event rows.
     * Static, and fed only with its arguments, so that the Spark closure does not
     * capture the enclosing source instance.
     */
    private static List<String> listCloudFiles(Dataset<Row> source, String filter, String s3Prefix, boolean checkExists, SerializableConfiguration serializableConfiguration) {
        return source.filter(filter)
                .select("s3.bucket.name", "s3.object.key")
                .distinct()
                .mapPartitions((MapPartitionsFunction<Row, String>) fileListIterator -> {
                    List<String> cloudFilesPerPartition = new ArrayList<>();
                    // One Hadoop configuration copy per partition, shared by all rows in it.
                    Configuration configuration = serializableConfiguration.newCopy();
                    fileListIterator.forEachRemaining(row -> addCloudFile(row, s3Prefix, checkExists, configuration, cloudFilesPerPartition));
                    return cloudFilesPerPartition.iterator();
                }, Encoders.STRING())
                .collectAsList();
    }

    /**
     * Resolves one (bucket, key) row to a URL-decoded path and appends it to {@code sink};
     * when {@code checkExists} is set the path is appended only if the file system
     * reports that it exists.
     *
     * @throws HoodieIOException if the existence check fails with an I/O error
     * @throws HoodieException on any other failure
     */
    private static void addCloudFile(Row row, String s3Prefix, boolean checkExists, Configuration configuration, List<String> sink) {
        String bucket = row.getString(0);
        String filePath = s3Prefix + bucket + "/" + row.getString(1);
        String decodeUrl = null;
        try {
            decodeUrl = URLDecoder.decode(filePath, StandardCharsets.UTF_8.name());
            if (!checkExists) {
                sink.add(decodeUrl);
                return;
            }
            FileSystem fs = FSUtils.getFs(s3Prefix + bucket, configuration);
            if (fs.exists(new Path(decodeUrl))) {
                sink.add(decodeUrl);
            }
        } catch (IOException e) {
            LOG.error(String.format("Error while checking path exists for %s ", decodeUrl), e);
            throw new HoodieIOException(String.format("Error while checking path exists for %s ", decodeUrl), e);
        } catch (Throwable e) {
            LOG.warn("Failed to add cloud file ", e);
            throw new HoodieException("Failed to add cloud file", e);
        }
    }

    /** Configuration keys and defaults specific to this source. */
    static class Config {
        // When true, a resolved path is kept only if it exists on the file system.
        static final String ENABLE_EXISTS_CHECK = "hoodie.deltastreamer.source.s3incr.check.file.exists";
        static final Boolean DEFAULT_ENABLE_EXISTS_CHECK = false;
        // Only object keys starting with this prefix are selected.
        static final String S3_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.key.prefix";
        // File system scheme placed before "://" in generated paths; defaults to "s3" at the call site.
        static final String S3_FS_PREFIX = "hoodie.deltastreamer.source.s3incr.fs.prefix";
        // Object keys starting with this prefix are skipped.
        static final String S3_IGNORE_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.ignore.key.prefix";
        // Object keys containing this substring are skipped.
        static final String S3_IGNORE_KEY_SUBSTRING = "hoodie.deltastreamer.source.s3incr.ignore.key.substring";
        // JSON object of options applied to the Spark reader that loads the files.
        static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options";
    }
}

