/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.SqlQueryBuilder;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.storage.StorageLevel;

/**
 * Row source that reads a relational table through Spark's {@code jdbc} data source.
 *
 * <p>When a non-empty checkpoint is supplied, only rows whose incremental column is greater than
 * the checkpoint are selected; otherwise the whole table is selected. When
 * {@code hoodie.deltastreamer.jdbc.incr.pull} is {@code true}, the next checkpoint is the maximum
 * value of the incremental column in the fetched rows, cast to a string.
 */
public class JdbcSource extends RowSource {
    private static final Logger LOG = LogManager.getLogger(JdbcSource.class);
    /** JDBC sub-protocols for which an {@code ORDER BY ... LIMIT} clause is pushed into the query. */
    private static final List<String> DB_LIMIT_CLAUSE = Arrays.asList("mysql", "postgresql", "h2");
    private static final String URI_JDBC_PREFIX = "jdbc:";
    /** Wraps the generated SELECT as a derived table usable as the {@code dbtable} option. */
    private static final String PPD_QUERY_FORMAT = "(%s) rdbms_table";
    /** Buffer size used while copying the password file into memory. */
    private static final int PASSWORD_BUFFER_SIZE = 4096;

    public JdbcSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(props, sparkContext, sparkSession, schemaProvider);
    }

    /**
     * Builds a {@link DataFrameReader} for the {@code jdbc} format from the configured properties.
     *
     * <p>The password is taken from {@code hoodie.deltastreamer.jdbc.password} when that key is
     * present, otherwise from the file named by {@code hoodie.deltastreamer.jdbc.password.file}.
     * Extra options are added afterwards, and the incremental column is required when
     * incremental pull is enabled.
     *
     * @param session    Spark session used to create the reader and to obtain the Hadoop configuration
     * @param properties source configuration
     * @return a reader configured with url, user, driver, dbtable, password and extra options
     * @throws HoodieException wrapping any failure, including a missing password configuration
     */
    private static DataFrameReader validatePropsAndGetDataFrameReader(SparkSession session, TypedProperties properties) throws HoodieException {
        FSDataInputStream passwordFileStream = null;
        try {
            DataFrameReader dataFrameReader = session.read().format("jdbc");
            dataFrameReader = dataFrameReader.option(Config.URL_PROP, properties.getString(Config.URL));
            dataFrameReader = dataFrameReader.option(Config.USER_PROP, properties.getString(Config.USER));
            dataFrameReader = dataFrameReader.option(Config.DRIVER_PROP, properties.getString(Config.DRIVER_CLASS));
            dataFrameReader = dataFrameReader.option(Config.RDBMS_TABLE_PROP, properties.getString(Config.RDBMS_TABLE_NAME));
            if (properties.containsKey(Config.PASSWORD)) {
                LOG.info("Reading JDBC password from properties file....");
                dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, properties.getString(Config.PASSWORD));
            } else if (properties.containsKey(Config.PASSWORD_FILE) && !StringUtils.isNullOrEmpty(properties.getString(Config.PASSWORD_FILE))) {
                String passwordFile = properties.getString(Config.PASSWORD_FILE);
                LOG.info(String.format("Reading JDBC password from password file %s", passwordFile));
                FileSystem fileSystem = FileSystem.get(session.sparkContext().hadoopConfiguration());
                passwordFileStream = fileSystem.open(new Path(passwordFile));
                // Copy until EOF: available() plus a single read() may return fewer bytes than
                // the file holds, which would silently truncate the password.
                ByteArrayOutputStream passwordBytes = new ByteArrayOutputStream();
                IOUtils.copyBytes(passwordFileStream, passwordBytes, PASSWORD_BUFFER_SIZE, false);
                // Decode with a fixed charset so the result does not depend on the JVM default.
                dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, new String(passwordBytes.toByteArray(), StandardCharsets.UTF_8));
            } else {
                throw new IllegalArgumentException(String.format("JDBCSource needs either a %s or %s to connect to RDBMS datasource", Config.PASSWORD_FILE, Config.PASSWORD));
            }
            JdbcSource.addExtraJdbcOptions(properties, dataFrameReader);
            if (properties.getBoolean(Config.IS_INCREMENTAL)) {
                DataSourceUtils.checkRequiredProperties(properties, Collections.singletonList(Config.INCREMENTAL_COLUMN));
            }
            return dataFrameReader;
        }
        catch (Exception e) {
            throw new HoodieException("Failed to validate properties", e);
        }
        finally {
            // Stream is closed here, which is why copyBytes above is told not to close it.
            IOUtils.closeStream(passwordFileStream);
        }
    }

    /**
     * Copies every property whose key starts with {@code hoodie.deltastreamer.jdbc.extra.options.}
     * onto the reader, using the remainder of the key as the option name. Entries with an empty
     * option name or an empty value are skipped.
     *
     * @param properties      source configuration
     * @param dataFrameReader reader that receives the options
     */
    private static void addExtraJdbcOptions(TypedProperties properties, DataFrameReader dataFrameReader) {
        for (Object property : properties.keySet()) {
            String prop = property.toString();
            if (!prop.startsWith(Config.EXTRA_OPTIONS)) {
                continue;
            }
            // Strip only the leading prefix. String.split would interpret the prefix as a regex
            // (its dots match any character) and would also remove later occurrences in the key.
            String key = prop.substring(Config.EXTRA_OPTIONS.length());
            String value = properties.getString(prop);
            if (key.isEmpty() || StringUtils.isNullOrEmpty(value)) {
                continue;
            }
            LOG.info(String.format("Adding %s -> %s to jdbc options", key, value));
            dataFrameReader.option(key, value);
        }
    }

    /**
     * Checks the mandatory properties and fetches the next batch.
     *
     * @param lastCkptStr checkpoint of the previous batch, if any
     * @param sourceLimit maximum number of rows to request; only applied when positive and the
     *                    database is one of {@link #DB_LIMIT_CLAUSE}
     * @return the fetched dataset paired with the new checkpoint string
     * @throws HoodieException if validation or fetching fails
     */
    @Override
    protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) throws HoodieException {
        try {
            DataSourceUtils.checkRequiredProperties(this.props, Arrays.asList(Config.URL, Config.DRIVER_CLASS, Config.USER, Config.RDBMS_TABLE_NAME, Config.IS_INCREMENTAL));
            return this.fetch(lastCkptStr, sourceLimit);
        }
        catch (HoodieException e) {
            LOG.error("Exception while running JDBCSource ", e);
            throw e;
        }
        catch (Exception e) {
            LOG.error("Exception while running JDBCSource ", e);
            throw new HoodieException("Error fetching next batch from JDBC source. Last checkpoint: " + lastCkptStr.orElse(null), e);
        }
    }

    /**
     * Runs an incremental fetch when a non-empty checkpoint exists, otherwise a full fetch, and
     * computes the new checkpoint while the dataset is persisted.
     */
    private Pair<Option<Dataset<Row>>, String> fetch(Option<String> lastCkptStr, long sourceLimit) {
        Dataset<Row> dataset;
        if (lastCkptStr.isPresent() && !StringUtils.isNullOrEmpty(lastCkptStr.get())) {
            dataset = this.incrementalFetch(lastCkptStr, sourceLimit);
        } else {
            LOG.info("No checkpoint references found. Doing a full rdbms table fetch");
            dataset = this.fullFetch(sourceLimit);
        }
        dataset.persist(StorageLevel.fromString(this.props.getString(Config.STORAGE_LEVEL, "MEMORY_AND_DISK_SER")));
        try {
            boolean isIncremental = this.props.getBoolean(Config.IS_INCREMENTAL);
            return Pair.of(Option.of(dataset), this.checkpoint(dataset, isIncremental, lastCkptStr));
        }
        finally {
            // Release the cache even when checkpointing fails.
            // NOTE(review): the dataset is unpersisted before it is handed back, so presumably
            // later actions re-run the JDBC query - confirm this is intended.
            dataset.unpersist();
        }
    }

    /**
     * Tells whether a LIMIT clause should be pushed down, based on the sub-protocol of the
     * configured JDBC URL (the part following {@code jdbc:}).
     */
    private boolean supportsLimitClause() {
        String url = this.props.getString(Config.URL);
        String withoutPrefix = url.startsWith(URI_JDBC_PREFIX) ? url.substring(URI_JDBC_PREFIX.length()) : url;
        return DB_LIMIT_CLAUSE.contains(URI.create(withoutPrefix).getScheme());
    }

    /**
     * Selects the rows whose incremental column is greater than the last checkpoint.
     *
     * <p>If building or loading the query fails and
     * {@code hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch} is {@code true}, a full fetch
     * is performed instead; otherwise the failure is rethrown.
     *
     * @param lastCheckpoint checkpoint of the previous batch; must be present
     * @param sourceLimit    row limit, applied only when positive and supported by the database
     */
    private Dataset<Row> incrementalFetch(Option<String> lastCheckpoint, long sourceLimit) {
        try {
            String incrementalColumn = this.props.getString(Config.INCREMENTAL_COLUMN);
            // NOTE(review): the checkpoint value is spliced into the SQL text without escaping.
            SqlQueryBuilder queryBuilder = SqlQueryBuilder.select("*")
                .from(this.props.getString(Config.RDBMS_TABLE_NAME))
                .where(String.format(" %s > '%s'", incrementalColumn, lastCheckpoint.get()));
            if (sourceLimit > 0L && this.supportsLimitClause()) {
                queryBuilder.orderBy(incrementalColumn).limit(sourceLimit);
            }
            String query = String.format(PPD_QUERY_FORMAT, queryBuilder.toString());
            LOG.info("PPD QUERY: " + query);
            LOG.info(String.format("Referenced last checkpoint and prepared new predicate pushdown query for jdbc pull %s", query));
            return JdbcSource.validatePropsAndGetDataFrameReader(this.sparkSession, this.props).option(Config.RDBMS_TABLE_PROP, query).load();
        }
        catch (Exception e) {
            LOG.error("Error while performing an incremental fetch. Not all database support the PPD query we generate to do an incremental scan", e);
            if (this.props.containsKey(Config.FALLBACK_TO_FULL_FETCH) && this.props.getBoolean(Config.FALLBACK_TO_FULL_FETCH)) {
                LOG.warn("Falling back to full scan.");
                return this.fullFetch(sourceLimit);
            }
            throw e;
        }
    }

    /**
     * Selects every row of the configured table. When a limit applies, rows are ordered by the
     * incremental column if one is configured.
     *
     * @param sourceLimit row limit, applied only when positive and supported by the database
     */
    private Dataset<Row> fullFetch(long sourceLimit) {
        SqlQueryBuilder queryBuilder = SqlQueryBuilder.select("*").from(this.props.getString(Config.RDBMS_TABLE_NAME));
        if (sourceLimit > 0L && this.supportsLimitClause()) {
            if (this.props.containsKey(Config.INCREMENTAL_COLUMN)) {
                queryBuilder.orderBy(this.props.getString(Config.INCREMENTAL_COLUMN)).limit(sourceLimit);
            } else {
                queryBuilder.limit(sourceLimit);
            }
        }
        String query = String.format(PPD_QUERY_FORMAT, queryBuilder.toString());
        return JdbcSource.validatePropsAndGetDataFrameReader(this.sparkSession, this.props).option(Config.RDBMS_TABLE_PROP, query).load();
    }

    /**
     * Computes the checkpoint for the fetched rows.
     *
     * @param rowDataset    rows fetched in this batch
     * @param isIncremental whether incremental pull is enabled
     * @param lastCkptStr   previous checkpoint, reused when the batch has no maximum value
     * @return the maximum incremental-column value as a string; the previous checkpoint (or an
     *         empty string) when that maximum is {@code null}; an empty string when incremental
     *         pull is disabled
     * @throws HoodieException if the aggregation fails
     */
    private String checkpoint(Dataset<Row> rowDataset, boolean isIncremental, Option<String> lastCkptStr) {
        try {
            if (!isIncremental) {
                return "";
            }
            Column incrementalColumn = rowDataset.col(this.props.getString(Config.INCREMENTAL_COLUMN));
            String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first().getString(0);
            LOG.info(String.format("Checkpointing column %s with value: %s ", incrementalColumn, max));
            if (max != null) {
                return max;
            }
            return lastCkptStr.isPresent() && !StringUtils.isNullOrEmpty(lastCkptStr.get()) ? lastCkptStr.get() : "";
        }
        catch (Exception e) {
            LOG.error("Failed to checkpoint", e);
            throw new HoodieException("Failed to checkpoint. Last checkpoint: " + lastCkptStr.orElse(null), e);
        }
    }

    /**
     * Property keys read by {@link JdbcSource} and the Spark JDBC option names they map to.
     */
    protected static class Config {
        /** JDBC connection URL. */
        private static final String URL = "hoodie.deltastreamer.jdbc.url";
        private static final String URL_PROP = "url";
        /** Database user name. */
        private static final String USER = "hoodie.deltastreamer.jdbc.user";
        private static final String USER_PROP = "user";
        /** Inline password; takes precedence over {@link #PASSWORD_FILE} when present. */
        private static final String PASSWORD = "hoodie.deltastreamer.jdbc.password";
        /** Path of a file holding the password, opened through the Hadoop file system. */
        private static final String PASSWORD_FILE = "hoodie.deltastreamer.jdbc.password.file";
        private static final String PASSWORD_PROP = "password";
        /** JDBC driver class name. */
        private static final String DRIVER_CLASS = "hoodie.deltastreamer.jdbc.driver.class";
        private static final String DRIVER_PROP = "driver";
        /** Name of the table to read. */
        private static final String RDBMS_TABLE_NAME = "hoodie.deltastreamer.jdbc.table.name";
        private static final String RDBMS_TABLE_PROP = "dbtable";
        /** Column compared against the checkpoint and used to compute the next one. */
        private static final String INCREMENTAL_COLUMN = "hoodie.deltastreamer.jdbc.table.incr.column.name";
        /** Whether a checkpoint is computed from the incremental column. */
        private static final String IS_INCREMENTAL = "hoodie.deltastreamer.jdbc.incr.pull";
        /** Prefix of keys whose remainder is passed to the reader as an option name. */
        private static final String EXTRA_OPTIONS = "hoodie.deltastreamer.jdbc.extra.options.";
        /** Spark storage level used while checkpointing; defaults to MEMORY_AND_DISK_SER. */
        private static final String STORAGE_LEVEL = "hoodie.deltastreamer.jdbc.storage.level";
        /** Whether a failed incremental fetch falls back to a full fetch. */
        private static final String FALLBACK_TO_FULL_FETCH = "hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch";

        protected Config() {
        }
    }
}

