/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import io.hops.hudi.com.beust.jcommander.JCommander;
import io.hops.hudi.com.beust.jcommander.Parameter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Scanner;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.exception.HoodieIncrementalPullException;
import org.apache.hudi.utilities.exception.HoodieIncrementalPullSQLException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.stringtemplate.v4.ST;

/**
 * Pulls changes from a Hoodie source table into a temporary Hive table by running a
 * user-supplied incremental SQL over a Hive JDBC session.
 *
 * <p>{@link #saveDelta()} resolves the starting commit time, determines the last completed
 * commit to pull from the source table's timeline, prepares the temporary table location,
 * configures the Hive session and finally executes the SQL rendered from the
 * {@code /IncrementalPull.sqltemplate} classpath resource.
 *
 * <p>NOTE(review): template attributes are added to a single shared {@link ST} instance, so
 * an instance is presumably meant for one {@code saveDelta()} call only - confirm before
 * reusing an instance.
 */
public class HiveIncrementalPuller {
    private static final Logger LOG = LogManager.getLogger(HiveIncrementalPuller.class);

    /**
     * Clause the incremental SQL must contain. The {@code %s} placeholder is filled with the
     * from-commit time via {@link String#format(String, Object...)}.
     */
    private static final String COMMIT_TIME_CLAUSE = "`_hoodie_commit_time` > '%s'";

    /** Lazily opened Hive JDBC connection; see {@link #getConnection()}. */
    private Connection connection;
    protected final Config config;
    private final ST incrementalPullSQLtemplate;

    /**
     * Creates a puller for the given configuration and loads the SQL template from the
     * {@code /IncrementalPull.sqltemplate} classpath resource.
     *
     * @param config puller configuration; {@code maxCommits == -1} is rewritten to
     *               {@link Integer#MAX_VALUE}
     * @throws IOException declared for failures while loading the template
     */
    public HiveIncrementalPuller(Config config) throws IOException {
        this.config = config;
        this.validateConfig(config);
        String templateContent = FileIOUtils.readAsUTFString(this.getClass().getResourceAsStream("/IncrementalPull.sqltemplate"));
        this.incrementalPullSQLtemplate = new ST(templateContent);
    }

    /** Normalizes the configuration: a {@code maxCommits} of -1 becomes {@link Integer#MAX_VALUE}. */
    private void validateConfig(Config config) {
        if (config.maxCommits == -1) {
            config.maxCommits = Integer.MAX_VALUE;
        }
    }

