/*
 * Decompiled with CFR 0.152.
 */
package io.hops.util;

import com.google.common.io.ByteStreams;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import io.hops.util.AvroDeserializer;
import io.hops.util.HopsConsumer;
import io.hops.util.HopsProducer;
import io.hops.util.KafkaProperties;
import io.hops.util.SparkInfo;
import io.hops.util.WorkflowManager;
import io.hops.util.exceptions.CredentialsNotFoundException;
import io.hops.util.exceptions.HTTPSClientInitializationException;
import io.hops.util.exceptions.SchemaNotFoundException;
import io.hops.util.exceptions.TopicNotFoundException;
import io.hops.util.flink.FlinkConsumer;
import io.hops.util.flink.FlinkProducer;
import io.hops.util.spark.SparkConsumer;
import io.hops.util.spark.SparkProducer;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLSession;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.net.util.Base64;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.json.JSONObject;

public class Hops {
    private static final Logger LOG = Logger.getLogger(Hops.class.getName());
    private static Integer projectId;
    private static String projectName;
    private static String jobName;
    private static String appId;
    private static String jobType;
    private static List<String> brokerEndpointsList;
    private static String brokerEndpoints;
    private static String restEndpoint;
    private static String keyStore;
    private static String trustStore;
    private static String keystorePwd;
    private static String truststorePwd;
    private static List<String> topics;
    private static List<String> consumerGroups;
    private static String elasticEndPoint;
    private static SparkInfo sparkInfo;
    private static WorkflowManager workflowManager;

    private Hops() {
    }

    private static synchronized void setup() {
        Properties sysProps = System.getProperties();
        if (sysProps.containsKey("hopsworks.job.type") && sysProps.getProperty("hopsworks.job.type").equalsIgnoreCase("spark")) {
            try {
                String pwd;
                restEndpoint = sysProps.getProperty("hopsworks.restendpoint");
                projectName = sysProps.getProperty("hopsworks.projectname");
                keyStore = "k_certificate";
                trustStore = "t_certificate";
                projectId = Integer.parseInt(sysProps.getProperty("hopsworks.projectid"));
                keystorePwd = pwd = Hops.getCertPw();
                truststorePwd = pwd;
                jobName = sysProps.getProperty("hopsworks.job.name");
                appId = sysProps.getProperty("hopsworks.job.appid");
                jobType = sysProps.getProperty("hopsworks.job.type");
                elasticEndPoint = sysProps.getProperty("hopsworks.elastic.endpoint");
                if (sysProps.containsKey("hopsworks.kafka.brokeraddress")) {
                    Hops.parseBrokerEndpoints(sysProps.getProperty("hopsworks.kafka.brokeraddress"));
                }
                if (sysProps.containsKey("hopsworks.kafka.job.topics")) {
                    topics = Arrays.asList(sysProps.getProperty("hopsworks.kafka.job.topics").split(File.pathSeparator));
                }
                if (sysProps.containsKey("hopsworks.kafka.consumergroups")) {
                    consumerGroups = Arrays.asList(sysProps.getProperty("hopsworks.kafka.consumergroups").split(File.pathSeparator));
                }
                sparkInfo = new SparkInfo(jobName);
            }
            catch (CredentialsNotFoundException ex) {
                LOG.log(Level.SEVERE, "Could not get credentials for certificates", ex);
            }
        }
    }

    public synchronized Hops setup(String endpoint, String domain) {
        Properties sysProps = System.getProperties();
        projectId = Integer.parseInt(sysProps.getProperty("hopsworks.projectid"));
        projectName = sysProps.getProperty("hopsworks.projectname");
        Hops.parseBrokerEndpoints(sysProps.getProperty("hopsworks.kafka.brokeraddress"));
        restEndpoint = endpoint + File.separator + "hopsworks-api/api";
        keyStore = "k_certificate";
        trustStore = "t_certificate";
        return this;
    }

    public synchronized Hops setup(String topic, String restEndpoint, String keyStore, String trustStore, String domain) {
        Properties sysProps = System.getProperties();
        projectId = Integer.parseInt(sysProps.getProperty("hopsworks.projectid"));
        projectName = sysProps.getProperty("hopsworks.projectname");
        Hops.parseBrokerEndpoints(sysProps.getProperty("hopsworks.kafka.brokeraddress"));
        Hops.restEndpoint = restEndpoint + File.separator + "hopsworks-api/api";
        Hops.keyStore = keyStore;
        Hops.trustStore = trustStore;
        return this;
    }

