package io.hops.hopsworks.common.kafka;

import io.hops.hopsworks.common.dao.dataset.DatasetSharedWithFacade;
import io.hops.hopsworks.common.dao.kafka.HopsKafkaAdminClient;
import io.hops.hopsworks.common.dao.kafka.KafkaConst;
import io.hops.hopsworks.common.dao.kafka.PartitionDetailsDTO;
import io.hops.hopsworks.common.dao.kafka.ProjectTopicsFacade;
import io.hops.hopsworks.common.dao.kafka.TopicDTO;
import io.hops.hopsworks.common.dao.kafka.TopicDefaultValueDTO;
import io.hops.hopsworks.common.dao.kafka.schemas.SubjectDTO;
import io.hops.hopsworks.common.dao.kafka.schemas.SubjectsFacade;
import io.hops.hopsworks.common.dao.project.ProjectFacade;
import io.hops.hopsworks.common.dao.user.UserFacade;
import io.hops.hopsworks.common.project.ProjectController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.SchemaException;
import io.hops.hopsworks.persistence.entity.kafka.ProjectTopics;
import io.hops.hopsworks.persistence.entity.kafka.schemas.Subjects;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.zookeeper.KeeperException;

/**
 * Stateless session bean that manages the Kafka topics belonging to a project.
 *
 * <p>Topic bookkeeping goes through {@link ProjectTopicsFacade}, schema lookups through
 * {@link SubjectsFacade}, and operations against the Kafka cluster itself through
 * {@link HopsKafkaAdminClient}. The bean runs with the {@code NOT_SUPPORTED} transaction attribute.
 */
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
@Stateless
public class KafkaController {
    private static final Logger LOGGER = Logger.getLogger(KafkaController.class.getName());

    /** Maximum time, in milliseconds, to wait for the topic-creation request to be issued. */
    private static final long TOPIC_CREATION_TIMEOUT_MS = 3000L;

    @EJB
    private Settings settings;

    @EJB
    private ProjectFacade projectFacade;

    @EJB
    private UserFacade userFacade;

    @EJB
    private ProjectTopicsFacade projectTopicsFacade;

    @EJB
    private HopsKafkaAdminClient hopsKafkaAdminClient;

    @EJB
    private SubjectsFacade subjectsFacade;

    @EJB
    private ProjectController projectController;

    @EJB
    private KafkaBrokers kafkaBrokers;

    @EJB
    private DatasetSharedWithFacade datasetSharedWithFacade;

