package io.hops.hopsworks.common.serving.inference.logger;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import io.hops.hopsworks.common.dao.kafka.KafkaFacade;
import io.hops.hopsworks.common.dao.project.Project;
import io.hops.hopsworks.common.dao.serving.TfServing;
import io.hops.hopsworks.common.exception.CryptoPasswordNotFoundException;
import io.hops.hopsworks.common.security.CertificateMaterializer;
import io.hops.hopsworks.common.util.HopsUtils;
import io.hops.hopsworks.common.util.Settings;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import javax.ejb.Asynchronous;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

@Stateless
/* loaded from: input_file:io/hops/hopsworks/common/serving/inference/logger/KafkaInferenceLogger.class */
public class KafkaInferenceLogger implements InferenceLogger {
    private static final Logger LOGGER = Logger.getLogger(KafkaInferenceLogger.class.getName());

    @EJB
    private Settings settings;

    @EJB
    private CertificateMaterializer certificateMaterializer;
    public static final String SERVING_MANAGER_USERNAME = "srvmanager";
    private KafkaProducer<String, byte[]> kafkaProducer;
    private Schema schema;
    private Injection<GenericRecord, byte[]> recordSerializer;
    private Properties props;

    @PostConstruct
    public void init() {
        this.schema = ReflectData.get().getSchema(KafkaInferenceLog.class);
        this.recordSerializer = GenericAvroCodecs.toBinary(this.schema);
        this.props = new Properties();
        this.props.put("bootstrap.servers", this.settings.getKafkaBrokersStr());
        this.props.put("client.id", "KafkaServing");
        this.props.put("key.serializer", StringSerializer.class.getName());
        this.props.put("value.serializer", ByteArraySerializer.class.getName());
    }

    @Override // io.hops.hopsworks.common.serving.inference.logger.InferenceLogger
    @Asynchronous
    public void logInferenceRequest(TfServing tfServing, String str, Integer num, String str2) {
        if (tfServing.getKafkaTopic() == null) {
            return;
        }
        try {
            setupProducer(tfServing.getProject());
        } catch (CryptoPasswordNotFoundException | IOException e) {
            LOGGER.log(Level.FINE, "Failed to setup the produce for the project: " + tfServing.getProject().getName(), e);
        }
        GenericData.Record record = new GenericData.Record(this.schema);
        record.put("modelId", tfServing.getId());
        record.put("modelName", tfServing.getModelName());
        record.put("modelVersion", tfServing.getVersion());
        record.put("requestTimestamp", Long.valueOf(System.currentTimeMillis()));
        record.put("responseHttpCode", num);
        record.put("inferenceRequest", str);
        record.put("inferenceResponse", str2);
        try {
            this.kafkaProducer.send(new ProducerRecord(tfServing.getKafkaTopic().getTopicName(), (byte[]) this.recordSerializer.apply(record)));
        } catch (Exception e2) {
            LOGGER.log(Level.FINE, "Cannot write to topic: " + tfServing.getKafkaTopic().getTopicName(), (Throwable) e2);
        }
        this.certificateMaterializer.removeCertificatesLocal(SERVING_MANAGER_USERNAME, tfServing.getProject().getName());
    }

    private void setupProducer(Project project) throws IOException, CryptoPasswordNotFoundException {
        this.certificateMaterializer.materializeCertificatesLocal(SERVING_MANAGER_USERNAME, project.getName());
        CertificateMaterializer.CryptoMaterial userMaterial = this.certificateMaterializer.getUserMaterial(SERVING_MANAGER_USERNAME, project.getName());
        this.props.setProperty("security.protocol", KafkaFacade.KAFKA_SECURITY_PROTOCOL);
        this.props.setProperty("ssl.truststore.location", this.settings.getHopsworksTmpCertDir() + File.separator + HopsUtils.getProjectTruststoreName(project.getName(), SERVING_MANAGER_USERNAME));
        this.props.setProperty("ssl.truststore.password", String.valueOf(userMaterial.getPassword()));
        this.props.setProperty("ssl.keystore.location", this.settings.getHopsworksTmpCertDir() + File.separator + HopsUtils.getProjectKeystoreName(project.getName(), SERVING_MANAGER_USERNAME));
        this.props.setProperty("ssl.keystore.password", String.valueOf(userMaterial.getPassword()));
        this.props.setProperty("ssl.key.password", String.valueOf(userMaterial.getPassword()));
        this.kafkaProducer = new KafkaProducer<>(this.props);
    }

    @Override // io.hops.hopsworks.common.serving.inference.logger.InferenceLogger
    public String getClassName() {
        return KafkaInferenceLogger.class.getName();
    }
}