    public static synchronized void setup(int pId, String topicN, String brokerE, String restE, String keySt, String trustSt, String keystPwd, String truststPwd) {
        Hops.parseBrokerEndpoints(brokerE);
        restEndpoint = restE;
        keyStore = keySt;
        trustStore = trustSt;
        keystorePwd = keystPwd;
        truststorePwd = truststPwd;
        projectId = pId;
        topics = new LinkedList<String>();
        topics.add(topicN);
    }

    public synchronized Hops setup(int projectId, String topics, String consumerGroups, String brokerEndpoint, String restEndpoint, String keyStore, String trustStore, String keystorePwd, String truststorePwd) {
        Hops.projectId = projectId;
        Hops.parseBrokerEndpoints(brokerEndpoint);
        Hops.restEndpoint = restEndpoint;
        Hops.topics = Arrays.asList(topics.split(File.pathSeparator));
        Hops.consumerGroups = Arrays.asList(consumerGroups.split(File.pathSeparator));
        Hops.keyStore = keyStore;
        Hops.trustStore = trustStore;
        Hops.keystorePwd = keystorePwd;
        Hops.truststorePwd = truststorePwd;
        return this;
    }

    public static synchronized void setup(Map<String, String> params) {
        projectId = Integer.parseInt(params.get("hopsworks.projectid"));
        Hops.parseBrokerEndpoints(params.get("hopsworks.kafka.brokeraddress"));
        restEndpoint = params.get("hopsworks.restendpoint");
        topics = Arrays.asList(params.get("hopsworks.kafka.job.topics").split(File.pathSeparator));
        if (params.containsKey("hopsworks.kafka.consumergroups")) {
            consumerGroups = Arrays.asList(params.get("hopsworks.kafka.consumergroups").split(File.pathSeparator));
        }
        keyStore = params.get("k_certificate");
        trustStore = params.get("t_certificate");
    }

    public static KafkaProperties getKafkaProperties() {
        return new KafkaProperties();
    }

    public static HopsConsumer getHopsConsumer(String topic) throws SchemaNotFoundException, CredentialsNotFoundException {
        return new HopsConsumer(topic);
    }

    public static HopsProducer getHopsProducer(String topic) throws SchemaNotFoundException, CredentialsNotFoundException {
        return new HopsProducer(topic, null);
    }

    public static FlinkConsumer getFlinkConsumer(String topic) {
        return Hops.getFlinkConsumer(topic, new AvroDeserializer(topic));
    }

    public static FlinkConsumer getFlinkConsumer(String topic, DeserializationSchema deserializationSchema) {
        return new FlinkConsumer(topic, deserializationSchema, Hops.getKafkaProperties().getConsumerConfig());
    }

    public static FlinkProducer getFlinkProducer(String topic) {
        return Hops.getFlinkProducer(topic, new AvroDeserializer(topic));
    }

    public static FlinkProducer getFlinkProducer(String topic, SerializationSchema serializationSchema) {
        return new FlinkProducer(topic, serializationSchema, Hops.getKafkaProperties().defaultProps());
    }

    public static SparkProducer getSparkProducer() throws SchemaNotFoundException, TopicNotFoundException, CredentialsNotFoundException {
        if (Hops.getTopics() != null && Hops.getTopics().size() == 1) {
            return new SparkProducer(Hops.getTopics().get(0), null);
        }
        throw new TopicNotFoundException("No topic was found for this spark producer");
    }

    public static SparkProducer getSparkProducer(String topic) throws SchemaNotFoundException, CredentialsNotFoundException {
        return new SparkProducer(topic, null);
    }

    public static SparkProducer getSparkProducer(String topic, Properties userProps) throws SchemaNotFoundException, CredentialsNotFoundException {
        return new SparkProducer(topic, userProps);
    }

    public static SparkConsumer getSparkConsumer() {
        return new SparkConsumer();
    }

    public static SparkConsumer getSparkConsumer(Properties userProps) {
        return new SparkConsumer(userProps);
    }

    public static SparkConsumer getSparkConsumer(Collection<String> topics) throws TopicNotFoundException {
        if (topics != null && !topics.isEmpty()) {
            return new SparkConsumer(topics);
        }
        throw new TopicNotFoundException("No topic was found for this spark consumer");
    }

    public static SparkConsumer getSparkConsumer(JavaStreamingContext jsc) throws TopicNotFoundException {
        if (topics != null && !topics.isEmpty()) {
            return new SparkConsumer(jsc, topics);
        }
        throw new TopicNotFoundException("No topic was found for this spark consumer");
    }

    public static SparkConsumer getSparkConsumer(JavaStreamingContext jsc, Properties userProps) throws TopicNotFoundException {
        if (topics != null && !topics.isEmpty()) {
            return new SparkConsumer(jsc, (Collection<String>)topics, userProps);
        }
        throw new TopicNotFoundException("No topic was found for this spark consumer");
    }

