/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.serving.util;

import io.hops.hopsworks.common.dao.kafka.AclDTO;
import io.hops.hopsworks.common.dao.kafka.ProjectTopics;
import io.hops.hopsworks.common.dao.kafka.ProjectTopicsFacade;
import io.hops.hopsworks.common.dao.kafka.TopicDTO;
import io.hops.hopsworks.common.dao.project.Project;
import io.hops.hopsworks.common.dao.project.team.ProjectTeam;
import io.hops.hopsworks.common.dao.serving.Serving;
import io.hops.hopsworks.common.kafka.KafkaController;
import io.hops.hopsworks.common.serving.ServingWrapper;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.ServingException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.zookeeper.KeeperException;

/**
 * Stateless helper that attaches a Kafka topic to a model serving.
 *
 * <p>Depending on the topic name carried by the {@link ServingWrapper}, the serving is
 * detached from any topic, bound to a freshly created topic, or bound to an existing
 * project topic whose schema subject matches the inference schema.
 */
@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class KafkaServingHelper {
    /** Topic name in a request meaning "do not attach any topic to the serving". */
    private static final String TOPIC_NONE = "NONE";
    /** Topic name in a request meaning "create a new topic for the serving". */
    private static final String TOPIC_CREATE = "CREATE";
    /** Schema subject that a topic must use to be attached to a serving. */
    private static final String INFERENCE_SCHEMA_NAME = "inferenceschema";
    /** Value passed alongside the schema name when a new topic is created (presumably the schema version - confirm against TopicDTO). */
    private static final int INFERENCE_SCHEMA_VERSION = 2;
    /** Fixed infix placed between the serving name and the random numeric suffix. */
    private static final String TOPIC_NAME_INFIX = "-inf";
    /** Number of random digits appended to generated topic names. */
    private static final int TOPIC_SUFFIX_LENGTH = 4;

    @EJB
    private Settings settings;
    @EJB
    private KafkaController kafkaController;
    @EJB
    private ProjectTopicsFacade projectTopicsFacade;

    /**
     * Sets the Kafka topic of {@code newDbServing} according to the topic name requested
     * in {@code servingWrapper}.
     *
     * <ul>
     *   <li>No topic DTO, or a DTO without a name: {@code newDbServing} is left untouched.</li>
     *   <li>{@code "NONE"} (case-insensitive): the topic is set to {@code null}.</li>
     *   <li>{@code "CREATE"} (case-insensitive): a new topic is created and attached.</li>
     *   <li>Any other name: if it equals the topic name of {@code oldDbServing}, that topic
     *       is reused; otherwise the topic is looked up in the project and must use the
     *       inference schema subject.</li>
     * </ul>
     *
     * @param project        project that owns the serving and its topics
     * @param servingWrapper request wrapper carrying the requested topic DTO
     * @param newDbServing   serving entity whose Kafka topic is updated
     * @param oldDbServing   previously stored serving, or {@code null} if there is none
     * @throws KafkaException   if topic parameters are invalid, broker metadata cannot be
     *                          read, or the named topic does not exist in the project
     * @throws ServingException if an existing topic does not use the inference schema subject
     */
    public void setupKafkaServingTopic(Project project, ServingWrapper servingWrapper, Serving newDbServing, Serving oldDbServing) throws KafkaException, ProjectException, UserException, ServingException, InterruptedException, ExecutionException {
        String requestedName = servingWrapper.getKafkaTopicDTO() == null ? null : servingWrapper.getKafkaTopicDTO().getName();
        if (requestedName == null) {
            // Nothing requested: keep whatever newDbServing already holds.
            return;
        }
        if (requestedName.equalsIgnoreCase(TOPIC_NONE)) {
            newDbServing.setKafkaTopic(null);
        } else if (requestedName.equalsIgnoreCase(TOPIC_CREATE)) {
            newDbServing.setKafkaTopic(this.setupKafkaTopic(project, servingWrapper));
        } else if (oldDbServing != null && oldDbServing.getKafkaTopic() != null && oldDbServing.getKafkaTopic().getTopicName().equals(requestedName)) {
            // Same topic as before (case-sensitive match): reuse it without re-validating.
            newDbServing.setKafkaTopic(oldDbServing.getKafkaTopic());
        } else {
            newDbServing.setKafkaTopic(this.checkSchemaRequirements(project, servingWrapper));
        }
    }

    /**
     * Builds a {@link TopicDTO} describing the Kafka topic attached to a serving.
     *
     * @param serving serving whose topic should be described
     * @return a DTO built from the topic name and its subject, version and schema text,
     *         or {@code null} if the serving has no Kafka topic
     */
    public TopicDTO buildTopicDTO(Serving serving) {
        ProjectTopics topic = serving.getKafkaTopic();
        if (topic == null) {
            return null;
        }
        return new TopicDTO(topic.getTopicName(), topic.getSubjects().getSubject(), topic.getSubjects().getVersion(), topic.getSubjects().getSchema().getSchema());
    }

    /**
     * Validates the requested replica/partition counts (filling in defaults from
     * {@link Settings} when absent), creates a new topic in the project and adds an
     * "allow" ACL on it for every member of the project team.
     *
     * @param project        project in which the topic is created
     * @param servingWrapper wrapper carrying the topic DTO and the serving
     * @return the newly created project topic
     * @throws KafkaException if the replica count is non-positive or exceeds the number of
     *                        broker endpoints, if the partition count is non-positive, or
     *                        if broker metadata cannot be read
     */
    private ProjectTopics setupKafkaTopic(Project project, ServingWrapper servingWrapper) throws KafkaException, UserException, ProjectException, InterruptedException, ExecutionException {
        try {
            if (servingWrapper.getKafkaTopicDTO().getNumOfReplicas() != null && (servingWrapper.getKafkaTopicDTO().getNumOfReplicas() <= 0 || servingWrapper.getKafkaTopicDTO().getNumOfReplicas() > this.settings.getBrokerEndpoints().size())) {
                throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_REPLICATION_ERROR, Level.FINE);
            }
            if (servingWrapper.getKafkaTopicDTO().getNumOfReplicas() == null) {
                servingWrapper.getKafkaTopicDTO().setNumOfReplicas(this.settings.getKafkaDefaultNumReplicas());
            }
        }
        catch (IOException | InterruptedException | KeeperException e) {
            if (e instanceof InterruptedException) {
                // The interruption is reported as a KafkaException, so restore the flag to
                // keep it visible to callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new KafkaException(RESTCodes.KafkaErrorCode.BROKER_METADATA_ERROR, Level.SEVERE, "", e.getMessage(), e);
        }
        // NOTE: zero is rejected as well, although the message only mentions "less than 0".
        if (servingWrapper.getKafkaTopicDTO().getNumOfPartitions() != null && servingWrapper.getKafkaTopicDTO().getNumOfPartitions() <= 0) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.BAD_NUM_PARTITION, Level.FINE, "less than 0");
        }
        if (servingWrapper.getKafkaTopicDTO().getNumOfPartitions() == null) {
            servingWrapper.getKafkaTopicDTO().setNumOfPartitions(this.settings.getKafkaDefaultNumPartitions());
        }
        String servingTopicName = this.getServingTopicName(servingWrapper);
        TopicDTO topicDTO = new TopicDTO(servingTopicName, servingWrapper.getKafkaTopicDTO().getNumOfReplicas(), servingWrapper.getKafkaTopicDTO().getNumOfPartitions(), INFERENCE_SCHEMA_NAME, INFERENCE_SCHEMA_VERSION);
        ProjectTopics createdTopic = this.kafkaController.createTopicInProject(project, topicDTO);
        // Add one "allow" ACL per project team member on the new topic.
        for (ProjectTeam projectTeam : project.getProjectTeamCollection()) {
            AclDTO aclDto = new AclDTO(project.getName(), projectTeam.getUser().getEmail(), "allow", "*", "*", "*");
            this.kafkaController.addAclsToTopic(topicDTO.getName(), project.getId(), aclDto);
        }
        return createdTopic;
    }

    /**
     * Looks up the requested topic in the project and verifies that its schema subject is
     * the inference schema (compared case-insensitively).
     *
     * @param project        project expected to own the topic
     * @param servingWrapper wrapper carrying the requested topic name
     * @return the matching project topic
     * @throws KafkaException   if no topic with that name exists in the project
     * @throws ServingException if the topic's subject is not the inference schema
     */
    private ProjectTopics checkSchemaRequirements(Project project, ServingWrapper servingWrapper) throws KafkaException, ServingException {
        ProjectTopics topic = this.projectTopicsFacade.findTopicByNameAndProject(project, servingWrapper.getKafkaTopicDTO().getName()).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "name: " + servingWrapper.getKafkaTopicDTO().getName()));
        if (!topic.getSubjects().getSubject().equalsIgnoreCase(INFERENCE_SCHEMA_NAME)) {
            throw new ServingException(RESTCodes.ServingErrorCode.BAD_TOPIC, Level.INFO, INFERENCE_SCHEMA_NAME + " required");
        }
        return topic;
    }

    /**
     * Generates a topic name of the form {@code <servingName>-inf<4 random digits>}.
     *
     * @param servingWrapper wrapper whose serving supplies the name prefix
     * @return the generated topic name
     */
    private String getServingTopicName(ServingWrapper servingWrapper) {
        return servingWrapper.getServing().getName() + TOPIC_NAME_INFIX + RandomStringUtils.randomNumeric(TOPIC_SUFFIX_LENGTH);
    }
}

