/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive;

import com.klarna.hiverunner.HiveServerContext;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.connectors.hive.FlinkEmbeddedHiveRunner;
import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
import org.apache.flink.shaded.guava30.com.google.common.base.Throwables;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.junit.rules.ExternalResource;
import org.junit.rules.TestRule;
import org.junit.runners.model.InitializationError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkStandaloneHiveRunner
extends FlinkEmbeddedHiveRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkStandaloneHiveRunner.class);
    private static final Duration HMS_START_TIMEOUT = Duration.ofSeconds(90L);
    private Future<Void> hmsWatcher;
    private int hmsPort;

    public FlinkStandaloneHiveRunner(Class<?> clazz) throws InitializationError {
        super(clazz);
    }

    @Override
    protected List<TestRule> classRules() {
        try {
            this.hmsPort = HiveTestUtils.getFreePort();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        List<TestRule> rules = super.classRules();
        ExternalResource hms = new ExternalResource(){

            protected void before() throws Throwable {
                FlinkStandaloneHiveRunner.this.hmsWatcher = FlinkStandaloneHiveRunner.startHMS(FlinkStandaloneHiveRunner.this.context, FlinkStandaloneHiveRunner.this.hmsPort);
            }

            protected void after() {
                if (FlinkStandaloneHiveRunner.this.hmsWatcher != null) {
                    FlinkStandaloneHiveRunner.this.hmsWatcher.cancel(true);
                }
            }
        };
        rules.add(rules.size() - 1, (TestRule)hms);
        return rules;
    }

    private static Future<Void> startHMS(HiveServerContext context, int port) throws Exception {
        context.init();
        context.getHiveConf().setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port);
        HiveConf outsideConf = context.getHiveConf();
        ArrayList<String> args = new ArrayList<String>();
        String javaHome = System.getProperty("java.home");
        args.add(Joiner.on((String)File.separator).join((Object)javaHome, (Object)"bin", new Object[]{"java"}));
        args.add("-cp");
        args.add(System.getProperty("java.class.path"));
        args.add(FlinkStandaloneHiveRunner.hiveCmdLineConfig(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, outsideConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)));
        args.add(FlinkStandaloneHiveRunner.hiveCmdLineConfig(HiveConf.ConfVars.SCRATCHDIR.varname, outsideConf.getVar(HiveConf.ConfVars.SCRATCHDIR)));
        args.add(FlinkStandaloneHiveRunner.hiveCmdLineConfig(HiveConf.ConfVars.LOCALSCRATCHDIR.varname, outsideConf.getVar(HiveConf.ConfVars.LOCALSCRATCHDIR)));
        args.add(FlinkStandaloneHiveRunner.hiveCmdLineConfig(HiveConf.ConfVars.HIVEHISTORYFILELOC.varname, outsideConf.getVar(HiveConf.ConfVars.HIVEHISTORYFILELOC)));
        args.add(FlinkStandaloneHiveRunner.hiveCmdLineConfig("hive.warehouse.subdir.inherit.perms", String.valueOf(outsideConf.getBoolean("hive.warehouse.subdir.inherit.perms", true))));
        args.add(FlinkStandaloneHiveRunner.hiveCmdLineConfig("hadoop.tmp.dir", outsideConf.get("hadoop.tmp.dir")));
        args.add(FlinkStandaloneHiveRunner.hiveCmdLineConfig("test.log.dir", outsideConf.get("test.log.dir")));
        args.add(FlinkStandaloneHiveRunner.hiveCmdLineConfig(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, outsideConf.getVar(HiveConf.ConfVars.METASTORECONNECTURLKEY)));
        File derbyLog = File.createTempFile("derby", ".log");
        derbyLog.deleteOnExit();
        args.add(FlinkStandaloneHiveRunner.hiveCmdLineConfig("derby.stream.error.file", derbyLog.getAbsolutePath()));
        if (outsideConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
            args.add(FlinkStandaloneHiveRunner.hiveCmdLineConfig(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true"));
        }
        args.add(HiveMetaStore.class.getCanonicalName());
        args.add("-p");
        args.add(String.valueOf(port));
        ProcessBuilder builder = new ProcessBuilder(args);
        Process process = builder.start();
        try {
            Thread inLogger = new Thread(new LogRedirect(process.getInputStream(), LOGGER));
            Thread errLogger = new Thread(new LogRedirect(process.getErrorStream(), LOGGER));
            inLogger.setDaemon(true);
            inLogger.setName("HMS-IN-Logger");
            errLogger.setDaemon(true);
            errLogger.setName("HMS-ERR-Logger");
            inLogger.start();
            errLogger.start();
            FutureTask<Object> res = new FutureTask<Object>(() -> {
                try {
                    int r = process.waitFor();
                    inLogger.join();
                    errLogger.join();
                    if (r != 0) {
                        throw new RuntimeException("HMS process exited with " + r);
                    }
                }
                catch (InterruptedException e) {
                    LOGGER.info("Shutting down HMS");
                }
                finally {
                    if (process.isAlive()) {
                        process.destroy();
                        try {
                            process.waitFor(5L, TimeUnit.SECONDS);
                        }
                        catch (InterruptedException e) {
                            LOGGER.info("Interrupted waiting for HMS to shut down, killing it forcibly");
                        }
                        process.destroyForcibly();
                    }
                }
            }, null);
            Thread thread = new Thread(res);
            thread.setName("HMS-Watcher");
            thread.setDaemon(false);
            thread.start();
            FlinkStandaloneHiveRunner.waitForHMSStart(port);
            return res;
        }
        catch (Throwable e) {
            process.destroyForcibly();
            throw e;
        }
    }

    private static void waitForHMSStart(int port) throws Exception {
        long deadline = System.currentTimeMillis() + HMS_START_TIMEOUT.toMillis();
        while (System.currentTimeMillis() < deadline) {
            try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", port));){
                LOGGER.info("HMS started at port {}", (Object)port);
                return;
            }
            catch (ConnectException e) {
                LOGGER.info("Waiting for HMS to start...");
                Thread.sleep(1000L);
            }
        }
        throw new TimeoutException("Timeout waiting for HMS to start");
    }

    private static String hiveCmdLineConfig(String name, String val) {
        return String.format("-D%s=%s", name, val);
    }

    private static class LogRedirect
    implements Runnable {
        private final InputStream inputStream;
        private final Logger logger;

        LogRedirect(InputStream inputStream, Logger logger) {
            this.inputStream = inputStream;
            this.logger = logger;
        }

        @Override
        public void run() {
            try {
                new BufferedReader(new InputStreamReader(this.inputStream)).lines().forEach(arg_0 -> ((Logger)this.logger).info(arg_0));
            }
            catch (Exception e) {
                this.logger.error(Throwables.getStackTraceAsString((Throwable)e));
            }
        }
    }
}

