package org.apache.hudi.utilities;

import io.hops.hudi.com.beust.jcommander.JCommander;
import io.hops.hudi.com.beust.jcommander.Parameter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Scanner;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.exception.HoodieIncrementalPullException;
import org.apache.hudi.utilities.exception.HoodieIncrementalPullSQLException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.stringtemplate.v4.ST;

/* loaded from: input_file:org/apache/hudi/utilities/HiveIncrementalPuller.class */
public class HiveIncrementalPuller {
    private static final Logger LOG = LogManager.getLogger(HiveIncrementalPuller.class);
    private Connection connection;
    protected final Config config;
    private final ST incrementalPullSQLTemplate;

    /* loaded from: input_file:org/apache/hudi/utilities/HiveIncrementalPuller$Config.class */
    public static class Config implements Serializable {

        @Parameter(names = {"--extractSQLFile"}, required = true)
        public String incrementalSQLFile;

        @Parameter(names = {"--sourceDb"}, required = true)
        public String sourceDb;

        @Parameter(names = {"--sourceTable"}, required = true)
        public String sourceTable;

        @Parameter(names = {"--targetDb"})
        public String targetDb;

        @Parameter(names = {"--targetTable"}, required = true)
        public String targetTable;

        @Parameter(names = {"--fromCommitTime"})
        public String fromCommitTime;

        @Parameter(names = {"--hiveUrl"})
        public String hiveJDBCUrl = "jdbc:hive2://localhost:10014/;transportMode=http;httpPath=hs2";

        @Parameter(names = {"--hiveUser"})
        public String hiveUsername = "hive";

        @Parameter(names = {"--hivePass"})
        public String hivePassword = "";

        @Parameter(names = {"--queue"})
        public String yarnQueueName = "hadoop-queue";

        @Parameter(names = {"--tmp"})
        public String hoodieTmpDir = "/app/hoodie/intermediate";

        @Parameter(names = {"--tmpdb"})
        public String tmpDb = "tmp";

        @Parameter(names = {"--maxCommits"})
        public int maxCommits = 3;

        @Parameter(names = {"--fsDefaultFs"})
        public String fsDefaultFs = "file:///";

        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;
    }

    public HiveIncrementalPuller(Config config) throws IOException {
        this.config = config;
        validateConfig(config);
        this.incrementalPullSQLTemplate = new ST(FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/IncrementalPull.sqltemplate")));
    }

    private void validateConfig(Config config) {
        if (config.maxCommits == -1) {
            config.maxCommits = Integer.MAX_VALUE;
        }
    }

    public void saveDelta() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", this.config.fsDefaultFs);
        FileSystem fileSystem = FileSystem.get(configuration);
        Statement statement = null;
        try {
            try {
                if (this.config.fromCommitTime == null) {
                    this.config.fromCommitTime = inferCommitTime(fileSystem);
                    LOG.info("FromCommitTime inferred as " + this.config.fromCommitTime);
                }
                LOG.info("FromCommitTime - " + this.config.fromCommitTime);
                String lastCommitTimePulled = getLastCommitTimePulled(fileSystem, getTableLocation(this.config.sourceDb, this.config.sourceTable));
                if (lastCommitTimePulled == null) {
                    LOG.info("Nothing to pull. However we will continue to create a empty table");
                    lastCommitTimePulled = this.config.fromCommitTime;
                }
                Statement createStatement = getConnection().createStatement();
                String str = this.config.tmpDb + "." + this.config.targetTable + "__" + this.config.sourceTable;
                String str2 = this.config.hoodieTmpDir + "/" + this.config.targetTable + "__" + this.config.sourceTable + "/" + lastCommitTimePulled;
                executeStatement("drop table if exists " + str, createStatement);
                deleteHDFSPath(fileSystem, str2);
                if (!ensureTempPathExists(fileSystem, lastCommitTimePulled)) {
                    throw new IllegalStateException("Could not create target path at " + new Path(this.config.hoodieTmpDir, this.config.targetTable + "/" + lastCommitTimePulled));
                }
                initHiveBeelineProperties(createStatement);
                executeIncrementalSQL(str, str2, createStatement);
                LOG.info("Finished HoodieReader execution");
                if (createStatement != null) {
                    try {
                        createStatement.close();
                    } catch (SQLException e) {
                        LOG.error("Could not close the resultSet opened ", e);
                    }
                }
            } catch (SQLException e2) {
                LOG.error("Exception when executing SQL", e2);
                throw new IOException("Could not scan " + this.config.sourceTable + " incrementally", e2);
            }
        } catch (Throwable th) {
            if (0 != 0) {
                try {
                    statement.close();
                } catch (SQLException e3) {
                    LOG.error("Could not close the resultSet opened ", e3);
                    throw th;
                }
            }
            throw th;
        }
    }

