/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.serving.util;

import io.hops.hopsworks.common.dao.kafka.AclDTO;
import io.hops.hopsworks.common.dao.kafka.KafkaFacade;
import io.hops.hopsworks.common.dao.kafka.ProjectTopics;
import io.hops.hopsworks.common.dao.kafka.SchemaTopics;
import io.hops.hopsworks.common.dao.kafka.TopicDTO;
import io.hops.hopsworks.common.dao.project.Project;
import io.hops.hopsworks.common.dao.serving.Serving;
import io.hops.hopsworks.common.serving.ServingWrapper;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.exceptions.ServingException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.util.logging.Level;
import javax.annotation.PostConstruct;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.zookeeper.KeeperException;

@Stateless
public class KafkaServingHelper {
    @EJB
    private KafkaFacade kafkaFacade;
    @EJB
    private Settings settings;
    private SchemaTopics schemaTopics = null;

    @PostConstruct
    public void init() {
        this.schemaTopics = this.kafkaFacade.getSchema("inferenceschema", 2);
    }

    public void setupKafkaServingTopic(Project project, ServingWrapper servingWrapper, Serving newDbServing, Serving oldDbServing) throws KafkaException, ProjectException, UserException, ServiceException, ServingException {
        if (servingWrapper.getKafkaTopicDTO() != null && servingWrapper.getKafkaTopicDTO().getName() != null && servingWrapper.getKafkaTopicDTO().getName().equalsIgnoreCase("NONE")) {
            newDbServing.setKafkaTopic(null);
        } else if (servingWrapper.getKafkaTopicDTO() != null && servingWrapper.getKafkaTopicDTO().getName() != null && servingWrapper.getKafkaTopicDTO().getName().equalsIgnoreCase("CREATE")) {
            ProjectTopics topic = this.setupKafkaTopic(project, servingWrapper);
            newDbServing.setKafkaTopic(topic);
        } else if (servingWrapper.getKafkaTopicDTO() != null && servingWrapper.getKafkaTopicDTO().getName() != null && !servingWrapper.getKafkaTopicDTO().getName().equalsIgnoreCase("CREATE") && !servingWrapper.getKafkaTopicDTO().getName().equalsIgnoreCase("NONE")) {
            if (oldDbServing != null && oldDbServing.getKafkaTopic() != null && oldDbServing.getKafkaTopic().getTopicName().equals(servingWrapper.getKafkaTopicDTO().getName())) {
                newDbServing.setKafkaTopic(oldDbServing.getKafkaTopic());
            } else {
                ProjectTopics topic = this.checkSchemaRequirements(project, servingWrapper);
                newDbServing.setKafkaTopic(topic);
            }
        }
    }

    public TopicDTO buildTopicDTO(Serving serving) {
        if (serving.getKafkaTopic() == null) {
            return null;
        }
        return new TopicDTO(serving.getKafkaTopic().getTopicName());
    }

    private ProjectTopics setupKafkaTopic(Project project, ServingWrapper servingWrapper) throws KafkaException, ServiceException, UserException, ProjectException {
        try {
            if (servingWrapper.getKafkaTopicDTO().getNumOfReplicas() != null && (servingWrapper.getKafkaTopicDTO().getNumOfReplicas() <= 0 || servingWrapper.getKafkaTopicDTO().getNumOfReplicas() > this.settings.getBrokerEndpoints().size())) {
                throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_REPLICATION_ERROR, Level.FINE);
            }
            if (servingWrapper.getKafkaTopicDTO().getNumOfReplicas() == null) {
                servingWrapper.getKafkaTopicDTO().setNumOfReplicas(this.settings.getKafkaDefaultNumReplicas());
            }
        }
        catch (IOException | InterruptedException | KeeperException e) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.BROKER_METADATA_ERROR, Level.SEVERE, "", e.getMessage(), e);
        }
        if (servingWrapper.getKafkaTopicDTO().getNumOfPartitions() != null && servingWrapper.getKafkaTopicDTO().getNumOfPartitions() <= 0) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.BAD_NUM_PARTITION, Level.FINE, "less than 0");
        }
        if (servingWrapper.getKafkaTopicDTO().getNumOfPartitions() == null) {
            servingWrapper.getKafkaTopicDTO().setNumOfPartitions(this.settings.getKafkaDefaultNumPartitions());
        }
        String servingTopicName = this.getServingTopicName(servingWrapper);
        TopicDTO topicDTO = new TopicDTO(servingTopicName, servingWrapper.getKafkaTopicDTO().getNumOfReplicas(), servingWrapper.getKafkaTopicDTO().getNumOfPartitions(), "inferenceschema", 2);
        ProjectTopics pt = null;
        pt = this.kafkaFacade.createTopicInProject(project, topicDTO);
        AclDTO aclDto = new AclDTO(project.getName(), "*", "allow", "*", "*", "*");
        this.kafkaFacade.addAclsToTopic(topicDTO.getName(), project.getId(), aclDto);
        return pt;
    }

    private ProjectTopics checkSchemaRequirements(Project project, ServingWrapper servingWrapper) throws KafkaException, ServingException {
        ProjectTopics topic = this.kafkaFacade.findTopicByNameAndProject(project, servingWrapper.getKafkaTopicDTO().getName());
        if (topic == null) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "name: " + servingWrapper.getKafkaTopicDTO().getName());
        }
        if (!topic.getSchemaTopics().getSchemaTopicsPK().getName().equalsIgnoreCase("inferenceschema")) {
            throw new ServingException(RESTCodes.ServingErrorCode.BAD_TOPIC, Level.INFO, "inferenceschema required");
        }
        return topic;
    }

    private String getServingTopicName(ServingWrapper servingWrapper) {
        return servingWrapper.getServing().getName() + "-inf" + RandomStringUtils.randomNumeric((int)4);
    }
}