    /**
     * Runs the incremental pull end to end.
     *
     * <p>If no from-commit time is configured it is inferred from the target table. When the
     * source table has no completed commits after the from-commit time, the pull still runs
     * (using the from-commit time as the path suffix) so that an empty table is created.
     * The JDBC statement and connection opened along the way are closed before returning.
     *
     * @throws IOException if a file system operation fails or a {@link SQLException} occurs
     *                     while talking to Hive
     * @throws IllegalStateException if the temporary target path could not be created
     */
    public void saveDelta() throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Statement stmt = null;
        try {
            if (this.config.fromCommitTime == null) {
                this.config.fromCommitTime = this.inferCommitTime(fs);
                LOG.info("FromCommitTime inferred as " + this.config.fromCommitTime);
            }
            LOG.info("FromCommitTime - " + this.config.fromCommitTime);

            String sourceTableLocation = this.getTableLocation(this.config.sourceDb, this.config.sourceTable);
            String lastCommitTime = this.getLastCommitTimePulled(fs, sourceTableLocation);
            if (lastCommitTime == null) {
                LOG.info("Nothing to pull. However we will continue to create a empty table");
                lastCommitTime = this.config.fromCommitTime;
            }

            stmt = this.getConnection().createStatement();
            String tempTableName = this.config.targetTable + "__" + this.config.sourceTable;
            String tempDbTable = this.config.tmpDb + "." + tempTableName;
            String tempDbTablePath = this.config.hoodieTmpDir + "/" + tempTableName + "/" + lastCommitTime;

            // Start from a clean slate: drop any previous temp table and its data.
            this.executeStatement("drop table " + tempDbTable, stmt);
            this.deleteHDFSPath(fs, tempDbTablePath);
            if (!this.ensureTempPathExists(fs, lastCommitTime)) {
                throw new IllegalStateException("Could not create target path at " + new Path(tempDbTablePath));
            }

            this.initHiveBeelineProperties(stmt);
            this.executeIncrementalSQL(tempDbTable, tempDbTablePath, stmt);
            LOG.info("Finished HoodieReader execution");
        } catch (SQLException e) {
            LOG.error("Exception when executing SQL", e);
            throw new IOException("Could not scan " + this.config.sourceTable + " incrementally", e);
        } finally {
            closeQuietly(null, stmt);
            this.closeConnection();
        }
    }

    /**
     * Validates the user-supplied incremental SQL, renders the pull template with it and
     * executes the result.
     *
     * @param tempDbTable     fully qualified name of the temporary table
     * @param tempDbTablePath location passed to the template for the temporary table
     * @param stmt            open statement used to execute the rendered SQL
     * @throws FileNotFoundException if the configured SQL file cannot be opened
     * @throws SQLException if executing the rendered SQL fails
     * @throws HoodieIncrementalPullSQLException if the SQL does not reference the source table
     *         or lacks the {@code `_hoodie_commit_time` > '%s'} clause
     */
    private void executeIncrementalSQL(String tempDbTable, String tempDbTablePath, Statement stmt) throws FileNotFoundException, SQLException {
        this.incrementalPullSQLtemplate.add("tempDbTable", tempDbTable);
        this.incrementalPullSQLtemplate.add("tempDbTablePath", tempDbTablePath);
        this.incrementalPullSQLtemplate.add("storedAsClause", this.getStoredAsClause());

        String incrementalSQL = this.readIncrementalSQL();
        String qualifiedSource = this.config.sourceDb + "." + this.config.sourceTable;
        if (!incrementalSQL.contains(qualifiedSource)) {
            LOG.info("Incremental SQL does not have " + qualifiedSource + ", which means its pulling from a different table. Fencing this from happening.");
            throw new HoodieIncrementalPullSQLException("Incremental SQL does not have " + qualifiedSource);
        }
        // The placeholder must be one String.format can substitute; the previous
        // '%targetBasePath' literal parsed as a %ta date conversion and always failed to format.
        if (!incrementalSQL.contains(COMMIT_TIME_CLAUSE)) {
            LOG.info("Incremental SQL : " + incrementalSQL + " does not contain " + COMMIT_TIME_CLAUSE + ". Please add this clause for incremental to work properly.");
            throw new HoodieIncrementalPullSQLException("Incremental SQL does not have clause " + COMMIT_TIME_CLAUSE + ", which means its not pulling incrementally");
        }

        this.incrementalPullSQLtemplate.add("incrementalSQL", String.format(incrementalSQL, this.config.fromCommitTime));
        String sql = this.incrementalPullSQLtemplate.render();
        this.executeStatement(sql, stmt);
    }

    /**
     * Reads the whole configured SQL file into a string.
     *
     * @return the file content, or an empty string when the file is empty
     * @throws FileNotFoundException if the file cannot be opened
     */
    private String readIncrementalSQL() throws FileNotFoundException {
        try (Scanner scanner = new Scanner(new File(this.config.incrementalSQLFile))) {
            // "\\Z" (end of input) as delimiter makes the first token the entire file.
            scanner.useDelimiter("\\Z");
            return scanner.hasNext() ? scanner.next() : "";
        }
    }

    /** Returns the storage clause handed to the SQL template. */
    private String getStoredAsClause() {
        return "STORED AS AVRO";
    }

    /**
     * Issues the {@code set ...} statements that configure the Hive session for the pull,
     * including the per-table consume mode, start timestamp and max commits.
     *
     * @throws SQLException if any of the statements fails
     */
    private void initHiveBeelineProperties(Statement stmt) throws SQLException {
        LOG.info("Setting up Hive JDBC Session with properties");
        String tablePrefix = "set hoodie." + this.config.sourceTable;
        this.executeStatement("set mapred.job.queue.name=" + this.config.yarnQueueName, stmt);
        this.executeStatement("set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat", stmt);
        this.executeStatement("set hive.strict.checks.large.query=false", stmt);
        this.executeStatement("set hive.stats.autogather=false", stmt);
        this.executeStatement(tablePrefix + ".consume.mode=INCREMENTAL", stmt);
        this.executeStatement(tablePrefix + ".consume.start.timestamp=" + this.config.fromCommitTime, stmt);
        this.executeStatement(tablePrefix + ".consume.max.commits=" + this.config.maxCommits, stmt);
    }

    /**
     * Recursively deletes {@code path} on the given file system.
     *
     * @return the result of {@link FileSystem#delete(Path, boolean)}
     */
    private boolean deleteHDFSPath(FileSystem fs, String path) throws IOException {
        LOG.info("Deleting path " + path);
        return fs.delete(new Path(path), true);
    }

    /** Logs and executes a single SQL statement. */
    private void executeStatement(String sql, Statement stmt) throws SQLException {
        LOG.info("Executing: " + sql);
        stmt.execute(sql);
    }

    /**
     * Infers the from-commit time from the target table's location as reported by Hive.
     *
     * @throws IllegalArgumentException if the target table location cannot be determined
     */
    private String inferCommitTime(FileSystem fs) throws IOException {
        LOG.info("FromCommitTime not specified. Trying to infer it from Hoodie table " + this.config.targetDb + "." + this.config.targetTable);
        String targetDataLocation = this.getTableLocation(this.config.targetDb, this.config.targetTable);
        return this.scanForCommitTime(fs, targetDataLocation);
    }

    /**
     * Looks up a table's location by scanning the output of {@code describe formatted}
     * for the row whose first column is {@code Location:}.
     *
     * @return the value of the second column of that row, or {@code null} if no such row exists
     * @throws HoodieIncrementalPullException if the query fails
     */
    private String getTableLocation(String db, String table) {
        ResultSet resultSet = null;
        Statement stmt = null;
        try {
            stmt = this.getConnection().createStatement();
            resultSet = stmt.executeQuery("describe formatted `" + db + "." + table + "`");
            while (resultSet.next()) {
                String label = resultSet.getString(1);
                if (label == null || !label.trim().equals("Location:")) {
                    continue;
                }
                String location = resultSet.getString(2);
                LOG.info("Inferred table location for " + db + "." + table + " as " + location);
                return location;
            }
        } catch (SQLException e) {
            throw new HoodieIncrementalPullException("Failed to get data location for table " + db + "." + table, e);
        } finally {
            closeQuietly(resultSet, stmt);
        }
        return null;
    }

    /**
     * Returns the timestamp of the last completed commit of the Hoodie table at
     * {@code targetDataPath}, or {@code "0"} when the path or its {@code .hoodie} folder does
     * not exist or the table has no completed commit.
     *
     * @throws IllegalArgumentException if {@code targetDataPath} is {@code null}
     */
    private String scanForCommitTime(FileSystem fs, String targetDataPath) throws IOException {
        if (targetDataPath == null) {
            throw new IllegalArgumentException("Please specify either --fromCommitTime or --targetDataPath");
        }
        boolean isHoodieTable = fs.exists(new Path(targetDataPath)) && fs.exists(new Path(targetDataPath + "/.hoodie"));
        if (!isHoodieTable) {
            return "0";
        }
        HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), targetDataPath);
        Option<HoodieInstant> lastCommit = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
        if (lastCommit.isPresent()) {
            return lastCommit.get().getTimestamp();
        }
        return "0";
    }

    /**
     * Makes sure a fresh directory exists for this pull under
     * {@code <hoodieTmpDir>/<targetTable>__<sourceTable>/<lastCommitTime>}, creating the base
     * directory if needed and removing any previous directory for the same commit time.
     *
     * @return {@code true} if the per-commit directory was created
     * @throws HoodieException if the base directory cannot be created or a stale per-commit
     *         directory cannot be deleted
     */
    private boolean ensureTempPathExists(FileSystem fs, String lastCommitTime) throws IOException {
        FsPermission allAccess = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
        Path targetBaseDirPath = new Path(this.config.hoodieTmpDir, this.config.targetTable + "__" + this.config.sourceTable);
        if (!fs.exists(targetBaseDirPath)) {
            LOG.info("Creating " + targetBaseDirPath + " with permission drwxrwxrwx");
            if (!FileSystem.mkdirs(fs, targetBaseDirPath, allAccess)) {
                throw new HoodieException("Could not create " + targetBaseDirPath + " with the required permissions");
            }
        }

        Path targetPath = new Path(targetBaseDirPath, lastCommitTime);
        if (fs.exists(targetPath) && !fs.delete(targetPath, true)) {
            throw new HoodieException("Could not delete existing " + targetPath);
        }
        LOG.info("Creating " + targetPath + " with permission drwxrwxrwx");
        // Create the per-commit directory itself; creating the base directory again here
        // would leave targetPath missing even though the method reports success.
        return FileSystem.mkdirs(fs, targetPath, allAccess);
    }

    /**
     * Determines the newest completed commit to pull: at most {@code maxCommits} completed
     * commits after {@code fromCommitTime} are considered and the last one is returned.
     *
     * @return the timestamp of the last commit to sync, or {@code null} if there is none
     */
    private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation) {
        HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), sourceTableLocation);
        List<String> commitsToSync = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
                .findInstantsAfter(this.config.fromCommitTime, this.config.maxCommits)
                .getInstants()
                .map(HoodieInstant::getTimestamp)
                .collect(Collectors.toList());
        if (commitsToSync.isEmpty()) {
            LOG.warn("Nothing to sync. All commits in " + this.config.sourceTable + " are " + metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toList()) + " and from commit time is " + this.config.fromCommitTime);
            return null;
        }
        LOG.info("Syncing commits " + commitsToSync);
        return commitsToSync.get(commitsToSync.size() - 1);
    }

    /** Returns the cached Hive JDBC connection, opening it on first use. */
    private Connection getConnection() throws SQLException {
        if (this.connection == null) {
            LOG.info("Getting Hive Connection to " + this.config.hiveJDBCUrl);
            this.connection = DriverManager.getConnection(this.config.hiveJDBCUrl, this.config.hiveUsername, this.config.hivePassword);
        }
        return this.connection;
    }

    /**
     * Closes the cached connection, if any, and clears the field so that a later
     * {@link #getConnection()} call opens a new one. Close failures are logged, not thrown.
     */
    private void closeConnection() {
        if (this.connection == null) {
            return;
        }
        try {
            this.connection.close();
        } catch (SQLException e) {
            LOG.error("Could not close the Hive connection ", e);
        } finally {
            this.connection = null;
        }
    }

    /**
     * Best-effort close of a result set and its statement. Each resource is closed
     * independently (result set first) so that a failure on one does not leak the other.
     *
     * @param resultSet result set to close, may be {@code null}
     * @param stmt      statement to close, may be {@code null}
     */
    private static void closeQuietly(ResultSet resultSet, Statement stmt) {
        if (resultSet != null) {
            try {
                resultSet.close();
            } catch (SQLException e) {
                LOG.error("Could not close the resultset opened ", e);
            }
        }
        if (stmt != null) {
            try {
                stmt.close();
            } catch (SQLException e) {
                LOG.error("Could not close the statement opened ", e);
            }
        }
    }

    /**
     * Command line entry point: parses {@link Config} from {@code args}, prints usage and
     * exits with status 1 when help is requested or no arguments are given, otherwise runs
     * {@link #saveDelta()}.
     */
    public static void main(String[] args) throws IOException {
        Config cfg = new Config();
        JCommander cmd = new JCommander(cfg, null, args);
        if (cfg.help || args.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        new HiveIncrementalPuller(cfg).saveDelta();
    }

    // Fail fast at class-load time when the Hive JDBC driver is not on the classpath.
    static {
        String driverName = "io.hops.hive.jdbc.HiveDriver";
        try {
            Class.forName(driverName);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Could not find " + driverName + " in classpath. ", e);
        }
    }

    /** Command line options of the puller, bound by JCommander. */
    public static class Config
    implements Serializable {
        /** JDBC URL used to open the Hive connection. */
        @Parameter(names={"--hiveUrl"})
        public String hiveJDBCUrl = "jdbc:hive2://localhost:10014/;transportMode=http;httpPath=hs2";
        /** User name for the Hive connection. */
        @Parameter(names={"--hiveUser"})
        public String hiveUsername = "hive";
        /** Password for the Hive connection. */
        @Parameter(names={"--hivePass"})
        public String hivePassword = "";
        /** Value set as {@code mapred.job.queue.name} on the Hive session. */
        @Parameter(names={"--queue"})
        public String yarnQueueName = "hadoop-queue";
        /** Base directory under which the temporary table data is placed. */
        @Parameter(names={"--tmp"})
        public String hoodieTmpDir = "/app/hoodie/intermediate";
        /** Path of the local file holding the incremental SQL. */
        @Parameter(names={"--extractSQLFile"}, required=true)
        public String incrementalSQLFile;
        /** Database of the source table. */
        @Parameter(names={"--sourceDb"}, required=true)
        public String sourceDb;
        /** Name of the source table; must appear (qualified by the source db) in the SQL. */
        @Parameter(names={"--sourceTable"}, required=true)
        public String sourceTable;
        /** Database of the target table, used when inferring the from-commit time. */
        @Parameter(names={"--targetDb"})
        public String targetDb;
        /** Name of the target table; also part of the temporary table name and path. */
        @Parameter(names={"--targetTable"}, required=true)
        public String targetTable;
        /** Database in which the temporary table is created. */
        @Parameter(names={"--tmpdb"})
        public String tmpDb = "tmp";
        /** Commit time to pull after; inferred from the target table when not set. */
        @Parameter(names={"--fromCommitTime"})
        public String fromCommitTime;
        /** Maximum number of commits to pull; -1 is treated as {@link Integer#MAX_VALUE}. */
        @Parameter(names={"--maxCommits"})
        public int maxCommits = 3;
        /** When set, usage is printed and the program exits. */
        @Parameter(names={"--help", "-h"}, help=true)
        public Boolean help = false;
    }
}

