/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.kafka;

import io.hops.hopsworks.common.dao.dataset.DatasetSharedWithFacade;
import io.hops.hopsworks.common.dao.kafka.HopsKafkaAdminClient;
import io.hops.hopsworks.common.dao.kafka.PartitionDetailsDTO;
import io.hops.hopsworks.common.dao.kafka.ProjectTopicsFacade;
import io.hops.hopsworks.common.dao.kafka.TopicDTO;
import io.hops.hopsworks.common.dao.kafka.TopicDefaultValueDTO;
import io.hops.hopsworks.common.dao.kafka.schemas.SubjectDTO;
import io.hops.hopsworks.common.dao.kafka.schemas.SubjectsFacade;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorController;
import io.hops.hopsworks.common.featurestore.storageconnectors.kafka.FeatureStoreKafkaConnectorDTO;
import io.hops.hopsworks.common.kafka.KafkaBrokers;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.SchemaException;
import io.hops.hopsworks.persistence.entity.dataset.DatasetSharedWith;
import io.hops.hopsworks.persistence.entity.kafka.ProjectTopics;
import io.hops.hopsworks.persistence.entity.kafka.schemas.Subjects;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.restutils.RESTCodes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.EJB;
import javax.ejb.Singleton;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;

@Singleton
@ConcurrencyManagement(value=ConcurrencyManagementType.BEAN)
@TransactionAttribute(value=TransactionAttributeType.NOT_SUPPORTED)
public class KafkaController {
    private static final Logger LOGGER = Logger.getLogger(KafkaController.class.getName());
    @EJB
    private Settings settings;
    @EJB
    protected ProjectTopicsFacade projectTopicsFacade;
    @EJB
    protected HopsKafkaAdminClient hopsKafkaAdminClient;
    @EJB
    private SubjectsFacade subjectsFacade;
    @EJB
    protected KafkaBrokers kafkaBrokers;
    @EJB
    private DatasetSharedWithFacade datasetSharedWithFacade;
    @EJB
    protected FeaturestoreStorageConnectorController storageConnectorController;