    private void executeIncrementalSQL(String str, String str2, Statement statement) throws FileNotFoundException, SQLException {
        this.incrementalPullSQLTemplate.add("tempDbTable", str);
        this.incrementalPullSQLTemplate.add("tempDbTablePath", str2);
        this.incrementalPullSQLTemplate.add("storedAsClause", getStoredAsClause());
        String next = new Scanner(new File(this.config.incrementalSQLFile)).useDelimiter("\\Z").next();
        if (!next.contains(this.config.sourceDb + "." + this.config.sourceTable)) {
            LOG.error("Incremental SQL does not have " + this.config.sourceDb + "." + this.config.sourceTable + ", which means its pulling from a different table. Fencing this from happening.");
            throw new HoodieIncrementalPullSQLException("Incremental SQL does not have " + this.config.sourceDb + "." + this.config.sourceTable);
        }
        if (!next.contains("`_hoodie_commit_time` > '%s'")) {
            LOG.error("Incremental SQL : " + next + " does not contain `_hoodie_commit_time` > '%s'. Please add this clause for incremental to work properly.");
            throw new HoodieIncrementalPullSQLException("Incremental SQL does not have clause `_hoodie_commit_time` > '%s', which means its not pulling incrementally");
        }
        this.incrementalPullSQLTemplate.add("incrementalSQL", String.format(next, this.config.fromCommitTime));
        executeStatement(this.incrementalPullSQLTemplate.render(), statement);
    }

    private String getStoredAsClause() {
        return "STORED AS AVRO";
    }

    private void initHiveBeelineProperties(Statement statement) throws SQLException {
        LOG.info("Setting up Hive JDBC Session with properties");
        executeStatement("set mapred.job.queue.name=" + this.config.yarnQueueName, statement);
        executeStatement("set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat", statement);
        executeStatement("set hive.strict.checks.large.query=false", statement);
        executeStatement("set hive.stats.autogather=false", statement);
        executeStatement("set hoodie." + this.config.sourceTable + ".consume.mode=INCREMENTAL", statement);
        executeStatement("set hoodie." + this.config.sourceTable + ".consume.start.timestamp=" + this.config.fromCommitTime, statement);
        executeStatement("set hoodie." + this.config.sourceTable + ".consume.max.commits=" + this.config.maxCommits, statement);
    }

    private boolean deleteHDFSPath(FileSystem fileSystem, String str) throws IOException {
        LOG.info("Deleting path " + str);
        return fileSystem.delete(new Path(str), true);
    }

    private void executeStatement(String str, Statement statement) throws SQLException {
        LOG.info("Executing: " + str);
        statement.execute(str);
    }

    private String inferCommitTime(FileSystem fileSystem) throws IOException {
        LOG.info("FromCommitTime not specified. Trying to infer it from Hoodie table " + this.config.targetDb + "." + this.config.targetTable);
        return scanForCommitTime(fileSystem, getTableLocation(this.config.targetDb, this.config.targetTable));
    }

