/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.util.Arrays;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.types.StructType;

/**
 * Row-based source that loads CSV files through Spark's {@code csv} data source.
 *
 * <p>The files to read for each batch are chosen by a {@link DFSPathSelector}. Reader options
 * are taken from the properties: for every key in {@link #CSV_CONFIG_KEYS}, the property named
 * {@link #CSV_SRC_CONFIG_PREFIX} + key is looked up and, when set, handed to the
 * {@link DataFrameReader} under the bare key.
 *
 * <p>When a {@link SchemaProvider} is supplied, its source schema is converted to a Spark
 * {@link StructType} and applied to the reader; otherwise the reader is asked to infer the schema.
 */
public class CsvDFSSource
extends RowSource {
    private static final long serialVersionUID = 1L;

    /** Prefix prepended to each CSV option key when looking it up in the properties. */
    protected static final String CSV_SRC_CONFIG_PREFIX = "hoodie.deltastreamer.csv.";

    /** CSV reader option keys that are forwarded to the {@link DataFrameReader} when configured. */
    protected static final List<String> CSV_CONFIG_KEYS = Arrays.asList(
            "sep", "encoding", "quote", "escape", "charToEscapeQuoteEscaping", "comment",
            "header", "enforceSchema", "inferSchema", "samplingRatio",
            "ignoreLeadingWhiteSpace", "ignoreTrailingWhiteSpace",
            "nullValue", "emptyValue", "nanValue", "positiveInf", "negativeInf",
            "dateFormat", "timestampFormat", "maxColumns", "maxCharsPerColumn",
            "mode", "columnNameOfCorruptRecord", "multiLine");

    /** Chooses the file paths for each batch; transient, so excluded from Java serialization. */
    private final transient DFSPathSelector pathSelector;

    /** Spark schema derived from the schema provider, or {@code null} when no provider was given. */
    private final StructType sourceSchema;

    /**
     * Creates the source, building the path selector and resolving the optional source schema.
     *
     * @param props          properties holding the selector settings and the CSV reader options
     * @param sparkContext   Spark context; its Hadoop configuration is passed to the path selector
     * @param sparkSession   Spark session used to read the CSV files
     * @param schemaProvider provider of the Avro source schema; may be {@code null}
     */
    public CsvDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(props, sparkContext, sparkSession, schemaProvider);
        this.pathSelector = DFSPathSelector.createSourceSelector(props, sparkContext.hadoopConfiguration());
        if (schemaProvider == null) {
            this.sourceSchema = null;
        } else {
            Schema avroSchema = (Schema)schemaProvider.getSourceSchema();
            this.sourceSchema = (StructType)SchemaConverters.toSqlType(avroSchema).dataType();
        }
    }

    /**
     * Asks the path selector for the next set of files and loads them as a dataset.
     *
     * @param lastCkptStr previous checkpoint, forwarded to the path selector
     * @param sourceLimit limit forwarded to the path selector
     * @return the loaded dataset (empty when the selector returned no paths) paired with the
     *         string the selector returned alongside the paths, passed through unchanged
     */
    @Override
    protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
        Pair<Option<String>, String> selection =
                this.pathSelector.getNextFilePathsAndMaxModificationTime(this.sparkContext, lastCkptStr, sourceLimit);
        Option<Dataset<Row>> batch = this.fromFiles(selection.getLeft());
        return Pair.of(batch, selection.getRight());
    }

    /**
     * Loads the given comma-separated paths with a CSV reader configured from the properties.
     *
     * @param pathStr comma-separated list of paths, or an empty option
     * @return the loaded dataset, or an empty option when {@code pathStr} is empty
     */
    private Option<Dataset<Row>> fromFiles(Option<String> pathStr) {
        if (!pathStr.isPresent()) {
            return Option.empty();
        }

        DataFrameReader csvReader = this.sparkSession.read().format("csv");

        // Forward every configured CSV option to the reader under its un-prefixed key.
        for (String key : CSV_CONFIG_KEYS) {
            String configured = this.props.getString(CSV_SRC_CONFIG_PREFIX + key, null);
            if (configured != null) {
                csvReader.option(key, configured);
            }
        }

        boolean schemaProvided = this.sourceSchema != null;
        if (schemaProvided) {
            csvReader.schema(this.sourceSchema);
        }
        // Set after the configured options: inference is requested only when no schema was provided.
        csvReader.option("inferSchema", Boolean.toString(!schemaProvided));

        String[] paths = pathStr.get().split(",");
        return Option.of(csvReader.load(paths));
    }
}

