/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.flink.engine;

import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.flink.StreamFeatureGroup;
import com.logicalclocks.hsfs.flink.engine.GenericRecordAvroSerializer;
import com.logicalclocks.hsfs.flink.engine.KeySerializationSchema;
import com.logicalclocks.hsfs.flink.engine.PojoToAvroRecord;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.HopsworksHttpClient;
import com.logicalclocks.hsfs.metadata.KafkaApi;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Lazily created, shared engine that writes Flink data streams to the Kafka topic of a
 * {@link StreamFeatureGroup}.
 *
 * <p>The single instance is obtained through {@link #getInstance()}. On construction it captures the
 * current {@link StreamExecutionEnvironment} and enables object reuse on its execution config.
 */
public class FlinkEngine {
    /** Kafka client property holding the comma-separated broker list. */
    private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    /** Write option that forces the internal broker endpoints to be used. */
    private static final String INTERNAL_KAFKA_OPTION = "internal_kafka";
    /** JVM system property whose presence also selects the internal broker endpoints. */
    private static final String REST_ENDPOINT_PROPERTY = "hopsworks.restendpoint";
    /** Listener prefixes stripped from the broker endpoints returned by {@link KafkaApi}. */
    private static final String INTERNAL_LISTENER_PREFIX = "INTERNAL://";
    private static final String EXTERNAL_LISTENER_PREFIX = "EXTERNAL://";

    private static FlinkEngine INSTANCE = null;
    private final StreamExecutionEnvironment streamExecutionEnvironment;
    private final KafkaApi kafkaApi = new KafkaApi();
    private final HopsworksHttpClient client = HopsworksClient.getInstance().getHopsworksHttpClient();

    /**
     * Returns the shared engine, creating it on first use. The method is {@code synchronized}, so
     * concurrent first calls create only one instance.
     *
     * @return the shared {@code FlinkEngine}
     * @throws FeatureStoreException if constructing the engine fails
     */
    public static synchronized FlinkEngine getInstance() throws FeatureStoreException {
        if (INSTANCE == null) {
            INSTANCE = new FlinkEngine();
        }
        return INSTANCE;
    }

    private FlinkEngine() throws FeatureStoreException {
        this.streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        this.streamExecutionEnvironment.getConfig().enableObjectReuse();
    }

    /**
     * Converts every element of {@code dataStream} with {@link PojoToAvroRecord} and attaches a Kafka
     * sink that writes the result to the feature group's online topic with at-least-once delivery.
     *
     * <p>Record keys are produced by a {@link KeySerializationSchema} built from the feature group's
     * primary keys; record values are produced by {@link GenericRecordAvroSerializer}.
     *
     * @param streamFeatureGroup feature group supplying the topic name, primary keys and Avro schemas
     * @param dataStream stream of objects to write
     * @param writeOptions extra Kafka producer properties; may be {@code null}
     * @return the sink attached to the converted stream
     * @throws FeatureStoreException if the Kafka properties or feature group metadata cannot be resolved
     * @throws IOException if the Kafka properties or feature group metadata cannot be resolved
     */
    public DataStreamSink<?> writeDataStream(StreamFeatureGroup streamFeatureGroup, DataStream<?> dataStream, Map<String, String> writeOptions) throws FeatureStoreException, IOException {
        Properties properties = this.getKafkaProperties(streamFeatureGroup, writeOptions);

        // NOTE(review): raw types and casts are kept on purpose. The record type the serializers are
        // parameterized with is not imported in this file, so the generic arguments cannot be spelled
        // out here without adding a dependency.
        KafkaSink sink = KafkaSink.builder()
            .setBootstrapServers(properties.getProperty(BOOTSTRAP_SERVERS))
            .setKafkaProducerConfig(properties)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(streamFeatureGroup.getOnlineTopicName())
                .setKeySerializationSchema((SerializationSchema)new KeySerializationSchema(streamFeatureGroup.getPrimaryKeys()))
                .setValueSerializationSchema((SerializationSchema)new GenericRecordAvroSerializer())
                .build())
            .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();

        // Avro schema of each complex feature, keyed by feature name.
        Map<String, String> complexFeatureSchemas = new HashMap<>();
        for (String featureName : streamFeatureGroup.getComplexFeatures()) {
            complexFeatureSchemas.put(featureName, streamFeatureGroup.getFeatureAvroSchema(featureName));
        }

        SingleOutputStreamOperator avroRecordDataStream = dataStream
            .map(new PojoToAvroRecord(streamFeatureGroup.getDeserializedAvroSchema(), streamFeatureGroup.getDeserializedEncodedAvroSchema(), complexFeatureSchemas))
            .returns((TypeInformation)new GenericRecordAvroTypeInfo(streamFeatureGroup.getDeserializedEncodedAvroSchema()));
        return avroRecordDataStream.sinkTo((Sink)sink);
    }

    /**
     * Builds the Kafka producer properties for {@code featureGroup}.
     *
     * <p>All entries of {@code writeOptions} (including the {@code internal_kafka} flag itself) are
     * copied first; the bootstrap servers and SSL settings are written afterwards and therefore
     * replace any same-named option supplied by the caller.
     *
     * @param featureGroup feature group whose feature store is used to look up the broker endpoints
     * @param writeOptions caller-supplied options; may be {@code null}
     * @return the assembled producer properties
     * @throws FeatureStoreException if the broker endpoints cannot be fetched
     * @throws IOException if the broker endpoints cannot be fetched
     */
    private Properties getKafkaProperties(StreamFeatureGroup featureGroup, Map<String, String> writeOptions) throws FeatureStoreException, IOException {
        Properties properties = new Properties();
        boolean internalKafka = false;
        if (writeOptions != null) {
            internalKafka = Boolean.parseBoolean(writeOptions.getOrDefault(INTERNAL_KAFKA_OPTION, "false"));
            properties.putAll(writeOptions);
        }

        // Internal endpoints are used when the REST endpoint system property is set or when the
        // caller asks for them explicitly; otherwise the external endpoints are requested.
        // The listener prefix is a fixed literal, so a plain (non-regex) replace is used.
        String bootstrapServers;
        if (System.getProperties().containsKey(REST_ENDPOINT_PROPERTY) || internalKafka) {
            bootstrapServers = this.kafkaApi.getBrokerEndpoints(featureGroup.getFeatureStore()).stream()
                .map(broker -> broker.replace(INTERNAL_LISTENER_PREFIX, ""))
                .collect(Collectors.joining(","));
        } else {
            bootstrapServers = this.kafkaApi.getBrokerEndpoints(featureGroup.getFeatureStore(), true).stream()
                .map(broker -> broker.replace(EXTERNAL_LISTENER_PREFIX, ""))
                .collect(Collectors.joining(","));
        }
        properties.put(BOOTSTRAP_SERVERS, bootstrapServers);

        properties.put("security.protocol", "SSL");
        properties.put("ssl.truststore.location", this.client.getTrustStorePath());
        properties.put("ssl.truststore.password", this.client.getCertKey());
        properties.put("ssl.keystore.location", this.client.getKeyStorePath());
        properties.put("ssl.keystore.password", this.client.getCertKey());
        properties.put("ssl.key.password", this.client.getCertKey());
        // An empty value for this Kafka client setting turns off server hostname verification.
        properties.put("ssl.endpoint.identification.algorithm", "");
        return properties;
    }

    /**
     * Returns the execution environment captured when the engine was created.
     *
     * @return the stream execution environment held by this engine
     */
    public StreamExecutionEnvironment getStreamExecutionEnvironment() {
        return this.streamExecutionEnvironment;
    }
}

