package com.logicalclocks.hsfs.flink.engine;

import com.google.common.base.Strings;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.engine.EngineBase;
import com.logicalclocks.hsfs.flink.StreamFeatureGroup;
import com.logicalclocks.hsfs.metadata.HopsworksInternalClient;
import com.logicalclocks.hsfs.util.Constants;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.FileUtils;

/* loaded from: input_file:com/logicalclocks/hsfs/flink/engine/FlinkEngine.class */
public class FlinkEngine extends EngineBase {
    private static FlinkEngine INSTANCE = null;
    private final Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
    private final ConfigOption<String> keyStorePath = ConfigOptions.key("flink.hadoop.hops.ssl.keystore.name").stringType().defaultValue("trustStore.jks").withDescription("path to keyStore.jks");
    private final ConfigOption<String> trustStorePath = ConfigOptions.key("flink.hadoop.hops.ssl.truststore.name").stringType().defaultValue("trustStore.jks").withDescription("path to trustStore.jks");
    private final ConfigOption<String> materialPasswdPath = ConfigOptions.key("flink.hadoop.hops.ssl.keystores.passwd.name").stringType().defaultValue("material_passwd").withDescription("path to material_passwd");
    private StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

    public static synchronized FlinkEngine getInstance() throws FeatureStoreException {
        if (INSTANCE == null) {
            INSTANCE = new FlinkEngine();
        }
        return INSTANCE;
    }

    private FlinkEngine() throws FeatureStoreException {
        this.streamExecutionEnvironment.getConfig().enableObjectReuse();
    }

    public DataStreamSink<?> writeDataStream(StreamFeatureGroup streamFeatureGroup, DataStream<?> dataStream, Map<String, String> map) throws FeatureStoreException, IOException {
        Properties properties = new Properties();
        properties.putAll(getKafkaConfig(streamFeatureGroup, map));
        KafkaSink build = KafkaSink.builder().setBootstrapServers(properties.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS)).setKafkaProducerConfig(properties).setRecordSerializer(new KafkaRecordSerializer(streamFeatureGroup)).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();
        HashMap hashMap = new HashMap();
        for (String str : streamFeatureGroup.getComplexFeatures()) {
            hashMap.put(str, streamFeatureGroup.getFeatureAvroSchema(str));
        }
        return dataStream.map(new PojoToAvroRecord(streamFeatureGroup.getDeserializedAvroSchema(), streamFeatureGroup.getDeserializedEncodedAvroSchema(), hashMap)).returns(new GenericRecordAvroTypeInfo(streamFeatureGroup.getDeserializedEncodedAvroSchema())).sinkTo(build);
    }

    @Override // com.logicalclocks.hsfs.engine.EngineBase
    public String addFile(String str) throws IOException {
        if (Strings.isNullOrEmpty(str)) {
            return str;
        }
        if (!str.startsWith("file://")) {
            str = "hdfs://" + str;
        }
        String str2 = FileUtils.getCurrentWorkingDirectory().toString() + str.substring(str.lastIndexOf("/"));
        FileUtils.copy(new Path(str), new Path(str2), false);
        return str2;
    }

    @Override // com.logicalclocks.hsfs.engine.EngineBase
    public Map<String, String> getKafkaConfig(FeatureGroupBase featureGroupBase, Map<String, String> map) throws FeatureStoreException, IOException {
        StorageConnector.KafkaConnector kafkaStorageConnector = this.storageConnectorApi.getKafkaStorageConnector(featureGroupBase.getFeatureStore(), !System.getProperties().containsKey(HopsworksInternalClient.REST_ENDPOINT_SYS) && (map == null || !Boolean.parseBoolean(map.getOrDefault("internal_kafka", "false"))));
        kafkaStorageConnector.setSslTruststoreLocation(addFile(kafkaStorageConnector.getSslTruststoreLocation()));
        kafkaStorageConnector.setSslKeystoreLocation(addFile(kafkaStorageConnector.getSslKeystoreLocation()));
        Map<String, String> kafkaOptions = kafkaStorageConnector.kafkaOptions();
        if (map != null) {
            kafkaOptions.putAll(map);
        }
        return kafkaOptions;
    }

    public String getTrustStorePath() {
        return this.flinkConfig.getString(this.trustStorePath);
    }

    public String getKeyStorePath() {
        return this.flinkConfig.getString(this.keyStorePath);
    }

    public String getCertKey() {
        return this.flinkConfig.getString(this.materialPasswdPath);
    }

    public StreamExecutionEnvironment getStreamExecutionEnvironment() {
        return this.streamExecutionEnvironment;
    }
}
