/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Helpers shared by cloud-storage incremental sources: turning rows that describe cloud objects into
 * {@link CloudObjectMetadata}, and loading those objects as a Spark {@link Dataset}.
 */
public class CloudObjectsSelectorCommon {
    private static final Logger LOG = LoggerFactory.getLogger(CloudObjectsSelectorCommon.class);

    /** Shared mapper used to parse the JSON-encoded Spark datasource options. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Builds a Spark partition function that converts rows describing cloud objects into
     * {@link CloudObjectMetadata} (URL-decoded path plus size).
     *
     * <p>Rows are read positionally: column 0 is the bucket, column 1 the object key and column 2 the
     * object size, given as a {@code String}, {@code Integer} or {@code Long}.
     *
     * @param storageUrlSchemePrefix prefix prepended to the bucket to build the object URL
     * @param serializableHadoopConf Hadoop configuration; only used when {@code checkIfExists} is true
     * @param checkIfExists          when true, objects that do not exist on the file system are skipped
     * @return a function emitting one {@link CloudObjectMetadata} per retained row
     */
    public static MapPartitionsFunction<Row, CloudObjectMetadata> getCloudObjectMetadataPerPartition(String storageUrlSchemePrefix, SerializableConfiguration serializableHadoopConf, boolean checkIfExists) {
        // MapPartitionsFunction extends Serializable, so this lambda is serializable as-is.
        return rows -> {
            // One Hadoop configuration copy per partition, and only when existence checks are requested.
            Configuration configuration = checkIfExists ? serializableHadoopConf.newCopy() : null;
            List<CloudObjectMetadata> cloudObjectMetadataPerPartition = new ArrayList<>();
            rows.forEachRemaining(row -> {
                Option<String> filePathUrl = getUrlForFile(row, storageUrlSchemePrefix, configuration, checkIfExists);
                filePathUrl.ifPresent(url -> {
                    LOG.info("Adding file: {}", url);
                    cloudObjectMetadataPerPartition.add(new CloudObjectMetadata(url, parseObjectSize(row.get(2))));
                });
            });
            return cloudObjectMetadataPerPartition.iterator();
        };
    }

    /**
     * Converts the object-size column of a cloud event row to a {@code long}.
     *
     * @throws HoodieIOException if the value is null or not a {@code String}, {@code Integer} or {@code Long}
     */
    private static long parseObjectSize(Object obj) {
        if (obj instanceof String) {
            return Long.parseLong((String) obj);
        }
        if (obj instanceof Integer) {
            return ((Integer) obj).longValue();
        }
        if (obj instanceof Long) {
            return (Long) obj;
        }
        // Null-safe so a missing size surfaces as this error rather than a NullPointerException.
        throw new HoodieIOException("unexpected object size's type in Cloud storage events: " + (obj == null ? "null" : obj.getClass()));
    }

    /**
     * Builds the URL-decoded path {@code storageUrlSchemePrefix + bucket + "/" + key} for a row and,
     * when requested, keeps it only if the file exists.
     *
     * @param configuration Hadoop configuration used for the existence check; ignored (may be null)
     *                      when {@code checkIfExists} is false
     * @return the decoded path, or empty if {@code checkIfExists} is set and the file does not exist
     * @throws HoodieException wrapping any failure while decoding the path or checking existence
     */
    private static Option<String> getUrlForFile(Row row, String storageUrlSchemePrefix, Configuration configuration, boolean checkIfExists) {
        String bucket = row.getString(0);
        String filePath = storageUrlSchemePrefix + bucket + "/" + row.getString(1);
        try {
            String filePathUrl = URLDecoder.decode(filePath, StandardCharsets.UTF_8.name());
            if (!checkIfExists) {
                return Option.of(filePathUrl);
            }
            boolean exists = checkIfFileExists(storageUrlSchemePrefix, bucket, filePathUrl, configuration);
            return exists ? Option.of(filePathUrl) : Option.empty();
        } catch (Exception exception) {
            String errMsg = String.format("Failed to generate path to cloud file %s", filePath);
            LOG.warn(errMsg, exception);
            throw new HoodieException(errMsg, exception);
        }
    }

    /**
     * Checks whether {@code filePathUrl} exists on the file system resolved for
     * {@code storageUrlSchemePrefix + bucket}.
     *
     * @throws HoodieIOException if the file system lookup or existence check fails
     */
    private static boolean checkIfFileExists(String storageUrlSchemePrefix, String bucket, String filePathUrl, Configuration configuration) {
        try {
            FileSystem fs = FSUtils.getFs(storageUrlSchemePrefix + bucket, configuration);
            return fs.exists(new Path(filePathUrl));
        } catch (IOException ioe) {
            String errMsg = String.format("Error while checking path exists for %s ", filePathUrl);
            LOG.error(errMsg, ioe);
            throw new HoodieIOException(errMsg, ioe);
        }
    }

