/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.beam.engine;

import com.google.common.base.Strings;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.beam.StreamFeatureGroup;
import com.logicalclocks.hsfs.beam.engine.BeamProducer;
import com.logicalclocks.hsfs.engine.EngineBase;
import com.logicalclocks.hsfs.metadata.DatasetApi;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;

public class BeamEngine
extends EngineBase {
    private static BeamEngine INSTANCE = null;

    public static synchronized BeamEngine getInstance() throws FeatureStoreException {
        if (INSTANCE == null) {
            INSTANCE = new BeamEngine();
        }
        return INSTANCE;
    }

    private BeamEngine() throws FeatureStoreException {
    }

    public BeamProducer insertStream(StreamFeatureGroup streamFeatureGroup, Map<String, String> writeOptions) throws FeatureStoreException, IOException {
        HashMap<String, Schema> complexFeatureSchemas = new HashMap<String, Schema>();
        for (String featureName : streamFeatureGroup.getComplexFeatures()) {
            complexFeatureSchemas.put(featureName, new Schema.Parser().parse(streamFeatureGroup.getFeatureAvroSchema(featureName)));
        }
        Schema deserializedEncodedSchema = new Schema.Parser().parse(streamFeatureGroup.getEncodedAvroSchema());
        return new BeamProducer(streamFeatureGroup.getOnlineTopicName(), this.getKafkaConfig(streamFeatureGroup, writeOptions), streamFeatureGroup.getDeserializedAvroSchema(), deserializedEncodedSchema, complexFeatureSchemas, streamFeatureGroup.getPrimaryKeys(), streamFeatureGroup);
    }

    public String addFile(String filePath) throws IOException, FeatureStoreException {
        if (Strings.isNullOrEmpty((String)filePath)) {
            return filePath;
        }
        if (!filePath.startsWith("file://")) {
            filePath = "hdfs://" + filePath;
        }
        String targetPath = System.getProperty("java.io.tmpdir") + filePath.substring(filePath.lastIndexOf("/"));
        try (FileOutputStream outputStream = new FileOutputStream(targetPath);){
            outputStream.write(DatasetApi.readContent((Integer)HopsworksClient.getInstance().getProject().getProjectId(), (String)filePath, (String)"HIVEDB"));
        }
        return targetPath;
    }

    public Map<String, String> getKafkaConfig(FeatureGroupBase featureGroup, Map<String, String> writeOptions) throws FeatureStoreException, IOException {
        boolean external = !System.getProperties().containsKey("hopsworks.restendpoint") && (writeOptions == null || !Boolean.parseBoolean(writeOptions.getOrDefault("internal_kafka", "false")));
        StorageConnector.KafkaConnector storageConnector = this.storageConnectorApi.getKafkaStorageConnector(featureGroup.getFeatureStore(), external);
        storageConnector.setSslTruststoreLocation(this.addFile(storageConnector.getSslTruststoreLocation()));
        storageConnector.setSslKeystoreLocation(this.addFile(storageConnector.getSslKeystoreLocation()));
        Map config = storageConnector.kafkaOptions();
        if (writeOptions != null) {
            config.putAll(writeOptions);
        }
        return config;
    }
}

