/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.flink.engine;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.engine.EngineBase;
import com.logicalclocks.hsfs.flink.StreamFeatureGroup;
import com.logicalclocks.hsfs.flink.engine.KafkaRecordSerializer;
import com.logicalclocks.hsfs.flink.engine.PojoToAvroRecord;
import com.logicalclocks.hsfs.metadata.DatasetApi;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.HopsworksExternalClient;
import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
import com.twitter.chill.Base64;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.Certificate;
import java.security.cert.CertificateEncodingException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import lombok.Generated;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.FileUtils;

/**
 * Flink-specific engine: writes data streams of a stream feature group to Kafka and
 * builds the Kafka client configuration (including PEM-encoded TLS material) needed to do so.
 *
 * <p>Instances are obtained through {@link #getInstance()}; the constructor is private.
 */
public class FlinkEngine extends EngineBase {
    private static FlinkEngine INSTANCE = null;
    private StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

    /**
     * Returns the shared engine instance, creating it on first use.
     *
     * @return the single {@code FlinkEngine} instance
     */
    public static synchronized FlinkEngine getInstance() {
        if (INSTANCE == null) {
            INSTANCE = new FlinkEngine();
        }
        return INSTANCE;
    }

    /** Creates the engine and enables object reuse on the execution environment's config. */
    private FlinkEngine() {
        this.streamExecutionEnvironment.getConfig().enableObjectReuse();
    }

    /**
     * Maps the elements of {@code dataStream} to Avro records and attaches a Kafka sink to the result.
     *
     * @param streamFeatureGroup feature group supplying the Avro schemas, the complex feature names
     *                           and the identifiers used to build operator uids
     * @param dataStream         the stream to write
     * @param writeOptions       extra options forwarded to {@link #getKafkaConfig}; may be {@code null}
     * @return the sink operator attached to the mapped stream
     * @throws FeatureStoreException propagated from the Kafka configuration lookup or schema accessors
     * @throws IOException           propagated from {@link #getKafkaConfig}
     */
    public DataStreamSink<?> writeDataStream(StreamFeatureGroup streamFeatureGroup, DataStream<?> dataStream, Map<String, String> writeOptions) throws FeatureStoreException, IOException {
        DataStream<?> genericDataStream = dataStream;
        Properties properties = new Properties();
        properties.putAll(this.getKafkaConfig(streamFeatureGroup, writeOptions));
        KafkaSink sink = KafkaSink.builder()
            .setBootstrapServers(properties.getProperty("bootstrap.servers"))
            .setKafkaProducerConfig(properties)
            .setRecordSerializer((KafkaRecordSerializationSchema)new KafkaRecordSerializer(streamFeatureGroup))
            .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();

        // Collect the Avro schema of every complex feature, keyed by feature name.
        HashMap<String, String> complexFeatureSchemas = new HashMap<String, String>();
        for (String featureName : streamFeatureGroup.getComplexFeatures()) {
            complexFeatureSchemas.put(featureName, streamFeatureGroup.getFeatureAvroSchema(featureName));
        }

        SingleOutputStreamOperator avroRecordDataStream = genericDataStream
            .map(new PojoToAvroRecord(streamFeatureGroup.getAvroSchema(), streamFeatureGroup.getEncodedAvroSchema(), complexFeatureSchemas))
            .name("Mapping POJO Objects to Avro")
            .uid(this.getUid("mapPojoAvro", streamFeatureGroup))
            .returns((TypeInformation)new GenericRecordAvroTypeInfo(streamFeatureGroup.getDeserializedEncodedAvroSchema()));
        return avroRecordDataStream.sinkTo((Sink)sink)
            .name("Sink feature data to Kafka")
            .uid(this.getUid("sink", streamFeatureGroup));
    }

