/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.SqlQueryBuilder;
import org.apache.hudi.utilities.config.JdbcSourceConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcSource
extends RowSource {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcSource.class);
    private static final List<String> DB_LIMIT_CLAUSE = Arrays.asList("mysql", "postgresql", "h2");
    private static final String URI_JDBC_PREFIX = "jdbc:";

    public JdbcSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(props, sparkContext, sparkSession, schemaProvider);
    }

    private static DataFrameReader validatePropsAndGetDataFrameReader(SparkSession session, TypedProperties properties2) throws HoodieException {
        FSDataInputStream passwordFileStream = null;
        try {
            DataFrameReader fileSystem2;
            DataFrameReader dataFrameReader = session.read().format("jdbc");
            dataFrameReader = dataFrameReader.option("url", ConfigUtils.getStringWithAltKeys(properties2, JdbcSourceConfig.URL));
            dataFrameReader = dataFrameReader.option("user", ConfigUtils.getStringWithAltKeys(properties2, JdbcSourceConfig.USER));
            dataFrameReader = dataFrameReader.option("driver", ConfigUtils.getStringWithAltKeys(properties2, JdbcSourceConfig.DRIVER_CLASS));
            dataFrameReader = dataFrameReader.option("dbtable", ConfigUtils.getStringWithAltKeys(properties2, JdbcSourceConfig.RDBMS_TABLE_NAME));
            if (ConfigUtils.containsConfigProperty(properties2, JdbcSourceConfig.PASSWORD)) {
                LOG.info("Reading JDBC password from properties file....");
                dataFrameReader = dataFrameReader.option("password", ConfigUtils.getStringWithAltKeys(properties2, JdbcSourceConfig.PASSWORD));
            } else if (ConfigUtils.containsConfigProperty(properties2, JdbcSourceConfig.PASSWORD_FILE) && !StringUtils.isNullOrEmpty(ConfigUtils.getStringWithAltKeys(properties2, JdbcSourceConfig.PASSWORD_FILE))) {
                LOG.info(String.format("Reading JDBC password from password file %s", ConfigUtils.getStringWithAltKeys(properties2, JdbcSourceConfig.PASSWORD_FILE)));
                fileSystem2 = FileSystem.get((Configuration)session.sparkContext().hadoopConfiguration());
                passwordFileStream = fileSystem2.open(new Path(ConfigUtils.getStringWithAltKeys(properties2, JdbcSourceConfig.PASSWORD_FILE)));
                byte[] bytes = new byte[passwordFileStream.available()];
                passwordFileStream.read(bytes);
                dataFrameReader = dataFrameReader.option("password", new String(bytes));
            } else {
                throw new IllegalArgumentException(String.format("JDBCSource needs either a %s or %s to connect to RDBMS datasource", JdbcSourceConfig.PASSWORD_FILE.key(), JdbcSourceConfig.PASSWORD.key()));
            }
            JdbcSource.addExtraJdbcOptions(properties2, dataFrameReader);
            if (ConfigUtils.getBooleanWithAltKeys(properties2, JdbcSourceConfig.IS_INCREMENTAL)) {
                ConfigUtils.checkRequiredConfigProperties(properties2, Collections.singletonList(JdbcSourceConfig.INCREMENTAL_COLUMN));
            }
            fileSystem2 = dataFrameReader;
            return fileSystem2;
        }
        catch (Exception e) {
            throw new HoodieException("Failed to validate properties", e);
        }
        finally {
            IOUtils.closeStream(passwordFileStream);
        }
    }

    private static void addExtraJdbcOptions(TypedProperties properties2, DataFrameReader dataFrameReader) {
        Set<Object> objects = properties2.keySet();
        for (Object property : objects) {
            String prop = property.toString();
            Option<String> keyOption = ConfigUtils.stripPrefix(prop, JdbcSourceConfig.EXTRA_OPTIONS);
            if (!keyOption.isPresent()) continue;
            String key = keyOption.get();
            String value = properties2.getString(prop);
            if (StringUtils.isNullOrEmpty(value)) continue;
            LOG.info(String.format("Adding %s -> %s to jdbc options", key, value));
            dataFrameReader.option(key, value);
        }
    }

    @Override
    protected Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint> lastCheckpoint, long sourceLimit) throws HoodieException {
        try {
            ConfigUtils.checkRequiredConfigProperties(this.props, Arrays.asList(JdbcSourceConfig.URL, JdbcSourceConfig.DRIVER_CLASS, JdbcSourceConfig.USER, JdbcSourceConfig.RDBMS_TABLE_NAME, JdbcSourceConfig.IS_INCREMENTAL));
            return this.fetch(lastCheckpoint, sourceLimit);
        }
        catch (HoodieException e) {
            LOG.error("Exception while running JDBCSource ", (Throwable)e);
            throw e;
        }
        catch (Exception e) {
            LOG.error("Exception while running JDBCSource ", (Throwable)e);
            throw new HoodieException("Error fetching next batch from JDBC source. Last checkpoint: " + lastCheckpoint.orElse(null), e);
        }
    }

    private Pair<Option<Dataset<Row>>, Checkpoint> fetch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        Dataset<Row> dataset;
        if (lastCheckpoint.isPresent() && !StringUtils.isNullOrEmpty(lastCheckpoint.get().getCheckpointKey())) {
            dataset = this.incrementalFetch(lastCheckpoint, sourceLimit);
        } else {
            LOG.info("No checkpoint references found. Doing a full rdbms table fetch");
            dataset = this.fullFetch(sourceLimit);
        }
        dataset.persist(StorageLevel.fromString((String)ConfigUtils.getStringWithAltKeys((Properties)this.props, JdbcSourceConfig.STORAGE_LEVEL, "MEMORY_AND_DISK_SER")));
        boolean isIncremental = ConfigUtils.getBooleanWithAltKeys(this.props, JdbcSourceConfig.IS_INCREMENTAL);
        Pair<Option<Dataset<Row>>, Checkpoint> pair = Pair.of(Option.of(dataset), this.checkpoint(dataset, isIncremental, lastCheckpoint));
        dataset.unpersist();
        return pair;
    }

    private Dataset<Row> incrementalFetch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        try {
            URI jdbcURI;
            String ppdQuery = "(%s) rdbms_table";
            SqlQueryBuilder queryBuilder = SqlQueryBuilder.select("*").from(ConfigUtils.getStringWithAltKeys(this.props, JdbcSourceConfig.RDBMS_TABLE_NAME)).where(String.format(" %s > '%s'", ConfigUtils.getStringWithAltKeys(this.props, JdbcSourceConfig.INCREMENTAL_COLUMN), lastCheckpoint.get().getCheckpointKey()));
            if (sourceLimit > 0L && DB_LIMIT_CLAUSE.contains((jdbcURI = URI.create(ConfigUtils.getStringWithAltKeys(this.props, JdbcSourceConfig.URL).substring(URI_JDBC_PREFIX.length()))).getScheme())) {
                queryBuilder.orderBy(ConfigUtils.getStringWithAltKeys(this.props, JdbcSourceConfig.INCREMENTAL_COLUMN)).limit(sourceLimit);
            }
            String query = String.format("(%s) rdbms_table", queryBuilder.toString());
            LOG.info("PPD QUERY: " + query);
            LOG.info(String.format("Referenced last checkpoint and prepared new predicate pushdown query for jdbc pull %s", query));
            return JdbcSource.validatePropsAndGetDataFrameReader(this.sparkSession, this.props).option("dbtable", query).load();
        }
        catch (Exception e) {
            LOG.error("Error while performing an incremental fetch. Not all database support the PPD query we generate to do an incremental scan", (Throwable)e);
            if (ConfigUtils.containsConfigProperty(this.props, JdbcSourceConfig.FALLBACK_TO_FULL_FETCH) && ConfigUtils.getBooleanWithAltKeys(this.props, JdbcSourceConfig.FALLBACK_TO_FULL_FETCH)) {
                LOG.warn("Falling back to full scan.");
                return this.fullFetch(sourceLimit);
            }
            throw e;
        }
    }

    private Dataset<Row> fullFetch(long sourceLimit) {
        URI jdbcURI;
        String ppdQuery = "(%s) rdbms_table";
        SqlQueryBuilder queryBuilder = SqlQueryBuilder.select("*").from(ConfigUtils.getStringWithAltKeys(this.props, JdbcSourceConfig.RDBMS_TABLE_NAME));
        if (sourceLimit > 0L && DB_LIMIT_CLAUSE.contains((jdbcURI = URI.create(ConfigUtils.getStringWithAltKeys(this.props, JdbcSourceConfig.URL).substring(URI_JDBC_PREFIX.length()))).getScheme())) {
            if (ConfigUtils.containsConfigProperty(this.props, JdbcSourceConfig.INCREMENTAL_COLUMN)) {
                queryBuilder.orderBy(ConfigUtils.getStringWithAltKeys(this.props, JdbcSourceConfig.INCREMENTAL_COLUMN)).limit(sourceLimit);
            } else {
                queryBuilder.limit(sourceLimit);
            }
        }
        String query = String.format("(%s) rdbms_table", queryBuilder.toString());
        return JdbcSource.validatePropsAndGetDataFrameReader(this.sparkSession, this.props).option("dbtable", query).load();
    }

    private Checkpoint checkpoint(Dataset<Row> rowDataset, boolean isIncremental, Option<Checkpoint> lastCheckpoint) {
        try {
            if (isIncremental) {
                Column incrementalColumn = rowDataset.col(ConfigUtils.getStringWithAltKeys(this.props, JdbcSourceConfig.INCREMENTAL_COLUMN));
                String max = ((Row)rowDataset.agg(functions.max((Column)incrementalColumn).cast(DataTypes.StringType), new Column[0]).first()).getString(0);
                LOG.info(String.format("Checkpointing column %s with value: %s ", incrementalColumn, max));
                if (max != null) {
                    return new StreamerCheckpointV2(max);
                }
                return lastCheckpoint.isPresent() && !StringUtils.isNullOrEmpty(lastCheckpoint.get().getCheckpointKey()) ? lastCheckpoint.get() : new StreamerCheckpointV2("");
            }
            return new StreamerCheckpointV2("");
        }
        catch (Exception e) {
            LOG.error("Failed to checkpoint");
            throw new HoodieReadFromSourceException("Failed to checkpoint. Last checkpoint: " + lastCheckpoint.orElse(null), e);
        }
    }

    protected static class Config {
        private static final String URL_PROP = "url";
        private static final String USER_PROP = "user";
        private static final String PASSWORD_PROP = "password";
        private static final String DRIVER_PROP = "driver";
        private static final String RDBMS_TABLE_PROP = "dbtable";

        protected Config() {
        }
    }
}