    /**
     * Loads the given cloud objects into a single Spark {@link Dataset}.
     *
     * <p>The reader uses {@code fileFormat}, plus the source schema from {@code schemaProviderOption} when
     * one is present and is not {@link InputBatch#NULL_SCHEMA}. It also applies any JSON-encoded Spark
     * datasource options from {@link CloudSourceConfig#SPARK_DATASOURCE_OPTIONS}, falling back to
     * {@link S3EventsHoodieIncrSourceConfig#SPARK_DATASOURCE_OPTIONS}. The result is coalesced to about one
     * partition per {@code PARQUET_MAX_FILE_SIZE} of (10%-padded) input. When
     * {@link CloudSourceConfig#PATH_BASED_PARTITION_FIELDS} is set, one column is added per comma-separated
     * key (trimmed, blanks ignored). Its value is the path segment that follows {@code key=} in the input
     * file name.
     *
     * @return the loaded dataset, or empty when {@code cloudObjectMetadata} is null or empty
     * @throws HoodieException if the datasource options are not a JSON object of options
     */
    public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props, String fileFormat, Option<SchemaProvider> schemaProviderOption) {
        // Check before logging: the debug message dereferences the list.
        if (CollectionUtils.isNullOrEmpty(cloudObjectMetadata)) {
            return Option.empty();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Extracted distinct files " + cloudObjectMetadata.size() + " and some samples " + cloudObjectMetadata.stream().map(CloudObjectMetadata::getPath).limit(10L).collect(Collectors.toList()));
        }

        DataFrameReader reader = spark.read().format(fileFormat);
        if (schemaProviderOption.isPresent()) {
            Schema sourceSchema = schemaProviderOption.get().getSourceSchema();
            if (sourceSchema != null && !sourceSchema.equals(InputBatch.NULL_SCHEMA)) {
                reader = reader.schema(AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema));
            }
        }

        String datasourceOpts = ConfigUtils.getStringWithAltKeys(props, CloudSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
        if (StringUtils.isNullOrEmpty(datasourceOpts)) {
            // Fall back to the S3 events source's option key.
            datasourceOpts = ConfigUtils.getStringWithAltKeys(props, S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
        }
        if (StringUtils.nonEmpty(datasourceOpts)) {
            Map<String, String> sparkOptionsMap = parseSparkOptions(datasourceOpts);
            LOG.info("sparkOptions loaded: {}", sparkOptionsMap);
            reader = reader.options(sparkOptionsMap);
        }

        List<String> paths = new ArrayList<>(cloudObjectMetadata.size());
        long totalSize = 0L;
        for (CloudObjectMetadata metadata : cloudObjectMetadata) {
            paths.add(metadata.getPath());
            totalSize += metadata.getSize();
        }
        // Pad the summed object size by 10% before deriving the partition count.
        totalSize = (long) (totalSize * 1.1);
        long parquetMaxFileSize = props.getLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue()));
        int numPartitions = computeNumPartitions(totalSize, parquetMaxFileSize);

        boolean isCommaSeparatedPathFormat = props.getBoolean("hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format", false);
        Dataset<Row> dataset = isCommaSeparatedPathFormat
            ? reader.load(String.join(",", paths))
            : reader.load(paths.toArray(new String[0]));
        dataset = dataset.coalesce(numPartitions);

        if (ConfigUtils.containsConfigProperty(props, CloudSourceConfig.PATH_BASED_PARTITION_FIELDS)) {
            String[] partitionKeysToAdd = ConfigUtils.getStringWithAltKeys(props, CloudSourceConfig.PATH_BASED_PARTITION_FIELDS).split(",");
            for (String rawPartitionKey : partitionKeysToAdd) {
                // Trim so "a, b" yields the key "b" rather than " b", which never matches a path segment.
                String partitionKey = rawPartitionKey.trim();
                if (partitionKey.isEmpty()) {
                    continue;
                }
                String partitionPathPattern = String.format("%s=", partitionKey);
                LOG.info("Adding column {} to dataset", partitionKey);
                // Take the text after "<key>=" in the input file name, up to the next '/'.
                dataset = dataset.withColumn(partitionKey, functions.split(functions.split(functions.input_file_name(), partitionPathPattern).getItem(1), "/").getItem(0));
            }
        }
        return Option.of(dataset);
    }

    /**
     * Parses JSON-encoded Spark datasource options. Scalar JSON values (numbers, booleans) are coerced to
     * strings, matching what {@link DataFrameReader#options(Map)} expects.
     *
     * @throws HoodieException if the text is not a JSON object of scalar values
     */
    private static Map<String, String> parseSparkOptions(String datasourceOpts) {
        Map<String, String> sparkOptionsMap;
        try {
            sparkOptionsMap = OBJECT_MAPPER.readValue(datasourceOpts, OBJECT_MAPPER.getTypeFactory().constructMapType(Map.class, String.class, String.class));
        } catch (IOException e) {
            throw new HoodieException(String.format("Failed to parse sparkOptions: %s", datasourceOpts), e);
        }
        if (sparkOptionsMap == null) {
            // A literal JSON "null" is not a usable option map.
            throw new HoodieException(String.format("Failed to parse sparkOptions: %s", datasourceOpts));
        }
        return sparkOptionsMap;
    }

    /**
     * Returns {@code totalSize / maxFileSize}, clamped to {@code [1, Integer.MAX_VALUE]}. A non-positive
     * {@code maxFileSize} yields a single partition instead of dividing by zero.
     */
    private static int computeNumPartitions(long totalSize, long maxFileSize) {
        if (maxFileSize <= 0L) {
            return 1;
        }
        return (int) Math.min(Math.max(totalSize / maxFileSize, 1L), (long) Integer.MAX_VALUE);
    }

    /**
     * Same as the five-argument overload with no schema provider.
     *
     * @return the loaded dataset, or empty when {@code cloudObjectMetadata} is null or empty
     */
    public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props, String fileFormat) {
        return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat, Option.empty());
    }
}