    /**
     * Copies a file into the current working directory and returns the local path of the copy.
     *
     * <p>A {@code null} or empty path, or one that already starts with {@code file://}, is returned
     * unchanged. When the Hopsworks HTTP client is a {@link HopsworksExternalClient} the content is
     * fetched through {@link DatasetApi#readContent}; otherwise the file is copied from an
     * {@code hdfs://} location (the scheme is prepended when missing).
     *
     * @param filePath path of the file to make available locally
     * @return the local target path, or {@code filePath} itself when no copy is needed
     * @throws IOException           if writing or copying the file fails
     * @throws FeatureStoreException propagated from the Hopsworks client or dataset API
     */
    public String addFile(String filePath) throws IOException, FeatureStoreException {
        if (Strings.isNullOrEmpty((String)filePath) || filePath.startsWith("file://")) {
            return filePath;
        }
        // A path without any '/' previously made substring(-1) throw; treat the whole
        // string as the file name in that case.
        int lastSeparator = filePath.lastIndexOf('/');
        String fileName = lastSeparator >= 0 ? filePath.substring(lastSeparator) : "/" + filePath;
        String targetPath = FileUtils.getCurrentWorkingDirectory().toString() + fileName;

        if (HopsworksClient.getInstance().getHopsworksHttpClient() instanceof HopsworksExternalClient) {
            try (FileOutputStream outputStream = new FileOutputStream(targetPath)) {
                outputStream.write(DatasetApi.readContent((String)filePath));
            }
            return targetPath;
        }
        if (!filePath.startsWith("hdfs://")) {
            filePath = "hdfs://" + filePath;
        }
        FileUtils.copy((Path)new Path(filePath), (Path)new Path(targetPath), (boolean)false);
        return targetPath;
    }