    public synchronized ProjectTopics createTopic(Project project, TopicDTO topicDto) throws KafkaException {
        if (this.externalKafka(project)) {
            return null;
        }
        if (topicDto == null) {
            throw new IllegalArgumentException("topicDto was not provided.");
        }
        String topicName = topicDto.getName();
        if (this.projectTopicsFacade.findTopicByName(topicName).isPresent()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS, Level.FINE, "topic name: " + topicName);
        }
        if (this.projectTopicsFacade.findTopicsByProject(project).size() > project.getKafkaMaxNumTopics()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_LIMIT_REACHED, Level.FINE, "topic name: " + topicName + ", project: " + project.getName());
        }
        this.checkReplication(topicDto);
        return this.createTopicInProject(project, topicDto);
    }

    protected void checkReplication(TopicDTO topicDto) throws KafkaException {
        List<String> brokerEndpoints = this.kafkaBrokers.getBrokerEndpoints(KafkaBrokers.BrokerProtocol.INTERNAL);
        if (brokerEndpoints.isEmpty()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.BROKER_MISSING, Level.FINE);
        }
        if (brokerEndpoints.size() < topicDto.getNumOfReplicas()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_REPLICATION_ERROR, Level.FINE, "maximum: " + brokerEndpoints.size());
        }
    }

    public void removeTopicFromProject(Project project, String topicName) {
        Optional<ProjectTopics> optionalPt = this.projectTopicsFacade.findTopicByNameAndProject(project, topicName);
        if (optionalPt.isPresent()) {
            ProjectTopics pt = optionalPt.get();
            this.projectTopicsFacade.remove(pt);
            this.deleteTopic(Collections.singletonList(pt));
        }
    }

    public void removeKafkaTopics(Project project) {
        List<ProjectTopics> topics = this.projectTopicsFacade.findTopicsByProject(project);
        if (!topics.isEmpty()) {
            this.deleteTopic(topics);
        }
    }

    private void deleteTopic(List<ProjectTopics> topics) {
        List<String> topicNameList = topics.stream().map(ProjectTopics::getTopicName).collect(Collectors.toList());
        this.hopsKafkaAdminClient.deleteTopics(topicNameList);
    }

    public List<TopicDTO> findTopicsByProject(Project project) {
        List<ProjectTopics> ptList = this.projectTopicsFacade.findTopicsByProject(project);
        ArrayList<TopicDTO> topics = new ArrayList<TopicDTO>();
        for (ProjectTopics pt : ptList) {
            TopicDTO topicDTO = new TopicDTO(pt.getTopicName(), pt.getNumOfReplicas(), pt.getNumOfPartitions());
            Subjects subjects = pt.getSubjects();
            if (subjects != null) {
                topicDTO.setSchemaName(subjects.getSubject());
                topicDTO.setSchemaVersion(subjects.getVersion());
            }
            topicDTO.setShared(false);
            topics.add(topicDTO);
        }
        return topics;
    }

    public List<TopicDTO> findAllTopicsByProject(Project project) {
        return this.findTopicsByProject(project);
    }

    private ProjectTopics createTopicInProject(Project project, TopicDTO topicDto) throws KafkaException {
        Subjects subjects = null;
        if (topicDto.getSchemaName() != null && topicDto.getSchemaVersion() != null) {
            subjects = this.subjectsFacade.findSubjectByNameAndVersion(project, topicDto.getSchemaName(), topicDto.getSchemaVersion()).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_NOT_FOUND, Level.FINE, "topic: " + topicDto.getName()));
        }
        try {
            this.createTopicInKafka(topicDto);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_CREATION_FAILED, Level.WARNING, "Topic name: " + topicDto.getName(), e.getMessage(), (Throwable)e);
        }
        ProjectTopics pt = new ProjectTopics(topicDto.getName(), topicDto.getNumOfPartitions(), topicDto.getNumOfReplicas(), project, subjects);
        this.projectTopicsFacade.save(pt);
        return pt;
    }

    private void createTopicInKafka(TopicDTO topicDTO) throws ExecutionException, InterruptedException, TimeoutException, KafkaException {
        KafkaFuture result = this.hopsKafkaAdminClient.listTopics().names().thenApply(set -> {
            if (set.contains(topicDTO.getName())) {
                return null;
            }
            NewTopic newTopic = new NewTopic(topicDTO.getName(), topicDTO.getNumOfPartitions().intValue(), topicDTO.getNumOfReplicas().shortValue());
            try {
                return this.hopsKafkaAdminClient.createTopics(Collections.singleton(newTopic));
            }
            catch (Exception e) {
                LOGGER.log(Level.WARNING, e.getMessage(), e);
                return null;
            }
        });
        if (result.get(6000L, TimeUnit.MILLISECONDS) == null) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS_IN_ZOOKEEPER, Level.INFO, "topic name: " + topicDTO.getName());
        }
    }

    public List<PartitionDetailsDTO> getTopicDetails(Project project, String topicName) throws ExecutionException, InterruptedException, TimeoutException, FeaturestoreException {
        FeatureStoreKafkaConnectorDTO connector = this.storageConnectorController.getKafkaConnector(project);
        KafkaFuture result = this.hopsKafkaAdminClient.describeTopics(connector, Collections.singleton(topicName)).all().thenApply(map -> map.getOrDefault(topicName, null)).thenApply(td -> {
            if (td != null) {
                ArrayList<PartitionDetailsDTO> partitionDetails = new ArrayList<PartitionDetailsDTO>();
                List partitions = td.partitions();
                for (TopicPartitionInfo partition : partitions) {
                    int id = partition.partition();
                    List<String> replicas = partition.replicas().stream().map(Node::host).collect(Collectors.toList());
                    List<String> inSyncReplicas = partition.isr().stream().map(Node::host).collect(Collectors.toList());
                    partitionDetails.add(new PartitionDetailsDTO(id, partition.leader().host(), replicas, inSyncReplicas));
                }
                partitionDetails.sort(Comparator.comparing(PartitionDetailsDTO::getId));
                return partitionDetails;
            }
            return Collections.emptyList();
        });
        return (List)result.get(3000L, TimeUnit.MILLISECONDS);
    }

    public TopicDefaultValueDTO topicDefaultValues() {
        List<String> brokers = this.kafkaBrokers.getBrokerEndpoints(KafkaBrokers.BrokerProtocol.INTERNAL);
        return new TopicDefaultValueDTO(this.settings.getKafkaDefaultNumReplicas(), this.settings.getKafkaDefaultNumPartitions(), brokers.size());
    }

    public SubjectDTO getSubjectForTopic(Project project, String topic) throws KafkaException {
        Optional pt = this.projectTopicsFacade.findTopicByNameAndProject(project, topic);
        if (!pt.isPresent()) {
            List<DatasetSharedWith> datasetSharedWithList = this.datasetSharedWithFacade.findByProject(project);
            pt = datasetSharedWithList.stream().map(datasetSharedWith -> this.projectTopicsFacade.findTopicByNameAndProject(datasetSharedWith.getDataset().getProject(), topic)).findFirst().orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_SHARED, Level.FINE, "topic: " + topic + ", project: " + project.getName()));
        }
        if (!pt.isPresent()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "project=" + project.getName() + ", topic=" + topic);
        }
        return new SubjectDTO(pt.get().getSubjects());
    }

    public void updateTopicSubjectVersion(Project project, String topic, String subject, Integer version) throws KafkaException, SchemaException {
        ProjectTopics pt = this.projectTopicsFacade.findTopicByNameAndProject(project, topic).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + topic));
        String topicSubject = pt.getSubjects().getSubject();
        Subjects st = this.subjectsFacade.findSubjectByNameAndVersion(project, subject, version).orElseThrow(() -> new SchemaException(RESTCodes.SchemaRegistryErrorCode.VERSION_NOT_FOUND, Level.FINE, "schema: " + topicSubject + ", version: " + version));
        this.projectTopicsFacade.updateTopicSchemaVersion(pt, st);
    }

    protected boolean externalKafka(Project project) {
        FeatureStoreKafkaConnectorDTO connector;
        try {
            connector = this.storageConnectorController.getKafkaConnector(project);
        }
        catch (FeaturestoreException e) {
            throw new RuntimeException(e);
        }
        return connector.isExternalKafka();
    }
}