    public static SparkConsumer getSparkConsumer(JavaStreamingContext jsc, Collection<String> topics) {
        return new SparkConsumer(jsc, topics);
    }

    public static SparkConsumer getSparkConsumer(JavaStreamingContext jsc, Collection<String> topics, Properties userProps) {
        return new SparkConsumer(jsc, topics, userProps);
    }

    public AvroDeserializer getHopsAvroSchema(String topic) {
        return new AvroDeserializer(topic);
    }

    public static String getSchema(String topic) throws SchemaNotFoundException, CredentialsNotFoundException {
        return Hops.getSchema(topic, Integer.MIN_VALUE);
    }

    public static Map<String, Schema> getSchemas() throws CredentialsNotFoundException, SchemaNotFoundException {
        HashMap<String, Schema> schemas = new HashMap<String, Schema>();
        for (String topic : Hops.getTopics()) {
            Schema.Parser parser = new Schema.Parser();
            schemas.put(topic, parser.parse(Hops.getSchema(topic)));
        }
        return schemas;
    }

    public static Map<String, Injection<GenericRecord, byte[]>> getRecordInjections() {
        Map<String, Schema> schemas = null;
        try {
            schemas = Hops.getSchemas();
        }
        catch (CredentialsNotFoundException | SchemaNotFoundException e) {
            LOG.log(Level.SEVERE, e.getMessage());
        }
        if (schemas == null || schemas.isEmpty()) {
            return null;
        }
        HashMap<String, Injection<GenericRecord, byte[]>> recordInjections = new HashMap<String, Injection<GenericRecord, byte[]>>();
        for (String topic : Hops.getTopics()) {
            recordInjections.put(topic, (Injection<GenericRecord, byte[]>)GenericAvroCodecs.toBinary((Schema)schemas.get(topic)));
        }
        return recordInjections;
    }

    public static String getSchema(String topic, int versionId) throws CredentialsNotFoundException, SchemaNotFoundException {
        LOG.log(Level.FINE, "Getting schema for topic:{0} from uri:{1}", new String[]{topic});
        JSONObject json = new JSONObject();
        json.append("topicName", (Object)topic);
        if (versionId > 0) {
            json.append("version", (Object)versionId);
        }
        Response response = null;
        try {
            response = Hops.clientWrapper(json, "schema");
        }
        catch (HTTPSClientInitializationException e) {
            throw new SchemaNotFoundException(e.getMessage());
        }
        LOG.log(Level.INFO, "******* response.getStatusInfo():" + response.getStatusInfo());
        if (response.getStatusInfo().getStatusCode() != Response.Status.OK.getStatusCode()) {
            throw new SchemaNotFoundException("No schema found for topic:" + topic);
        }
        String responseEntity = (String)response.readEntity(String.class);
        json = new JSONObject(responseEntity);
        return json.getString("contents");
    }

