/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.serving.inference.logger;

import io.hops.hopsworks.common.dao.kafka.HopsKafkaAdminClient;
import io.hops.hopsworks.common.security.CertificateMaterializer;
import io.hops.hopsworks.common.serving.inference.logger.InferenceLogger;
import io.hops.hopsworks.common.util.HopsUtils;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.CryptoPasswordNotFoundException;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.serving.Serving;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.Asynchronous;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * {@link InferenceLogger} implementation that serializes every inference
 * request/response pair as an Avro record and publishes it to the Kafka topic
 * attached to the serving.
 *
 * <p>A new {@link KafkaProducer} is created for each logged request, using
 * certificates materialized locally for the {@link #SERVING_MANAGER_USERNAME}
 * user of the serving's project. The producer is closed and the certificates
 * are released again before the call returns.
 */
@Stateless
public class KafkaInferenceLogger
implements InferenceLogger {
    private static final Logger LOGGER = Logger.getLogger(KafkaInferenceLogger.class.getName());
    @EJB
    private Settings settings;
    @EJB
    private CertificateMaterializer certificateMaterializer;
    @EJB
    private HopsKafkaAdminClient hopsKafkaAdminClient;
    /** User name under which the certificates used by the Kafka producer are materialized. */
    public static final String SERVING_MANAGER_USERNAME = "srvmanager";

    /**
     * Publishes one inference request/response pair to the serving's Kafka topic.
     *
     * <p>Does nothing when the serving has no Kafka topic. Failures to set up the
     * producer or to write the record are logged at {@code FINE} level and are not
     * propagated. The producer is always closed and the locally materialized
     * certificates are always released once a topic is present, including when an
     * unexpected runtime exception escapes.
     *
     * @param serving           serving whose topic, schema and project are used
     * @param inferenceRequest  request payload to log
     * @param responseHttpCode  HTTP status code returned for the request
     * @param inferenceResponse response payload to log
     */
    @Override
    @Asynchronous
    public void logInferenceRequest(Serving serving, String inferenceRequest, Integer responseHttpCode, String inferenceResponse) {
        if (serving.getKafkaTopic() == null) {
            // No topic configured for this serving: nothing to log.
            return;
        }
        String projectName = serving.getProject().getName();
        KafkaProducer<String, byte[]> kafkaProducer = null;
        try {
            try {
                kafkaProducer = this.setupProducer(serving.getProject());
            }
            catch (CryptoPasswordNotFoundException | IOException e) {
                LOGGER.log(Level.FINE, "Failed to setup the produce for the project: " + projectName, e);
                // Without a producer there is nothing to send; the outer finally still releases the certificates.
                return;
            }
            String topicName = serving.getKafkaTopic().getTopicName();
            Schema avroSchema = new Schema.Parser().parse(serving.getKafkaTopic().getSubjects().getSchema().getSchema());
            int schemaVersion = serving.getKafkaTopic().getSubjects().getVersion();
            GenericData.Record inferenceRecord = new GenericData.Record(avroSchema);
            this.populateInfererenceRecord(serving, inferenceRequest, responseHttpCode, inferenceResponse, inferenceRecord, schemaVersion);
            try {
                byte[] payload = this.serializeRecord(inferenceRecord, avroSchema);
                kafkaProducer.send(new ProducerRecord<String, byte[]>(topicName, payload));
            }
            catch (Exception e) {
                // Inference logging is best effort: a failed write must not fail the caller.
                LOGGER.log(Level.FINE, "Cannot write to topic: " + topicName, e);
            }
        }
        finally {
            try {
                if (kafkaProducer != null) {
                    try {
                        kafkaProducer.flush();
                    }
                    finally {
                        // Close even if flush fails so the producer is not leaked.
                        kafkaProducer.close();
                    }
                }
            }
            finally {
                // As before, the certificates are released whether or not producer setup succeeded.
                this.certificateMaterializer.removeCertificatesLocal(SERVING_MANAGER_USERNAME, projectName);
            }
        }
    }

    /**
     * Encodes the record with Avro binary encoding using the given schema.
     *
     * @param inferenceRecord record to encode
     * @param avroSchema      schema used by the datum writer
     * @return the binary-encoded record
     * @throws IOException if writing or flushing the encoder fails
     */
    private byte[] serializeRecord(GenericData.Record inferenceRecord, Schema avroSchema) throws IOException {
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            GenericDatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(avroSchema);
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(inferenceRecord, encoder);
            // The encoder buffers internally; flush before reading the bytes.
            encoder.flush();
            return out.toByteArray();
        }
    }

    /**
     * Fills the Avro record with the fields expected by the given schema version.
     *
     * <p>Versions up to and including 3 share the common fields; version 2 adds
     * {@code servingType} and version 3 adds {@code modelServer} and
     * {@code servingTool}. For versions greater than 3 no field is populated.
     * NOTE(review): confirm whether newer schema versions are expected here.
     */
    private void populateInfererenceRecord(Serving serving, String inferenceRequest, Integer responseHttpCode, String inferenceResponse, GenericData.Record inferenceRecord, int schemaVersion) {
        if (schemaVersion <= 3) {
            inferenceRecord.put("modelId", serving.getId());
            inferenceRecord.put("modelName", serving.getName());
            inferenceRecord.put("modelVersion", serving.getModelVersion());
            inferenceRecord.put("requestTimestamp", System.currentTimeMillis());
            inferenceRecord.put("responseHttpCode", responseHttpCode);
            inferenceRecord.put("inferenceRequest", inferenceRequest);
            inferenceRecord.put("inferenceResponse", inferenceResponse);
        }
        if (schemaVersion == 2) {
            inferenceRecord.put("servingType", serving.getModelServer().name());
        }
        if (schemaVersion == 3) {
            inferenceRecord.put("modelServer", serving.getModelServer().name());
            inferenceRecord.put("servingTool", serving.getServingTool().name());
        }
    }

    /**
     * Builds a Kafka producer for the project, configured with the SSL trust store
     * and key store of the {@link #SERVING_MANAGER_USERNAME} user.
     *
     * <p>The certificates are materialized locally as part of this call; the caller
     * is responsible for releasing them again.
     *
     * @param project project whose certificates are used
     * @return a producer with {@code String} keys and {@code byte[]} values
     * @throws IOException                     declared by the certificate materialization call
     * @throws CryptoPasswordNotFoundException declared by the user material lookup
     */
    private KafkaProducer<String, byte[]> setupProducer(Project project) throws IOException, CryptoPasswordNotFoundException {
        Properties props = this.hopsKafkaAdminClient.getHopsworksKafkaProperties();
        props.put("client.id", "KafkaServing");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        String projectName = project.getName();
        this.certificateMaterializer.materializeCertificatesLocal(SERVING_MANAGER_USERNAME, projectName);
        CertificateMaterializer.CryptoMaterial cryptoMaterial = this.certificateMaterializer.getUserMaterial(SERVING_MANAGER_USERNAME, projectName);
        String certDirPrefix = this.settings.getHopsworksTmpCertDir() + File.separator;
        // The same password is used for the trust store, the key store and the key itself.
        String password = String.valueOf(cryptoMaterial.getPassword());
        props.setProperty("ssl.truststore.location", certDirPrefix + HopsUtils.getProjectTruststoreName(projectName, SERVING_MANAGER_USERNAME));
        props.setProperty("ssl.truststore.password", password);
        props.setProperty("ssl.keystore.location", certDirPrefix + HopsUtils.getProjectKeystoreName(projectName, SERVING_MANAGER_USERNAME));
        props.setProperty("ssl.keystore.password", password);
        props.setProperty("ssl.key.password", password);
        return new KafkaProducer<>(props);
    }

    /**
     * @return the fully qualified name of this logger class
     */
    @Override
    public String getClassName() {
        return KafkaInferenceLogger.class.getName();
    }
}