    /**
     * Validates a topic request and, if it is acceptable, creates the topic for the project.
     *
     * <p>The request is rejected when a topic with the same name is already registered, when the
     * project has already reached its topic quota, or when more replicas are requested than there
     * are broker endpoints.
     *
     * @param project  project that will own the topic
     * @param topicDTO description of the topic to create; must not be {@code null}
     * @throws IllegalArgumentException if {@code topicDTO} is {@code null}
     * @throws KafkaException if validation fails, the broker endpoints cannot be read, or the
     *                        topic cannot be created
     */
    public void createTopic(Project project, TopicDTO topicDTO) throws KafkaException {
        if (topicDTO == null) {
            throw new IllegalArgumentException("topicDto was not provided.");
        }
        String name = topicDTO.getName();
        if (this.projectTopicsFacade.findTopicByName(name).isPresent()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS, Level.FINE, "topic name: " + name);
        }
        // ">=" rather than ">": a project already holding the maximum number of topics must not get one more.
        if (this.projectTopicsFacade.findTopicsByProject(project).size() >= project.getKafkaMaxNumTopics()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_LIMIT_REACHED, Level.FINE, "topic name: " + name + ", project: " + project.getName());
        }

        Set<String> brokerEndpoints;
        try {
            brokerEndpoints = this.kafkaBrokers.getBrokerEndpoints();
        } catch (IOException | InterruptedException | KeeperException e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            throw new KafkaException(RESTCodes.KafkaErrorCode.KAFKA_GENERIC_ERROR, Level.SEVERE, "project: " + project.getName(), e.getMessage(), e);
        }
        if (brokerEndpoints.size() < topicDTO.getNumOfReplicas()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_REPLICATION_ERROR, Level.FINE, "maximum: " + brokerEndpoints.size());
        }
        createTopicInProject(project, topicDTO);
    }

    /**
     * Removes a topic from the project: first the project-topic record, then the Kafka topic.
     *
     * @param project project that owns the topic
     * @param str     name of the topic to remove
     * @throws KafkaException with {@code TOPIC_NOT_FOUND} if the project has no topic with that name
     */
    public void removeTopicFromProject(Project project, String str) throws KafkaException {
        ProjectTopics topic = this.projectTopicsFacade.findTopicByNameAndProject(project, str)
            .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + str));
        this.projectTopicsFacade.remove(topic);
        this.hopsKafkaAdminClient.deleteTopics(Collections.singleton(topic.getTopicName()));
    }

    /**
     * Tells whether the project has a registered topic with the given name.
     *
     * @param project project to look in
     * @param str     topic name
     * @return {@code true} if a matching project-topic record exists
     */
    public boolean projectTopicExists(Project project, String str) {
        return this.projectTopicsFacade.findTopicByNameAndProject(project, str).isPresent();
    }

    /**
     * Lists the topics registered for the project.
     *
     * @param project project whose topics are listed
     * @return one DTO per topic, carrying the topic name and its subject name and version; the
     *         fourth constructor argument is always {@code false}
     */
    public List<TopicDTO> findTopicsByProject(Project project) {
        List<ProjectTopics> projectTopics = this.projectTopicsFacade.findTopicsByProject(project);
        List<TopicDTO> topics = new ArrayList<>(projectTopics.size());
        for (ProjectTopics topic : projectTopics) {
            Subjects subject = topic.getSubjects();
            topics.add(new TopicDTO(topic.getTopicName(), subject.getSubject(), subject.getVersion(), Boolean.FALSE));
        }
        return topics;
    }

    /**
     * Lists all topics of the project; currently identical to {@link #findTopicsByProject(Project)}.
     *
     * @param project project whose topics are listed
     * @return the project's topics
     */
    public List<TopicDTO> findAllTopicsByProject(Project project) {
        return findTopicsByProject(project);
    }

    /**
     * Creates the topic in Kafka and records it for the project.
     *
     * @param project  project that will own the topic
     * @param topicDTO topic description, including the schema name and version to attach
     * @return the saved project-topic record
     * @throws KafkaException with {@code SCHEMA_NOT_FOUND} if the schema version does not exist in
     *                        the project, {@code TOPIC_ALREADY_EXISTS_IN_ZOOKEEPER} if Kafka already
     *                        lists a topic with that name, or {@code TOPIC_CREATION_FAILED} if the
     *                        request fails, times out or is interrupted
     */
    public ProjectTopics createTopicInProject(Project project, TopicDTO topicDTO) throws KafkaException {
        Subjects schema = this.subjectsFacade
            .findSubjectByNameAndVersion(project, topicDTO.getSchemaName(), topicDTO.getSchemaVersion())
            .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_NOT_FOUND, Level.FINE, "topic: " + topicDTO.getName()));

        CreateTopicsResult creation;
        try {
            creation = createTopicInKafka(topicDTO).get(TOPIC_CREATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_CREATION_FAILED, Level.WARNING, "Topic name: " + topicDTO.getName(), e.getMessage(), e);
        }
        // A null result means Kafka already lists a topic with this name.
        if (creation == null) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS_IN_ZOOKEEPER, Level.INFO, "topic name: " + topicDTO.getName());
        }
        // NOTE(review): the CreateTopicsResult itself is not awaited here, so a creation that fails
        // later inside Kafka would not be noticed before the record is saved - confirm this is intended.
        ProjectTopics projectTopics = new ProjectTopics(topicDTO.getName(), topicDTO.getNumOfPartitions(), topicDTO.getNumOfReplicas(), project, schema);
        this.projectTopicsFacade.save(projectTopics);
        return projectTopics;
    }

    /**
     * Issues the topic creation against Kafka unless a topic with that name is already listed.
     *
     * @param topicDTO topic to create
     * @return a future that yields {@code null} when the topic already exists, the creation result
     *         otherwise, and that fails when the creation request could not be issued
     */
    private KafkaFuture<CreateTopicsResult> createTopicInKafka(TopicDTO topicDTO) {
        return this.hopsKafkaAdminClient.listTopics().names().thenApply(existingNames -> {
            if (existingNames.contains(topicDTO.getName())) {
                return null;
            }
            NewTopic newTopic = new NewTopic(topicDTO.getName(), topicDTO.getNumOfPartitions().intValue(), topicDTO.getNumOfReplicas().shortValue());
            try {
                return this.hopsKafkaAdminClient.createTopics(Collections.singleton(newTopic));
            } catch (Exception e) {
                LOGGER.log(Level.WARNING, e.getMessage(), e);
                // Fail the future instead of returning null: null is reserved for "topic already exists",
                // and reusing it here made the caller report the wrong error code.
                throw new IllegalStateException("Could not create topic " + topicDTO.getName() + ": " + e.getMessage(), e);
            }
        });
    }

    /**
     * Asks the Kafka cluster to describe a topic and converts the answer to partition DTOs.
     *
     * @param str topic name
     * @return a future yielding the partition details sorted by partition id, or an empty list when
     *         the description returned by Kafka has no entry for the topic
     */
    private KafkaFuture<List<PartitionDetailsDTO>> getTopicDetailsFromKafkaCluster(String str) {
        return this.hopsKafkaAdminClient.describeTopics(Collections.singleton(str)).all()
            .thenApply(descriptions -> toPartitionDetails(descriptions.get(str)));
    }

    /**
     * Converts a topic description into partition DTOs ordered by partition id.
     *
     * @param topicDescription description returned by Kafka; may be {@code null}
     * @return an empty list for {@code null}, otherwise one DTO per partition
     */
    private static List<PartitionDetailsDTO> toPartitionDetails(TopicDescription topicDescription) {
        if (topicDescription == null) {
            return Collections.emptyList();
        }
        List<PartitionDetailsDTO> details = new ArrayList<>();
        for (TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
            // A partition can be without a leader, in which case leader() is null; report a null host
            // for it rather than failing the whole description with a NullPointerException.
            String leaderHost = Optional.ofNullable(partitionInfo.leader()).map(node -> node.host()).orElse(null);
            List<String> replicaHosts = partitionInfo.replicas().stream().map(node -> node.host()).collect(Collectors.toList());
            List<String> inSyncHosts = partitionInfo.isr().stream().map(node -> node.host()).collect(Collectors.toList());
            details.add(new PartitionDetailsDTO(partitionInfo.partition(), leaderHost, replicaHosts, inSyncHosts));
        }
        details.sort(Comparator.comparing(PartitionDetailsDTO::getId));
        return details;
    }

    /**
     * Returns the partition details of a topic that is registered for the project.
     *
     * @param project project that owns the topic
     * @param str     topic name
     * @return a future yielding the partition details sorted by partition id
     * @throws KafkaException with {@code TOPIC_NOT_FOUND} if the project has no topic with that name
     */
    public KafkaFuture<List<PartitionDetailsDTO>> getTopicDetails(Project project, String str) throws KafkaException {
        this.projectTopicsFacade.findTopicByNameAndProject(project, str)
            .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + str));
        return getTopicDetailsFromKafkaCluster(str);
    }

    /**
     * Builds the default values offered when creating a topic: the configured default replica and
     * partition counts plus the current number of broker endpoints.
     *
     * @return the default values
     * @throws KafkaException with {@code KAFKA_GENERIC_ERROR} if the broker endpoints cannot be read
     */
    public TopicDefaultValueDTO topicDefaultValues() throws KafkaException {
        try {
            return new TopicDefaultValueDTO(this.settings.getKafkaDefaultNumReplicas(), this.settings.getKafkaDefaultNumPartitions(), Integer.valueOf(this.kafkaBrokers.getBrokerEndpoints().size()));
        } catch (IOException | InterruptedException | KeeperException e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            // NOTE(review): a KafkaConst endpoint-identification constant is passed as the user message;
            // this looks unintended - confirm what text should be reported here.
            throw new KafkaException(RESTCodes.KafkaErrorCode.KAFKA_GENERIC_ERROR, Level.SEVERE, KafkaConst.KAFKA_ENDPOINT_IDENTIFICATION_ALGORITHM, e.getMessage(), e);
        }
    }

    /**
     * Points a topic at another version of the subject it already uses.
     *
     * @param project project that owns the topic
     * @param str     topic name
     * @param num     subject version to switch to
     * @throws KafkaException with {@code TOPIC_NOT_FOUND} if the project has no such topic, or
     *                        {@code SCHEMA_VERSION_NOT_FOUND} if the topic's subject has no such
     *                        version in the project
     */
    public void updateTopicSchemaVersion(Project project, String str, Integer num) throws KafkaException {
        ProjectTopics topic = this.projectTopicsFacade.findTopicByNameAndProject(project, str)
            .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + str));
        String subject = topic.getSubjects().getSubject();
        Subjects newVersion = this.subjectsFacade.findSubjectByNameAndVersion(project, subject, num)
            .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_VERSION_NOT_FOUND, Level.FINE, "schema: " + subject + ", version: " + num));
        this.projectTopicsFacade.updateTopicSchemaVersion(topic, newVersion);
    }

    /**
     * Returns the subject attached to a topic that the project owns or that is reachable through a
     * dataset shared with the project.
     *
     * @param project project asking for the topic
     * @param str     topic name
     * @return the subject of the topic
     * @throws KafkaException with {@code TOPIC_NOT_SHARED} if the project neither owns the topic nor
     *                        has any dataset shared with it, or {@code TOPIC_NOT_FOUND} if none of
     *                        the sharing projects owns the topic either
     */
    public SubjectDTO getSubjectForTopic(Project project, String str) throws KafkaException {
        Optional<ProjectTopics> ownTopic = this.projectTopicsFacade.findTopicByNameAndProject(project, str);
        if (ownTopic.isPresent()) {
            return new SubjectDTO(ownTopic.get().getSubjects());
        }

        List<Project> sharingProjects = this.datasetSharedWithFacade.findByProject(project).stream()
            .map(datasetSharedWith -> datasetSharedWith.getDataset().getProject())
            .collect(Collectors.toList());
        if (sharingProjects.isEmpty()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_SHARED, Level.FINE, "topic: " + str + ", project: " + project.getName());
        }
        // Check every sharing project, not only the first one: the topic may belong to any of them.
        for (Project sharingProject : sharingProjects) {
            Optional<ProjectTopics> sharedTopic = this.projectTopicsFacade.findTopicByNameAndProject(sharingProject, str);
            if (sharedTopic.isPresent()) {
                return new SubjectDTO(sharedTopic.get().getSubjects());
            }
        }
        throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "project=" + project.getName() + ", topic=" + str);
    }

    /**
     * Points a topic at a given version of a given subject.
     *
     * @param project project that owns the topic
     * @param str     topic name
     * @param str2    subject name to look up
     * @param num     subject version to look up
     * @throws KafkaException  with {@code TOPIC_NOT_FOUND} if the project has no such topic
     * @throws SchemaException with {@code VERSION_NOT_FOUND} if the project has no such subject version
     */
    public void updateTopicSubjectVersion(Project project, String str, String str2, Integer num) throws KafkaException, SchemaException {
        ProjectTopics topic = this.projectTopicsFacade.findTopicByNameAndProject(project, str)
            .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + str));
        // Report the subject that was actually searched for, not the one currently attached to the topic.
        Subjects newSubject = this.subjectsFacade.findSubjectByNameAndVersion(project, str2, num)
            .orElseThrow(() -> new SchemaException(RESTCodes.SchemaRegistryErrorCode.VERSION_NOT_FOUND, Level.FINE, "schema: " + str2 + ", version: " + num));
        this.projectTopicsFacade.updateTopicSchemaVersion(topic, newSubject);
    }
}