    private String getTableLocation(String str, String str2) {
        ResultSet resultSet = null;
        Statement statement = null;
        try {
            try {
                statement = getConnection().createStatement();
                resultSet = statement.executeQuery("describe formatted `" + str + "." + str2 + "`");
                while (resultSet.next()) {
                    if (resultSet.getString(1).trim().equals("Location:")) {
                        LOG.info("Inferred table location for " + str + "." + str2 + " as " + resultSet.getString(2));
                        String string = resultSet.getString(2);
                        if (statement != null) {
                            try {
                                statement.close();
                            } catch (SQLException e) {
                                LOG.error("Could not close the resultSet opened ", e);
                            }
                        }
                        if (resultSet != null) {
                            resultSet.close();
                        }
                        return string;
                    }
                }
                if (statement != null) {
                    try {
                        statement.close();
                    } catch (SQLException e2) {
                        LOG.error("Could not close the resultSet opened ", e2);
                        return null;
                    }
                }
                if (resultSet != null) {
                    resultSet.close();
                }
                return null;
            } catch (Throwable th) {
                if (statement != null) {
                    try {
                        statement.close();
                    } catch (SQLException e3) {
                        LOG.error("Could not close the resultSet opened ", e3);
                        throw th;
                    }
                }
                if (resultSet != null) {
                    resultSet.close();
                }
                throw th;
            }
        } catch (SQLException e4) {
            throw new HoodieIncrementalPullException("Failed to get data location for table " + str + "." + str2, e4);
        }
    }

    private String scanForCommitTime(FileSystem fileSystem, String str) throws IOException {
        if (str == null) {
            throw new IllegalArgumentException("Please specify either --fromCommitTime or --targetDataPath");
        }
        if (!fileSystem.exists(new Path(str)) || !fileSystem.exists(new Path(str + "/.hoodie"))) {
            return HoodieTimeline.INVALID_INSTANT_TS;
        }
        Option<HoodieInstant> lastInstant = HoodieTableMetaClient.builder().setConf(fileSystem.getConf()).setBasePath(str).build().getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
        return lastInstant.isPresent() ? lastInstant.get().getTimestamp() : HoodieTimeline.INVALID_INSTANT_TS;
    }

    private boolean ensureTempPathExists(FileSystem fileSystem, String str) throws IOException {
        Path path = new Path(this.config.hoodieTmpDir, this.config.targetTable + "__" + this.config.sourceTable);
        if (!fileSystem.exists(path)) {
            LOG.info("Creating " + path + " with permission drwxrwxrwx");
            if (!FileSystem.mkdirs(fileSystem, path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))) {
                throw new HoodieException("Could not create " + path + " with the required permissions");
            }
        }
        Path path2 = new Path(path, str);
        if (fileSystem.exists(path2) && !fileSystem.delete(path2, true)) {
            throw new HoodieException("Could not delete existing " + path2);
        }
        LOG.info("Creating " + path2 + " with permission drwxrwxrwx");
        return FileSystem.mkdirs(fileSystem, path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
    }

    private String getLastCommitTimePulled(FileSystem fileSystem, String str) {
        HoodieTableMetaClient build = HoodieTableMetaClient.builder().setConf(fileSystem.getConf()).setBasePath(str).build();
        List list = (List) build.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().findInstantsAfter(this.config.fromCommitTime, this.config.maxCommits).getInstants().map((v0) -> {
            return v0.getTimestamp();
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            LOG.warn("Nothing to sync. All commits in " + this.config.sourceTable + " are " + build.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toList()) + " and from commit time is " + this.config.fromCommitTime);
            return null;
        }
        LOG.info("Syncing commits " + list);
        return (String) list.get(list.size() - 1);
    }

    private Connection getConnection() throws SQLException {
        if (this.connection == null) {
            LOG.info("Getting Hive Connection to " + this.config.hiveJDBCUrl);
            this.connection = DriverManager.getConnection(this.config.hiveJDBCUrl, this.config.hiveUsername, this.config.hivePassword);
        }
        return this.connection;
    }

    public static void main(String[] strArr) throws IOException {
        Config config = new Config();
        JCommander jCommander = new JCommander(config, null, strArr);
        if (config.help.booleanValue() || strArr.length == 0) {
            jCommander.usage();
            System.exit(1);
        }
        new HiveIncrementalPuller(config).saveDelta();
    }

    static {
        try {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Could not find org.apache.hive.jdbc.HiveDriver in classpath. ", e);
        }
    }
}