    /**
     * Builds the Kafka client configuration for the given feature group.
     *
     * <p>The key store and trust store referenced by the Kafka storage connector are copied locally,
     * loaded as JKS, and their contents are placed in the configuration as PEM strings; the
     * file-location and password entries are removed. Entries from {@code writeOptions} are then
     * applied on top, and finally {@code enable.idempotence} is forced to {@code "false"}.
     *
     * <p>The external connector is requested only when the {@code hopsworks.restendpoint} system
     * property is absent and the {@code internal_kafka} write option is not {@code true}.
     *
     * @param featureGroup feature group whose feature store is used to look up the connector
     * @param writeOptions additional Kafka options; may be {@code null}
     * @return the Kafka configuration map
     * @throws FeatureStoreException propagated from the storage connector API
     * @throws IOException           if a store file cannot be copied, or wrapping any failure while
     *                               loading the stores or extracting the TLS material
     */
    public Map<String, String> getKafkaConfig(FeatureGroupBase featureGroup, Map<String, String> writeOptions) throws FeatureStoreException, IOException {
        boolean external = !System.getProperties().containsKey("hopsworks.restendpoint") && (writeOptions == null || !Boolean.parseBoolean(writeOptions.getOrDefault("internal_kafka", "false")));
        StorageConnector.KafkaConnector storageConnector = this.storageConnectorApi.getKafkaStorageConnector(featureGroup.getFeatureStore(), external);
        storageConnector.setSslTruststoreLocation(this.addFile(storageConnector.getSslTruststoreLocation()));
        storageConnector.setSslKeystoreLocation(this.addFile(storageConnector.getSslKeystoreLocation()));
        Map<String, String> config = storageConnector.kafkaOptions();
        try {
            KeyStore keyStore = KeyStore.getInstance("JKS");
            // Close the stream once the store is loaded; it was previously leaked.
            try (FileInputStream keyStoreStream = new FileInputStream(storageConnector.getSslKeystoreLocation())) {
                keyStore.load(keyStoreStream, storageConnector.getSslKeystorePassword().toCharArray());
            }
            config.put("ssl.keystore.key", this.getKey(keyStore, storageConnector.getSslKeystorePassword()));
            config.put("ssl.keystore.certificate.chain", this.getCertificateChain(keyStore));
            config.put("ssl.keystore.type", "PEM");

            KeyStore trustStore = KeyStore.getInstance("JKS");
            try (FileInputStream trustStoreStream = new FileInputStream(storageConnector.getSslTruststoreLocation())) {
                trustStore.load(trustStoreStream, storageConnector.getSslTruststorePassword().toCharArray());
            }
            config.put("ssl.truststore.certificates", this.getRootCA(trustStore));
            config.put("ssl.truststore.type", "PEM");
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
        // The PEM entries above replace the file-based store settings.
        config.remove("ssl.keystore.location");
        config.remove("ssl.keystore.password");
        config.remove("ssl.truststore.location");
        config.remove("ssl.truststore.password");
        config.remove("ssl.key.password");
        if (writeOptions != null) {
            config.putAll(writeOptions);
        }
        // Set after writeOptions so that callers cannot override it.
        config.put("enable.idempotence", "false");
        return config;
    }

    /**
     * Returns the first alias in {@code keyStore} that identifies a key entry.
     *
     * @throws KeyStoreException if the store has not been loaded or holds no key entry
     */
    private static String findKeyAlias(KeyStore keyStore) throws KeyStoreException {
        java.util.Enumeration<String> aliases = keyStore.aliases();
        while (aliases.hasMoreElements()) {
            String alias = aliases.nextElement();
            if (keyStore.isKeyEntry(alias)) {
                return alias;
            }
        }
        throw new KeyStoreException("Key store does not contain a key entry");
    }

    /**
     * Encodes the key of the store's first key entry as a PEM {@code PRIVATE KEY} block.
     *
     * @param keyStore loaded key store
     * @param password password protecting the key
     * @return the PEM text
     */
    private String getKey(KeyStore keyStore, String password) throws KeyStoreException, UnrecoverableKeyException, NoSuchAlgorithmException {
        String keyAlias = FlinkEngine.findKeyAlias(keyStore);
        byte[] encodedKey = keyStore.getKey(keyAlias, password.toCharArray()).getEncoded();
        return "-----BEGIN PRIVATE KEY-----\n" + Base64.encodeBytes((byte[])encodedKey) + "\n-----END PRIVATE KEY-----";
    }

    /**
     * Encodes the certificate chain of the store's first key entry as consecutive PEM
     * {@code CERTIFICATE} blocks.
     *
     * @param keyStore loaded key store
     * @return the PEM text, one block per certificate in chain order
     */
    private String getCertificateChain(KeyStore keyStore) throws KeyStoreException, CertificateEncodingException {
        String certificateAlias = FlinkEngine.findKeyAlias(keyStore);
        Certificate[] certificateChain = keyStore.getCertificateChain(certificateAlias);
        if (certificateChain == null) {
            throw new KeyStoreException("No certificate chain found for alias " + certificateAlias);
        }
        StringBuilder certificateChainBuilder = new StringBuilder();
        for (Certificate certificate : certificateChain) {
            certificateChainBuilder.append("-----BEGIN CERTIFICATE-----\n")
                .append(Base64.encodeBytes((byte[])certificate.getEncoded()))
                .append("\n-----END CERTIFICATE-----\n");
        }
        return certificateChainBuilder.toString();
    }

    /**
     * Encodes the certificate stored under the trust store's first alias as a PEM
     * {@code CERTIFICATE} block.
     *
     * @param trustStore loaded trust store
     * @return the PEM text
     * @throws KeyStoreException if the trust store is empty or has not been loaded
     */
    private String getRootCA(KeyStore trustStore) throws KeyStoreException, CertificateEncodingException {
        java.util.Enumeration<String> aliases = trustStore.aliases();
        if (!aliases.hasMoreElements()) {
            throw new KeyStoreException("Trust store does not contain any entries");
        }
        String rootCaAlias = aliases.nextElement();
        return "-----BEGIN CERTIFICATE-----\n" + Base64.encodeBytes((byte[])trustStore.getCertificate(rootCaAlias).getEncoded()) + "\n-----END CERTIFICATE-----";
    }

    /**
     * Replaces the storage connector API used by this engine; intended for tests.
     *
     * @param storageConnectorApi the API instance to use
     */
    @VisibleForTesting
    public void setStorageConnectorApi(StorageConnectorApi storageConnectorApi) {
        this.storageConnectorApi = storageConnectorApi;
    }

    /**
     * Builds an operator uid of the form {@code <operation>_<projectId>_<featureGroupId>}.
     */
    private String getUid(String operation, StreamFeatureGroup streamFeatureGroup) {
        return operation + "_" + streamFeatureGroup.getFeatureStore().getProjectId() + "_" + streamFeatureGroup.getId();
    }

    /**
     * @return the stream execution environment held by this engine
     */
    @Generated
    public StreamExecutionEnvironment getStreamExecutionEnvironment() {
        return this.streamExecutionEnvironment;
    }
}