    protected static Response clientWrapper(JSONObject json, String resource) throws CredentialsNotFoundException, HTTPSClientInitializationException {
        Client client;
        json.append("keyStorePwd", (Object)keystorePwd);
        try {
            json.append("keyStore", (Object)Hops.keystoreEncode());
        }
        catch (IOException ex) {
            LOG.log(Level.SEVERE, null, ex);
            throw new CredentialsNotFoundException("Could not initialize Hops properties.");
        }
        try {
            client = Hops.initClient();
        }
        catch (IOException | KeyStoreException | NoSuchAlgorithmException | CertificateException e) {
            throw new HTTPSClientInitializationException(e.getMessage());
        }
        WebTarget webTarget = client.target(Hops.getRestEndpoint() + "/").path("hopsworks-api/api/appservice/" + resource);
        LOG.info("webTarget.getUri().getHost():" + webTarget.getUri().getHost());
        LOG.info("webTarget.getUri().getPort():" + webTarget.getUri().getPort());
        LOG.info("webTarget.getUri().getPath():" + webTarget.getUri().getPath());
        Invocation.Builder invocationBuilder = webTarget.request().accept(new String[]{"application/json"});
        return invocationBuilder.post(Entity.entity((Object)json.toString(), (String)"application/json"));
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private static String getCertPw() throws CredentialsNotFoundException {
        try (FileInputStream fis = new FileInputStream("material_passwd");){
            int content;
            StringBuilder sb = new StringBuilder();
            while ((content = fis.read()) != -1) {
                sb.append((char)content);
            }
            String string = sb.toString();
            return string;
        }
        catch (IOException ex) {
            LOG.log(Level.SEVERE, null, ex);
            return null;
        }
    }

    static String keystoreEncode() throws IOException {
        FileInputStream kfin = new FileInputStream(new File(keyStore));
        byte[] kStoreBlob = ByteStreams.toByteArray((InputStream)kfin);
        return Base64.encodeBase64String((byte[])kStoreBlob);
    }

    public static List<String> getBrokerEndpointsList() {
        return brokerEndpointsList;
    }

    public static String getBrokerEndpoints() {
        return brokerEndpoints;
    }

    public static Integer getProjectId() {
        return projectId;
    }

    public static String getRestEndpoint() {
        return restEndpoint;
    }

    public static String getKeyStore() {
        return keyStore;
    }

    public static String getTrustStore() {
        return trustStore;
    }

    public static String getKeystorePwd() {
        return keystorePwd;
    }

    public static String getTruststorePwd() {
        return truststorePwd;
    }

    public static List<String> getTopics() {
        return topics;
    }

    public static String getTopicsAsCSV() {
        StringBuilder sb = new StringBuilder();
        topics.forEach(topic -> sb.append((String)topic).append(","));
        if (sb.charAt(sb.length() - 1) == ',') {
            return sb.substring(0, sb.length() - 1);
        }
        return sb.toString();
    }

    public static List<String> getConsumerGroups() {
        return consumerGroups;
    }

    public static String getProjectName() {
        return projectName;
    }

    public static String getElasticEndPoint() {
        return elasticEndPoint;
    }

    public static String getJobName() {
        return jobName;
    }

    public static String getAppId() {
        return appId;
    }

    public static String getJobType() {
        return jobType;
    }

    public static WorkflowManager getWorkflowManager() {
        return workflowManager;
    }

    public static void shutdownGracefully(JavaStreamingContext jssc) throws InterruptedException {
        Hops.shutdownGracefully(jssc, 3000);
    }

    public static void shutdownGracefully(StreamingQuery query) throws InterruptedException, StreamingQueryException {
        Hops.shutdownGracefully(query, 3000L);
    }

    public static void shutdownGracefully(JavaStreamingContext jssc, int intervalMillis) throws InterruptedException {
        boolean isStopped = false;
        while (!isStopped) {
            isStopped = jssc.awaitTerminationOrTimeout((long)intervalMillis);
            if (isStopped || !sparkInfo.isShutdownRequested()) continue;
            LOG.info("Marker file has been removed, will attempt to stop gracefully the spark streaming context");
            jssc.stop(true, true);
        }
    }

    public static void shutdownGracefully(StreamingQuery query, long checkIntervalMillis) throws StreamingQueryException {
        boolean isStopped = false;
        while (!isStopped) {
            isStopped = query.awaitTermination(checkIntervalMillis);
            if (isStopped || !sparkInfo.isShutdownRequested()) continue;
            LOG.info("Marker file has been removed, will attempt to stop gracefully the spark structured streaming query");
            query.stop();
        }
    }

    public static void shutdownGracefully(JavaSparkContext jsc) throws InterruptedException {
        while (!sparkInfo.isShutdownRequested()) {
            Thread.sleep(5000L);
            LOG.info("Sleeping marker");
        }
        LOG.info("Marker file has been removed, will attempt to gracefully stop the spark context");
        jsc.stop();
        jsc.close();
    }

    public static Map<String, String> getFlinkKafkaProps(String propsStr) {
        String[] propsArray;
        propsStr = propsStr.replace("-D", "").replace("\"", "").replace("'", "");
        HashMap<String, String> props = new HashMap<String, String>();
        for (String kafkaProperty : propsArray = propsStr.split(",")) {
            String[] keyVal = kafkaProperty.split("=");
            props.put(keyVal[0], keyVal[1]);
        }
        return props;
    }

    private static void parseBrokerEndpoints(String addresses) {
        brokerEndpoints = addresses;
        brokerEndpointsList = Arrays.asList(addresses.split(","));
    }

    private static Client initClient() throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException {
        KeyStore truststore = KeyStore.getInstance(KeyStore.getDefaultType());
        try (FileInputStream trustStoreIS = new FileInputStream("domain_ca_truststore");){
            truststore.load(trustStoreIS, null);
        }
        return ClientBuilder.newBuilder().trustStore(truststore).hostnameVerifier((HostnameVerifier)InsecureHostnameVerifier.INSTANCE).build();
    }

    static {
        Hops.setup();
    }

    private static class InsecureHostnameVerifier
    implements HostnameVerifier {
        static InsecureHostnameVerifier INSTANCE = new InsecureHostnameVerifier();

        InsecureHostnameVerifier() {
        }

        @Override
        public boolean verify(String string, SSLSession ssls) {
            return string.equals(restEndpoint.split(":")[0]);
        }
    }
}

