package io.hops.hopsworks.common.serving.inference.logger;

import io.hops.hopsworks.common.dao.kafka.HopsKafkaAdminClient;
import io.hops.hopsworks.common.security.CertificateMaterializer;
import io.hops.hopsworks.common.util.HopsUtils;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.CryptoPasswordNotFoundException;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.serving.Serving;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.Asynchronous;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

@Stateless
/* loaded from: input_file:io/hops/hopsworks/common/serving/inference/logger/KafkaInferenceLogger.class */
public class KafkaInferenceLogger implements InferenceLogger {
    private static final Logger LOGGER = Logger.getLogger(KafkaInferenceLogger.class.getName());

    @EJB
    private Settings settings;

    @EJB
    private CertificateMaterializer certificateMaterializer;

    @EJB
    private HopsKafkaAdminClient hopsKafkaAdminClient;
    public static final String SERVING_MANAGER_USERNAME = "srvmanager";

    @Override // io.hops.hopsworks.common.serving.inference.logger.InferenceLogger
    @Asynchronous
    public void logInferenceRequest(Serving serving, String str, Integer num, String str2) {
        ByteArrayOutputStream byteArrayOutputStream;
        Throwable th;
        if (serving.getKafkaTopic() == null) {
            return;
        }
        KafkaProducer<String, byte[]> kafkaProducer = null;
        try {
            kafkaProducer = setupProducer(serving.getProject());
        } catch (IOException | CryptoPasswordNotFoundException e) {
            LOGGER.log(Level.FINE, "Failed to setup the produce for the project: " + serving.getProject().getName(), (Throwable) e);
        }
        Schema parse = new Schema.Parser().parse(serving.getKafkaTopic().getSubjects().getSchema().getSchema());
        int intValue = serving.getKafkaTopic().getSubjects().getVersion().intValue();
        GenericData.Record record = new GenericData.Record(parse);
        populateInfererenceRecord(serving, str, num, str2, record, intValue);
        try {
            try {
                byteArrayOutputStream = new ByteArrayOutputStream();
                th = null;
            } catch (Exception e2) {
                LOGGER.log(Level.FINE, "Cannot write to topic: " + serving.getKafkaTopic().getTopicName(), (Throwable) e2);
                if (kafkaProducer != null) {
                    kafkaProducer.flush();
                    kafkaProducer.close();
                }
            }
            try {
                try {
                    GenericDatumWriter genericDatumWriter = new GenericDatumWriter(parse);
                    BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, (BinaryEncoder) null);
                    genericDatumWriter.write(record, binaryEncoder);
                    binaryEncoder.flush();
                    kafkaProducer.send(new ProducerRecord(serving.getKafkaTopic().getTopicName(), byteArrayOutputStream.toByteArray()));
                    if (byteArrayOutputStream != null) {
                        if (0 != 0) {
                            try {
                                byteArrayOutputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            byteArrayOutputStream.close();
                        }
                    }
                    if (kafkaProducer != null) {
                        kafkaProducer.flush();
                        kafkaProducer.close();
                    }
                    this.certificateMaterializer.removeCertificatesLocal(SERVING_MANAGER_USERNAME, serving.getProject().getName());
                } finally {
                }
            } catch (Throwable th3) {
                if (byteArrayOutputStream != null) {
                    if (th != null) {
                        try {
                            byteArrayOutputStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        byteArrayOutputStream.close();
                    }
                }
                throw th3;
            }
        } catch (Throwable th5) {
            if (kafkaProducer != null) {
                kafkaProducer.flush();
                kafkaProducer.close();
            }
            throw th5;
        }
    }

    private void populateInfererenceRecord(Serving serving, String str, Integer num, String str2, GenericData.Record record, int i) {
        if (i <= 3) {
            record.put("modelId", serving.getId());
            record.put("modelName", serving.getName());
            record.put("modelVersion", serving.getModelVersion());
            record.put("requestTimestamp", Long.valueOf(System.currentTimeMillis()));
            record.put("responseHttpCode", num);
            record.put("inferenceRequest", str);
            record.put("inferenceResponse", str2);
        }
        if (i == 2) {
            record.put("servingType", serving.getModelServer().name());
        }
        if (i == 3) {
            record.put("modelServer", serving.getModelServer().name());
            record.put("servingTool", serving.getServingTool().name());
        }
    }

    private KafkaProducer<String, byte[]> setupProducer(Project project) throws IOException, CryptoPasswordNotFoundException {
        Properties hopsworksKafkaProperties = this.hopsKafkaAdminClient.getHopsworksKafkaProperties();
        hopsworksKafkaProperties.put("client.id", "KafkaServing");
        hopsworksKafkaProperties.put("key.serializer", StringSerializer.class.getName());
        hopsworksKafkaProperties.put("value.serializer", ByteArraySerializer.class.getName());
        this.certificateMaterializer.materializeCertificatesLocal(SERVING_MANAGER_USERNAME, project.getName());
        CertificateMaterializer.CryptoMaterial userMaterial = this.certificateMaterializer.getUserMaterial(SERVING_MANAGER_USERNAME, project.getName());
        hopsworksKafkaProperties.setProperty("ssl.truststore.location", this.settings.getHopsworksTmpCertDir() + File.separator + HopsUtils.getProjectTruststoreName(project.getName(), SERVING_MANAGER_USERNAME));
        hopsworksKafkaProperties.setProperty("ssl.truststore.password", String.valueOf(userMaterial.getPassword()));
        hopsworksKafkaProperties.setProperty("ssl.keystore.location", this.settings.getHopsworksTmpCertDir() + File.separator + HopsUtils.getProjectKeystoreName(project.getName(), SERVING_MANAGER_USERNAME));
        hopsworksKafkaProperties.setProperty("ssl.keystore.password", String.valueOf(userMaterial.getPassword()));
        hopsworksKafkaProperties.setProperty("ssl.key.password", String.valueOf(userMaterial.getPassword()));
        return new KafkaProducer<>(hopsworksKafkaProperties);
    }

    @Override // io.hops.hopsworks.common.serving.inference.logger.InferenceLogger
    public String getClassName() {
        return KafkaInferenceLogger.class.getName();
    }
}
