/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.kafka;

import io.hops.hopsworks.common.dao.dataset.DatasetSharedWithFacade;
import io.hops.hopsworks.common.dao.kafka.HopsKafkaAdminClient;
import io.hops.hopsworks.common.dao.kafka.PartitionDetailsDTO;
import io.hops.hopsworks.common.dao.kafka.ProjectTopicsFacade;
import io.hops.hopsworks.common.dao.kafka.TopicDTO;
import io.hops.hopsworks.common.dao.kafka.TopicDefaultValueDTO;
import io.hops.hopsworks.common.dao.kafka.schemas.SubjectDTO;
import io.hops.hopsworks.common.dao.kafka.schemas.SubjectsFacade;
import io.hops.hopsworks.common.dao.project.ProjectFacade;
import io.hops.hopsworks.common.dao.user.UserFacade;
import io.hops.hopsworks.common.kafka.KafkaBrokers;
import io.hops.hopsworks.common.project.ProjectController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.SchemaException;
import io.hops.hopsworks.persistence.entity.dataset.DatasetSharedWith;
import io.hops.hopsworks.persistence.entity.kafka.ProjectTopics;
import io.hops.hopsworks.persistence.entity.kafka.schemas.Subjects;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.zookeeper.KeeperException;

/**
 * Stateless EJB that manages the Kafka topics belonging to a project.
 *
 * <p>It keeps the topic metadata stored through {@link ProjectTopicsFacade} in step with the
 * topics that exist in the Kafka cluster (accessed through {@link HopsKafkaAdminClient}) and
 * resolves the schema ({@link Subjects}) attached to each topic.
 */
