/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.serving.inference.logger;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import io.hops.hopsworks.common.dao.project.Project;
import io.hops.hopsworks.common.dao.serving.Serving;
import io.hops.hopsworks.common.security.CertificateMaterializer;
import io.hops.hopsworks.common.serving.inference.logger.InferenceLogger;
import io.hops.hopsworks.common.util.HopsUtils;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.CryptoPasswordNotFoundException;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import javax.ejb.Asynchronous;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * {@link InferenceLogger} implementation that publishes every inference request/response pair
 * as an Avro-encoded record on the Kafka topic attached to the serving.
 *
 * <p>A short-lived, SSL-configured producer is created per logged request using the
 * {@link #SERVING_MANAGER_USERNAME} certificates of the serving's project; the certificates are
 * materialized before the producer is created and removed again once the request has been handled.
 */
@Stateless
public class KafkaInferenceLogger
implements InferenceLogger {
    private static final Logger LOGGER = Logger.getLogger(KafkaInferenceLogger.class.getName());
    @EJB
    private Settings settings;
    @EJB
    private CertificateMaterializer certificateMaterializer;
    /** User name whose project certificates are used to authenticate against Kafka. */
    public static final String SERVING_MANAGER_USERNAME = "srvmanager";
    /** Project-independent producer settings; populated once in {@link #init()} and only read afterwards. */
    private Properties props;

    /**
     * Builds the project-independent part of the producer configuration: broker list, client id
     * and the key/value serializers.
     */
    @PostConstruct
    public void init() {
        this.props = new Properties();
        this.props.put("bootstrap.servers", this.settings.getKafkaBrokersStr());
        this.props.put("client.id", "KafkaServing");
        this.props.put("key.serializer", StringSerializer.class.getName());
        this.props.put("value.serializer", ByteArraySerializer.class.getName());
    }

    /**
     * Serializes one inference request/response pair with the Avro schema of the serving's Kafka
     * topic and sends it to that topic. Does nothing when the serving has no Kafka topic.
     *
     * <p>Failures to set up the producer or to hand the record to it are logged at {@code FINE}
     * and otherwise ignored. Whatever happens after the topic check, the producer (if one was
     * created) is flushed and closed and the local certificates are removed before returning.
     *
     * @param serving           serving the inference request was addressed to
     * @param inferenceRequest  request payload to log
     * @param responseHttpCode  HTTP status code of the inference response
     * @param inferenceResponse response payload to log
     */
    @Override
    @Asynchronous
    public void logInferenceRequest(Serving serving, String inferenceRequest, Integer responseHttpCode, String inferenceResponse) {
        if (serving.getKafkaTopic() == null) {
            // Nothing to log to.
            return;
        }
        String projectName = serving.getProject().getName();
        KafkaProducer<String, byte[]> kafkaProducer = null;
        try {
            try {
                kafkaProducer = this.setupProducer(serving.getProject());
            }
            catch (CryptoPasswordNotFoundException | IOException e) {
                LOGGER.log(Level.FINE, "Failed to setup the produce for the project: " + projectName, e);
                // Without a producer there is nothing to send; the finally block still cleans up.
                return;
            }
            Schema avroSchema = new Schema.Parser().parse(serving.getKafkaTopic().getSubjects().getSchema().getSchema());
            Injection<GenericData.Record, byte[]> recordSerializer = GenericAvroCodecs.toBinary(avroSchema);
            int schemaVersion = serving.getKafkaTopic().getSubjects().getVersion();
            GenericData.Record inferenceRecord = new GenericData.Record(avroSchema);
            this.populateInfererenceRecord(serving, inferenceRequest, responseHttpCode, inferenceResponse, inferenceRecord, schemaVersion);
            byte[] inferenceRecordBytes = recordSerializer.apply(inferenceRecord);
            ProducerRecord<String, byte[]> inferenceKafkaRecord = new ProducerRecord<>(serving.getKafkaTopic().getTopicName(), inferenceRecordBytes);
            try {
                kafkaProducer.send(inferenceKafkaRecord);
            }
            catch (Exception e) {
                LOGGER.log(Level.FINE, "Cannot write to topic: " + serving.getKafkaTopic().getTopicName(), e);
            }
        }
        finally {
            // Cleanup must run even if building/serializing the record throws, otherwise the
            // producer and the materialized certificates would be leaked.
            try {
                if (kafkaProducer != null) {
                    kafkaProducer.flush();
                    kafkaProducer.close();
                }
            }
            finally {
                this.certificateMaterializer.removeCertificatesLocal(SERVING_MANAGER_USERNAME, projectName);
            }
        }
    }

    /**
     * Fills {@code inferenceRecord} with the fields defined for the given schema version.
     * Versions 1 and 2 share the same base fields; version 2 additionally carries the serving
     * type. For any other version the record is left untouched.
     */
    private void populateInfererenceRecord(Serving serving, String inferenceRequest, Integer responseHttpCode, String inferenceResponse, GenericData.Record inferenceRecord, int schemaVersion) {
        if (schemaVersion != 1 && schemaVersion != 2) {
            return;
        }
        inferenceRecord.put("modelId", serving.getId());
        inferenceRecord.put("modelName", serving.getName());
        inferenceRecord.put("modelVersion", serving.getVersion());
        inferenceRecord.put("requestTimestamp", System.currentTimeMillis());
        inferenceRecord.put("responseHttpCode", responseHttpCode);
        inferenceRecord.put("inferenceRequest", inferenceRequest);
        inferenceRecord.put("inferenceResponse", inferenceResponse);
        if (schemaVersion == 2) {
            inferenceRecord.put("servingType", serving.getServingType().name());
        }
    }

    /**
     * Materializes the serving manager's certificates for {@code project} and creates a producer
     * configured to talk to Kafka over SSL with them.
     *
     * @param project project whose certificates are used
     * @return a new producer; the caller is responsible for closing it
     * @throws IOException                     propagated from the certificate materializer
     * @throws CryptoPasswordNotFoundException propagated from the certificate materializer
     */
    private KafkaProducer<String, byte[]> setupProducer(Project project) throws IOException, CryptoPasswordNotFoundException {
        String projectName = project.getName();
        this.certificateMaterializer.materializeCertificatesLocal(SERVING_MANAGER_USERNAME, projectName);
        CertificateMaterializer.CryptoMaterial cryptoMaterial = this.certificateMaterializer.getUserMaterial(SERVING_MANAGER_USERNAME, projectName);
        String certDir = this.settings.getHopsworksTmpCertDir() + File.separator;
        String password = String.valueOf(cryptoMaterial.getPassword());

        // Work on a per-call copy so project-specific SSL settings (including passwords) are not
        // retained in the bean's shared base configuration. putAll is used rather than the
        // Properties(defaults) constructor so the base entries are real entries of the copy.
        Properties producerProps = new Properties();
        producerProps.putAll(this.props);
        producerProps.setProperty("security.protocol", "SSL");
        // Empty algorithm: no endpoint identification algorithm is configured for the broker connection.
        producerProps.setProperty("ssl.endpoint.identification.algorithm", "");
        producerProps.setProperty("ssl.truststore.location", certDir + HopsUtils.getProjectTruststoreName(projectName, SERVING_MANAGER_USERNAME));
        producerProps.setProperty("ssl.truststore.password", password);
        producerProps.setProperty("ssl.keystore.location", certDir + HopsUtils.getProjectKeystoreName(projectName, SERVING_MANAGER_USERNAME));
        producerProps.setProperty("ssl.keystore.password", password);
        producerProps.setProperty("ssl.key.password", password);
        return new KafkaProducer<>(producerProps);
    }

    /**
     * @return the fully qualified name of this logger implementation
     */
    @Override
    public String getClassName() {
        return KafkaInferenceLogger.class.getName();
    }
}