@Stateless
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class KafkaController {
    private static final Logger LOGGER = Logger.getLogger(KafkaController.class.getName());

    /** Maximum time, in milliseconds, to wait for the topic-creation future to complete. */
    private static final long TOPIC_CREATION_TIMEOUT_MS = 3000L;

    @EJB
    private Settings settings;
    @EJB
    private ProjectFacade projectFacade;
    @EJB
    private UserFacade userFacade;
    @EJB
    private ProjectTopicsFacade projectTopicsFacade;
    @EJB
    private HopsKafkaAdminClient hopsKafkaAdminClient;
    @EJB
    private SubjectsFacade subjectsFacade;
    @EJB
    private ProjectController projectController;
    @EJB
    private KafkaBrokers kafkaBrokers;
    @EJB
    private DatasetSharedWithFacade datasetSharedWithFacade;

    /**
     * Validates a topic request and, if it passes, creates the topic for the project.
     *
     * <p>The request is rejected when a topic with the same name is already registered, when the
     * project already holds its maximum number of topics, or when more replicas are requested
     * than there are broker endpoints.
     *
     * @param project  project that will own the topic
     * @param topicDto description of the topic to create; must not be {@code null}
     * @throws IllegalArgumentException if {@code topicDto} is {@code null}
     * @throws KafkaException if validation fails, the broker endpoints cannot be read, or the
     *                        topic cannot be created
     */
    public void createTopic(Project project, TopicDTO topicDto) throws KafkaException {
        if (topicDto == null) {
            throw new IllegalArgumentException("topicDto was not provided.");
        }
        String topicName = topicDto.getName();
        if (this.projectTopicsFacade.findTopicByName(topicName).isPresent()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS, Level.FINE, "topic name: " + topicName);
        }
        // ">=" rather than ">": a project that already holds the maximum number of topics
        // must not be allowed to create one more.
        if (this.projectTopicsFacade.findTopicsByProject(project).size() >= project.getKafkaMaxNumTopics()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_LIMIT_REACHED, Level.FINE, "topic name: " + topicName + ", project: " + project.getName());
        }
        try {
            Set<String> brokerEndpoints = this.kafkaBrokers.getBrokerEndpoints();
            if (brokerEndpoints.size() < topicDto.getNumOfReplicas()) {
                throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_REPLICATION_ERROR, Level.FINE, "maximum: " + brokerEndpoints.size());
            }
        }
        catch (IOException | InterruptedException | KeeperException ex) {
            restoreInterruptIfInterrupted(ex);
            throw new KafkaException(RESTCodes.KafkaErrorCode.KAFKA_GENERIC_ERROR, Level.SEVERE, "project: " + project.getName(), ex.getMessage(), ex);
        }
        this.createTopicInProject(project, topicDto);
    }

    /**
     * Removes a topic owned by the project: first its stored record, then the topic in Kafka.
     *
     * @param project   project that owns the topic
     * @param topicName name of the topic to remove
     * @throws KafkaException with {@code TOPIC_NOT_FOUND} if the project has no such topic
     */
    public void removeTopicFromProject(Project project, String topicName) throws KafkaException {
        ProjectTopics pt = this.projectTopicsFacade.findTopicByNameAndProject(project, topicName).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + topicName));
        this.projectTopicsFacade.remove(pt);
        this.hopsKafkaAdminClient.deleteTopics(Collections.singleton(pt.getTopicName()));
    }

    /**
     * Tells whether the project has a registered topic with the given name.
     *
     * @param project   project to look in
     * @param topicName topic name to look for
     * @return {@code true} if a matching topic record exists
     */
    public boolean projectTopicExists(Project project, String topicName) {
        return this.projectTopicsFacade.findTopicByNameAndProject(project, topicName).isPresent();
    }

    /**
     * Lists the topics registered for the project as DTOs carrying the topic name and the
     * subject name and version of its schema.
     *
     * @param project project whose topics are listed
     * @return one {@link TopicDTO} per registered topic; empty if there are none
     */
    public List<TopicDTO> findTopicsByProject(Project project) {
        List<TopicDTO> topics = new ArrayList<>();
        for (ProjectTopics pt : this.projectTopicsFacade.findTopicsByProject(project)) {
            Subjects subject = pt.getSubjects();
            topics.add(new TopicDTO(pt.getTopicName(), subject.getSubject(), subject.getVersion(), false));
        }
        return topics;
    }

    /**
     * Currently identical to {@link #findTopicsByProject(Project)}.
     *
     * @param project project whose topics are listed
     * @return the topics registered for the project
     */
    public List<TopicDTO> findAllTopicsByProject(Project project) {
        return this.findTopicsByProject(project);
    }

    /**
     * Creates the topic in Kafka and then stores its record for the project.
     *
     * @param project  project that will own the topic
     * @param topicDto description of the topic (name, partitions, replicas, schema name/version)
     * @return the stored topic record
     * @throws KafkaException with {@code SCHEMA_NOT_FOUND} if the referenced schema version does
     *                        not exist in the project, {@code TOPIC_ALREADY_EXISTS_IN_ZOOKEEPER}
     *                        if the creation step yields no result, or
     *                        {@code TOPIC_CREATION_FAILED} if waiting for it fails or times out
     */
    public ProjectTopics createTopicInProject(Project project, TopicDTO topicDto) throws KafkaException {
        Subjects schema = this.subjectsFacade.findSubjectByNameAndVersion(project, topicDto.getSchemaName(), topicDto.getSchemaVersion()).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_NOT_FOUND, Level.FINE, "topic: " + topicDto.getName()));
        try {
            // A null result means the topic name was already listed by the cluster, or that
            // submitting the creation request threw (see createTopicInKafka).
            if (this.createTopicInKafka(topicDto).get(TOPIC_CREATION_TIMEOUT_MS, TimeUnit.MILLISECONDS) == null) {
                throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS_IN_ZOOKEEPER, Level.INFO, "topic name: " + topicDto.getName());
            }
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            restoreInterruptIfInterrupted(e);
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_CREATION_FAILED, Level.WARNING, "Topic name: " + topicDto.getName(), e.getMessage(), e);
        }
        ProjectTopics pt = new ProjectTopics(topicDto.getName(), topicDto.getNumOfPartitions(), topicDto.getNumOfReplicas(), project, schema);
        this.projectTopicsFacade.save(pt);
        return pt;
    }

    /**
     * Lists the cluster's topic names and, if the requested name is not among them, submits a
     * creation request for it.
     *
     * <p>NOTE(review): the returned future completes once the request has been submitted; the
     * {@link CreateTopicsResult} itself is not awaited here, and an exception while submitting
     * is logged and mapped to {@code null}, which the caller reports as "already exists".
     *
     * @param topicDTO description of the topic to create
     * @return a future holding the creation result, or {@code null} if the topic name is already
     *         listed or the request could not be submitted
     */
    private KafkaFuture<CreateTopicsResult> createTopicInKafka(TopicDTO topicDTO) {
        return this.hopsKafkaAdminClient.listTopics().names().thenApply(existingNames -> {
            if (existingNames.contains(topicDTO.getName())) {
                return null;
            }
            NewTopic newTopic = new NewTopic(topicDTO.getName(), topicDTO.getNumOfPartitions().intValue(), topicDTO.getNumOfReplicas().shortValue());
            try {
                return this.hopsKafkaAdminClient.createTopics(Collections.singleton(newTopic));
            }
            catch (Exception e) {
                LOGGER.log(Level.WARNING, e.getMessage(), e);
                return null;
            }
        });
    }

    /**
     * Describes the topic in the Kafka cluster and converts each partition into a
     * {@link PartitionDetailsDTO} (partition id, leader host, replica hosts, in-sync replica
     * hosts), sorted by partition id.
     *
     * @param topicName name of the topic to describe
     * @return a future holding the partition details; an empty list if the description does not
     *         contain the topic
     */
    private KafkaFuture<List<PartitionDetailsDTO>> getTopicDetailsFromKafkaCluster(String topicName) {
        return this.hopsKafkaAdminClient.describeTopics(Collections.singleton(topicName)).all().thenApply(descriptions -> descriptions.get(topicName)).thenApply(td -> {
            if (td == null) {
                return Collections.emptyList();
            }
            List<PartitionDetailsDTO> partitionDetails = new ArrayList<>();
            for (TopicPartitionInfo partition : td.partitions()) {
                // Kafka documents leader() as returning null when the partition currently has
                // no leader; do not fail the whole lookup in that case.
                // NOTE(review): confirm consumers of PartitionDetailsDTO tolerate a null leader.
                Node leader = partition.leader();
                String leaderHost = leader == null ? null : leader.host();
                List<String> replicas = partition.replicas().stream().map(Node::host).collect(Collectors.toList());
                List<String> inSyncReplicas = partition.isr().stream().map(Node::host).collect(Collectors.toList());
                partitionDetails.add(new PartitionDetailsDTO(partition.partition(), leaderHost, replicas, inSyncReplicas));
            }
            partitionDetails.sort(Comparator.comparing(PartitionDetailsDTO::getId));
            return partitionDetails;
        });
    }

    /**
     * Returns the partition details of a topic registered for the project.
     *
     * @param project   project that owns the topic
     * @param topicName name of the topic
     * @return a future holding the partition details, sorted by partition id
     * @throws KafkaException with {@code TOPIC_NOT_FOUND} if the project has no such topic
     */
    public KafkaFuture<List<PartitionDetailsDTO>> getTopicDetails(Project project, String topicName) throws KafkaException {
        // Only the existence check matters here; the record itself is not needed.
        this.projectTopicsFacade.findTopicByNameAndProject(project, topicName).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + topicName));
        return this.getTopicDetailsFromKafkaCluster(topicName);
    }

    /**
     * Builds the default values offered for a new topic: the configured default replica and
     * partition counts plus the current number of broker endpoints.
     *
     * @return the default values
     * @throws KafkaException with {@code KAFKA_GENERIC_ERROR} if the broker endpoints cannot be
     *                        read
     */
    public TopicDefaultValueDTO topicDefaultValues() throws KafkaException {
        try {
            Set<String> brokers = this.kafkaBrokers.getBrokerEndpoints();
            return new TopicDefaultValueDTO(this.settings.getKafkaDefaultNumReplicas(), this.settings.getKafkaDefaultNumPartitions(), brokers.size());
        }
        catch (IOException | InterruptedException | KeeperException ex) {
            restoreInterruptIfInterrupted(ex);
            throw new KafkaException(RESTCodes.KafkaErrorCode.KAFKA_GENERIC_ERROR, Level.SEVERE, "", ex.getMessage(), ex);
        }
    }

    /**
     * Points a topic at another version of the subject it already uses.
     *
     * @param project       project that owns the topic
     * @param topicName     name of the topic
     * @param schemaVersion version of the topic's current subject to switch to
     * @throws KafkaException with {@code TOPIC_NOT_FOUND} if the project has no such topic, or
     *                        {@code SCHEMA_VERSION_NOT_FOUND} if that version does not exist
     */
    public void updateTopicSchemaVersion(Project project, String topicName, Integer schemaVersion) throws KafkaException {
        ProjectTopics pt = this.projectTopicsFacade.findTopicByNameAndProject(project, topicName).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + topicName));
        String schemaName = pt.getSubjects().getSubject();
        Subjects st = this.subjectsFacade.findSubjectByNameAndVersion(project, schemaName, schemaVersion).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_VERSION_NOT_FOUND, Level.FINE, "schema: " + schemaName + ", version: " + schemaVersion));
        this.projectTopicsFacade.updateTopicSchemaVersion(pt, st);
    }

    /**
     * Returns the subject attached to a topic that the project either owns or can reach through
     * a project whose dataset is shared with it.
     *
     * <p>The project's own topics are checked first. If the topic is not among them, the owning
     * project of every dataset shared with this project is searched and the first one that has
     * the topic wins.
     *
     * @param project project asking for the subject
     * @param topic   name of the topic
     * @return the subject of the topic
     * @throws KafkaException with {@code TOPIC_NOT_SHARED} if the project does not own the topic
     *                        and no datasets are shared with it, or {@code TOPIC_NOT_FOUND} if
     *                        none of the searched projects has the topic
     */
    public SubjectDTO getSubjectForTopic(Project project, String topic) throws KafkaException {
        Optional<ProjectTopics> pt = this.projectTopicsFacade.findTopicByNameAndProject(project, topic);
        if (!pt.isPresent()) {
            List<DatasetSharedWith> datasetSharedWithList = this.datasetSharedWithFacade.findByProject(project);
            if (datasetSharedWithList.isEmpty()) {
                throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_SHARED, Level.FINE, "topic: " + topic + ", project: " + project.getName());
            }
            // Skip sharing projects that do not have the topic; taking the first lookup
            // unconditionally would hide a topic owned by a later project in the list.
            pt = datasetSharedWithList.stream()
                .map(datasetSharedWith -> this.projectTopicsFacade.findTopicByNameAndProject(datasetSharedWith.getDataset().getProject(), topic))
                .filter(Optional::isPresent)
                .findFirst()
                .orElse(Optional.empty());
        }
        ProjectTopics found = pt.orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "project=" + project.getName() + ", topic=" + topic));
        return new SubjectDTO(found.getSubjects());
    }

    /**
     * Points a topic at the given subject and version.
     *
     * @param project project that owns the topic
     * @param topic   name of the topic
     * @param subject name of the subject to attach
     * @param version version of the subject to attach
     * @throws KafkaException  with {@code TOPIC_NOT_FOUND} if the project has no such topic
     * @throws SchemaException with {@code VERSION_NOT_FOUND} if the subject/version pair does
     *                         not exist in the project
     */
    public void updateTopicSubjectVersion(Project project, String topic, String subject, Integer version) throws KafkaException, SchemaException {
        ProjectTopics pt = this.projectTopicsFacade.findTopicByNameAndProject(project, topic).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + topic));
        // Report the subject that was actually looked up, not the one the topic currently uses.
        Subjects st = this.subjectsFacade.findSubjectByNameAndVersion(project, subject, version).orElseThrow(() -> new SchemaException(RESTCodes.SchemaRegistryErrorCode.VERSION_NOT_FOUND, Level.FINE, "schema: " + subject + ", version: " + version));
        this.projectTopicsFacade.updateTopicSchemaVersion(pt, st);
    }

    /**
     * Re-asserts the current thread's interrupt flag when the caught exception is an
     * {@link InterruptedException}, so the interruption is not lost once the exception is
     * wrapped in a {@link KafkaException}.
     */
    private static void restoreInterruptIfInterrupted(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}

