package io.hops.hopsworks.common.kafka;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.constants.auth.AllowedRoles;
import io.hops.hopsworks.common.dao.kafka.AclDTO;
import io.hops.hopsworks.common.dao.kafka.AclUser;
import io.hops.hopsworks.common.dao.kafka.HopsKafkaAdminClient;
import io.hops.hopsworks.common.dao.kafka.KafkaConst;
import io.hops.hopsworks.common.dao.kafka.PartitionDetailsDTO;
import io.hops.hopsworks.common.dao.kafka.ProjectTopicsFacade;
import io.hops.hopsworks.common.dao.kafka.SharedProjectDTO;
import io.hops.hopsworks.common.dao.kafka.SharedTopicsDTO;
import io.hops.hopsworks.common.dao.kafka.SharedTopicsFacade;
import io.hops.hopsworks.common.dao.kafka.TopicAclsFacade;
import io.hops.hopsworks.common.dao.kafka.TopicDTO;
import io.hops.hopsworks.common.dao.kafka.TopicDefaultValueDTO;
import io.hops.hopsworks.common.dao.kafka.schemas.SubjectDTO;
import io.hops.hopsworks.common.dao.kafka.schemas.SubjectsFacade;
import io.hops.hopsworks.common.dao.project.ProjectFacade;
import io.hops.hopsworks.common.dao.user.UserFacade;
import io.hops.hopsworks.common.project.ProjectController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.SchemaException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.persistence.entity.kafka.ProjectTopics;
import io.hops.hopsworks.persistence.entity.kafka.SharedTopics;
import io.hops.hopsworks.persistence.entity.kafka.TopicAcls;
import io.hops.hopsworks.persistence.entity.kafka.schemas.Subjects;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.project.team.ProjectTeam;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.zookeeper.KeeperException;

@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
@Stateless
/* loaded from: input_file:io/hops/hopsworks/common/kafka/KafkaController.class */
public class KafkaController {
    private static final Logger LOGGER = Logger.getLogger(KafkaController.class.getName());

    @EJB
    private Settings settings;

    @EJB
    private ProjectFacade projectFacade;

    @EJB
    private UserFacade userFacade;

    @EJB
    private ProjectTopicsFacade projectTopicsFacade;

    @EJB
    private SharedTopicsFacade sharedTopicsFacade;

    @EJB
    private HopsKafkaAdminClient hopsKafkaAdminClient;

    @EJB
    private SubjectsFacade subjectsFacade;

    @EJB
    private TopicAclsFacade topicAclsFacade;

    @EJB
    private ProjectController projectController;

    @EJB
    private KafkaBrokers kafkaBrokers;

    /**
     * Creates a new topic owned by {@code project} and grants ACLs on it to the project's team members.
     *
     * <p>The request is rejected when a topic with the same name is already registered, when the
     * project has reached its configured maximum number of topics, or when the requested replication
     * factor exceeds the number of known broker endpoints.
     *
     * @param project owning project
     * @param topicDTO description of the topic to create; must not be {@code null}
     * @throws IllegalArgumentException if {@code topicDTO} is {@code null}
     * @throws KafkaException if the topic already exists, the topic limit is reached, the replication
     *         factor is too high, or the broker endpoints cannot be retrieved
     * @throws ProjectException propagated from ACL creation
     * @throws UserException propagated from ACL creation
     */
    public void createTopic(Project project, TopicDTO topicDTO) throws KafkaException, ProjectException, UserException {
        if (topicDTO == null) {
            throw new IllegalArgumentException("topicDto was not provided.");
        }
        String name = topicDTO.getName();
        if (this.projectTopicsFacade.findTopicByName(name).isPresent()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS, Level.FINE, "topic name: " + name);
        }
        // '>=' (not '>'): once the project already holds the maximum number of topics, creating
        // one more would exceed the limit.
        int existingTopics = this.projectTopicsFacade.findTopicsByProject(project).size();
        if (existingTopics >= project.getKafkaMaxNumTopics().intValue()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_LIMIT_REACHED, Level.FINE, "topic name: " + name + ", project: " + project.getName());
        }
        try {
            Set<String> brokerEndpoints = this.kafkaBrokers.getBrokerEndpoints();
            if (brokerEndpoints.size() < topicDTO.getNumOfReplicas().intValue()) {
                throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_REPLICATION_ERROR, Level.FINE, "maximum: " + brokerEndpoints.size());
            }
            createTopicInProject(project, topicDTO);
            addPermissionAclsToTopic(project, name, project.getId());
        } catch (IOException | InterruptedException | KeeperException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so that callers further up can still observe it.
                Thread.currentThread().interrupt();
            }
            throw new KafkaException(RESTCodes.KafkaErrorCode.KAFKA_GENERIC_ERROR, Level.SEVERE, "project: " + project.getName(), e.getMessage(), e);
        }
    }

    /**
     * Removes a topic owned by {@code project}: first the stored record, then the topic in Kafka.
     *
     * @param project project that must own the topic
     * @param str name of the topic
     * @throws KafkaException with {@code TOPIC_NOT_FOUND} if the project has no topic with that name
     */
    public void removeTopicFromProject(Project project, String str) throws KafkaException {
        Optional<ProjectTopics> lookup = this.projectTopicsFacade.findTopicByNameAndProject(project, str);
        if (!lookup.isPresent()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + str);
        }
        ProjectTopics topic = lookup.get();
        this.projectTopicsFacade.remove(topic);
        Set<String> kafkaTopicNames = Collections.singleton(topic.getTopicName());
        this.hopsKafkaAdminClient.deleteTopics(kafkaTopicNames);
    }

    /**
     * Tells whether {@code project} has a registered topic named {@code str}.
     *
     * @param project project to look in
     * @param str topic name
     * @return {@code true} if the lookup by name and project finds a topic
     */
    public boolean projectTopicExists(Project project, String str) {
        Optional<ProjectTopics> lookup = this.projectTopicsFacade.findTopicByNameAndProject(project, str);
        return lookup.isPresent();
    }

    /**
     * Lists the topics registered for {@code project} as DTOs carrying the topic name and the
     * subject name and version of the attached schema. Every returned DTO is built with the
     * trailing boolean flag set to {@code false}.
     *
     * @param project project whose topics are listed
     * @return a new list with one DTO per stored topic
     */
    public List<TopicDTO> findTopicsByProject(Project project) {
        List<TopicDTO> topics = new ArrayList<>();
        for (ProjectTopics stored : this.projectTopicsFacade.findTopicsByProject(project)) {
            String subject = stored.getSubjects().getSubject();
            topics.add(new TopicDTO(stored.getTopicName(), subject, stored.getSubjects().getVersion(), Boolean.FALSE));
        }
        return topics;
    }

    /**
     * Creates the topic in Kafka and, on success, stores a {@link ProjectTopics} record linking it
     * to {@code project} and to the schema named in {@code topicDTO}.
     *
     * <p>NOTE(review): a {@code null} result from {@code createTopicInKafka} is reported as
     * {@code TOPIC_ALREADY_EXISTS_IN_ZOOKEEPER}, but that helper also returns {@code null} when the
     * create request itself threw; the two cases are not distinguished here.
     *
     * @param project owning project
     * @param topicDTO topic name, partition/replica counts and schema name/version
     * @return the stored topic record
     * @throws KafkaException if the schema is not found, the topic is reported as already existing,
     *         or waiting for the Kafka response fails, times out (3 seconds) or is interrupted
     */
    public ProjectTopics createTopicInProject(Project project, TopicDTO topicDTO) throws KafkaException {
        Subjects schema = this.subjectsFacade
            .findSubjectByNameAndVersion(project, topicDTO.getSchemaName(), topicDTO.getSchemaVersion())
            .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_NOT_FOUND, Level.FINE, "topic: " + topicDTO.getName()));
        try {
            CreateTopicsResult creation = createTopicInKafka(topicDTO).get(3000L, TimeUnit.MILLISECONDS);
            if (creation == null) {
                throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS_IN_ZOOKEEPER, Level.INFO, "topic name: " + topicDTO.getName());
            }
            ProjectTopics projectTopics = new ProjectTopics(topicDTO.getName(), topicDTO.getNumOfPartitions(), topicDTO.getNumOfReplicas(), project, schema);
            this.projectTopicsFacade.save(projectTopics);
            return projectTopics;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag; wrapping the exception would otherwise hide it.
                Thread.currentThread().interrupt();
            }
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_FETCH_FAILED, Level.WARNING, "Topic name: " + topicDTO.getName(), e.getMessage(), e);
        }
    }

    /**
     * Lists the existing Kafka topic names and, unless the requested name is already among them,
     * submits a create request for it.
     *
     * @param topicDTO name, partition count and replica count of the topic to create
     * @return a future yielding the result of the create request, or {@code null} when the name
     *         already exists or submitting the request threw (the exception is logged at WARNING)
     */
    private KafkaFuture<CreateTopicsResult> createTopicInKafka(TopicDTO topicDTO) {
        return this.hopsKafkaAdminClient.listTopics().names().thenApply(existingNames -> {
            if (existingNames.contains(topicDTO.getName())) {
                return null;
            }
            try {
                NewTopic request = new NewTopic(
                    topicDTO.getName(),
                    topicDTO.getNumOfPartitions().intValue(),
                    topicDTO.getNumOfReplicas().shortValue());
                return this.hopsKafkaAdminClient.createTopics(Collections.singleton(request));
            } catch (Exception e) {
                LOGGER.log(Level.WARNING, e.getMessage(), e);
                return null;
            }
        });
    }

    /**
     * Describes topic {@code str} in Kafka and converts each partition into a
     * {@link PartitionDetailsDTO} (partition id, leader host, replica hosts, in-sync replica hosts),
     * sorted by the DTO's id.
     *
     * @param str topic name
     * @return a future yielding the partition details, or an empty list when the describe result
     *         has no entry for the topic
     */
    private KafkaFuture<List<PartitionDetailsDTO>> getTopicDetailsFromKafkaCluster(String str) {
        return this.hopsKafkaAdminClient.describeTopics(Collections.singleton(str)).all()
            .thenApply(descriptions -> descriptions.get(str))
            .thenApply(description -> {
                if (description == null) {
                    return Collections.<PartitionDetailsDTO>emptyList();
                }
                List<PartitionDetailsDTO> partitions = new ArrayList<>();
                for (TopicPartitionInfo info : description.partitions()) {
                    // A partition can temporarily have no leader; report a null host instead of
                    // failing the whole request with a NullPointerException.
                    String leaderHost = info.leader() == null ? null : info.leader().host();
                    List<String> replicaHosts = info.replicas().stream()
                        .map(node -> node.host())
                        .collect(Collectors.toList());
                    List<String> inSyncHosts = info.isr().stream()
                        .map(node -> node.host())
                        .collect(Collectors.toList());
                    partitions.add(new PartitionDetailsDTO(info.partition(), leaderHost, replicaHosts, inSyncHosts));
                }
                partitions.sort(Comparator.comparing(PartitionDetailsDTO::getId));
                return partitions;
            });
    }

    /**
     * Returns the partition details of a topic after checking that {@code project} owns it.
     *
     * @param project project that must own the topic
     * @param str topic name
     * @return a future yielding the partition details fetched from the Kafka cluster
     * @throws KafkaException with {@code TOPIC_NOT_FOUND} if the project has no such topic
     */
    public KafkaFuture<List<PartitionDetailsDTO>> getTopicDetails(Project project, String str) throws KafkaException {
        boolean ownedByProject = this.projectTopicsFacade.findTopicByNameAndProject(project, str).isPresent();
        if (!ownedByProject) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + str);
        }
        return getTopicDetailsFromKafkaCluster(str);
    }

    /**
     * Shares topic {@code str}, owned by {@code project}, with the project identified by {@code num}.
     *
     * <p>Checks run in this order: the destination must differ from the owner, the topic must
     * exist, {@code project} must own it, and the destination project must exist.
     *
     * @param project project that owns the topic
     * @param str topic name
     * @param num id of the project to share the topic with
     * @return a DTO filled from the stored share record; its fields are left unset if the record
     *         cannot be read back
     * @throws KafkaException if the destination is the owner, the topic does not exist, or
     *         {@code project} does not own it
     * @throws ProjectException if the destination project does not exist
     * @throws UserException declared for interface compatibility
     */
    public SharedTopicsDTO shareTopicWithProject(Project project, String str, Integer num) throws ProjectException, KafkaException, UserException {
        if (project.getId().equals(num)) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.DESTINATION_PROJECT_IS_TOPIC_OWNER, Level.FINE);
        }
        // Existence is checked before ownership. Previously the ownership lookup was repeated
        // twice, which made the TOPIC_NOT_FOUND branch unreachable.
        if (!this.projectTopicsFacade.findTopicByName(str).isPresent()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + str);
        }
        if (!this.projectTopicsFacade.findTopicByNameAndProject(project, str).isPresent()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.PROJECT_IS_NOT_THE_OWNER_OF_THE_TOPIC, Level.FINE);
        }
        if (this.projectFacade.find(num) == null) {
            throw new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "Could not find project: " + num);
        }
        this.sharedTopicsFacade.shareTopic(project, str, num);
        SharedTopicsDTO sharedTopicsDTO = new SharedTopicsDTO();
        this.sharedTopicsFacade.findSharedTopicByTopicAndProjectIds(str, project.getId(), num).ifPresent(stored -> {
            sharedTopicsDTO.setProjectId(stored.getProjectId());
            sharedTopicsDTO.setSharedTopicsPK(stored.getSharedTopicsPK());
        });
        return sharedTopicsDTO;
    }

    /**
     * Accepts a topic that was shared with {@code project}: grants the project's members ACLs on
     * the topic and then marks the share as accepted.
     *
     * @param project project accepting the share
     * @param str topic name
     * @throws KafkaException if the topic does not exist or is not shared with {@code project}
     * @throws ProjectException propagated from ACL creation
     * @throws UserException propagated from ACL creation
     */
    public void acceptSharedTopic(Project project, String str) throws KafkaException, ProjectException, UserException {
        ProjectTopics topic = this.projectTopicsFacade.findTopicByName(str)
            .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topicName: " + str));
        boolean sharedWithProject = this.sharedTopicsFacade.findSharedTopicByProjectAndTopic(project.getId(), str).isPresent();
        if (!sharedWithProject) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_SHARED, Level.FINE, "topic: " + str + ", project: " + project.getName());
        }
        // ACLs are registered against the owning project's id.
        addPermissionAclsToTopic(project, str, topic.getProject().getId());
        this.sharedTopicsFacade.acceptSharedTopic(topic.getProject().getId(), str, project.getId());
    }

    /**
     * Looks up the ACL on topic {@code str} matching {@code aclDTO}. The principal is built from
     * the DTO's project name and the username of the user found by the DTO's e-mail.
     *
     * @param str topic name
     * @param aclDTO ACL description holding project name and user e-mail
     * @return the matching ACL, if any
     * @throws UserException with {@code USER_WAS_NOT_FOUND} if no user has that e-mail
     */
    public Optional<TopicAcls> getTopicAclsByDto(String str, AclDTO aclDTO) throws UserException {
        String projectName = aclDTO.getProjectName();
        Users user = this.userFacade.findByEmail(aclDTO.getUserEmail());
        if (user == null) {
            throw new UserException(RESTCodes.UserErrorCode.USER_WAS_NOT_FOUND, Level.FINE, "user: " + aclDTO.getUserEmail());
        }
        String principal = KafkaConst.buildPrincipalName(projectName, user.getUsername());
        return this.topicAclsFacade.getTopicAcls(str, aclDTO, principal);
    }

    /**
     * Lists, per project, the users that may be given ACLs on topic {@code str}: the members of
     * {@code project} plus the members of every project the topic is shared with. Each project's
     * set also contains the ACL wildcard entry.
     *
     * @param project project the request is made for; must not be {@code null}
     * @param str topic name; must not be {@code null} or empty
     * @return one {@link AclUser} per project, in no particular order
     * @throws IllegalArgumentException if {@code project} is {@code null} or {@code str} is empty
     */
    public List<AclUser> getTopicAclUsers(Project project, String str) {
        if (project == null || Strings.isNullOrEmpty(str)) {
            throw new IllegalArgumentException("ProjectId must be non-null, topic must be provided");
        }
        Map<String, List<String>> emailsByProject = new HashMap<>();
        emailsByProject.put(project.getName(), memberEmailsWithWildcard(project));
        for (SharedTopics sharedTopics : this.sharedTopicsFacade.findSharedTopicsByTopicName(str)) {
            Project sharedWith = this.projectFacade.find(Integer.valueOf(sharedTopics.getSharedTopicsPK().getProjectId()));
            if (sharedWith == null) {
                // The share record points at a project that can no longer be found; skip it
                // (as getTopicSharedProjects does) rather than fail with a NullPointerException.
                continue;
            }
            emailsByProject.put(sharedWith.getName(), memberEmailsWithWildcard(sharedWith));
        }
        List<AclUser> aclUsers = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : emailsByProject.entrySet()) {
            aclUsers.add(new AclUser(entry.getKey(), new HashSet<>(entry.getValue())));
        }
        return aclUsers;
    }

    /** Collects the e-mail of every team member of {@code project}, followed by the ACL wildcard. */
    private List<String> memberEmailsWithWildcard(Project project) {
        List<String> emails = new ArrayList<>();
        for (ProjectTeam member : project.getProjectTeamCollection()) {
            emails.add(member.getUser().getEmail());
        }
        emails.add(Settings.KAFKA_ACL_WILDCARD);
        return emails;
    }

    /**
     * Returns the default topic settings: the configured default replica and partition counts and
     * the number of broker endpoints currently known.
     *
     * @return DTO with the three values
     * @throws KafkaException with {@code KAFKA_GENERIC_ERROR} if the broker endpoints cannot be read
     */
    public TopicDefaultValueDTO topicDefaultValues() throws KafkaException {
        try {
            Integer brokerCount = Integer.valueOf(this.kafkaBrokers.getBrokerEndpoints().size());
            return new TopicDefaultValueDTO(this.settings.getKafkaDefaultNumReplicas(), this.settings.getKafkaDefaultNumPartitions(), brokerCount);
        } catch (IOException | InterruptedException | KeeperException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag; wrapping the exception would otherwise hide it.
                Thread.currentThread().interrupt();
            }
            // NOTE(review): using this constant as the user message looks like a decompilation
            // artifact (possibly an inlined empty string) -- confirm against the original source.
            throw new KafkaException(RESTCodes.KafkaErrorCode.KAFKA_GENERIC_ERROR, Level.SEVERE, KafkaConst.KAFKA_ENDPOINT_IDENTIFICATION_ALGORITHM, e.getMessage(), e);
        }
    }

    /**
     * Lists the projects that topic {@code str}, owned by project {@code num}, is shared with.
     * Share records whose project can no longer be found are skipped.
     *
     * @param str topic name
     * @param num id of the owning project
     * @return name and id of each project the topic is shared with
     */
    public List<SharedProjectDTO> getTopicSharedProjects(String str, Integer num) {
        List<SharedProjectDTO> sharedProjects = new ArrayList<>();
        for (SharedTopics share : this.sharedTopicsFacade.findSharedTopicsByTopicAndOwnerProject(str, num)) {
            Project sharedWith = this.projectFacade.find(Integer.valueOf(share.getSharedTopicsPK().getProjectId()));
            if (sharedWith == null) {
                continue;
            }
            sharedProjects.add(new SharedProjectDTO(sharedWith.getName(), sharedWith.getId()));
        }
        return sharedProjects;
    }

    /**
     * Removes share records for topic {@code str} together with the ACLs of the affected projects.
     *
     * <ul>
     *   <li>Requester owns the topic and no project named {@code str2} is found: every share of
     *       the topic is removed.</li>
     *   <li>Requester owns the topic and {@code str2} resolves: only that project's share.</li>
     *   <li>Requester does not own the topic and {@code str2} does not resolve: the requester's
     *       own share.</li>
     *   <li>Requester does not own the topic and {@code str2} resolves: nothing is removed.</li>
     * </ul>
     *
     * @param project requesting project
     * @param str topic name
     * @param str2 name of the project to unshare from; may not resolve to a project
     * @throws KafkaException if the topic does not exist or the expected share is missing
     * @throws ProjectException if a project referenced by a removed share cannot be found
     */
    public void unshareTopic(Project project, String str, String str2) throws ProjectException, KafkaException {
        List<SharedTopics> sharesToRemove = new ArrayList<>();
        Project destination = this.projectFacade.findByName(str2);
        ProjectTopics topic = this.projectTopicsFacade.findTopicByName(str)
            .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "Topic:" + str));
        boolean requesterOwnsTopic = topic.getProject().equals(project);

        if (requesterOwnsTopic && destination == null) {
            sharesToRemove.addAll(this.sharedTopicsFacade.findSharedTopicsByTopicName(str));
        } else if (requesterOwnsTopic) {
            sharesToRemove.add(this.sharedTopicsFacade.findSharedTopicByProjectAndTopic(destination.getId(), str)
                .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_SHARED, Level.FINE, "topic: " + str + ", project: " + destination.getName())));
        } else if (destination == null) {
            sharesToRemove.add(this.sharedTopicsFacade.findSharedTopicByProjectAndTopic(project.getId(), str)
                .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_SHARED, Level.FINE, "topic: " + str + ", project: " + project.getId())));
        }

        for (SharedTopics share : sharesToRemove) {
            // The share record is removed before the project lookup, as in the original ordering.
            this.sharedTopicsFacade.remove(share);
            Project sharedWith = this.projectFacade.findById(Integer.valueOf(share.getSharedTopicsPK().getProjectId()))
                .orElseThrow(() -> new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "project: " + share.getSharedTopicsPK().getProjectId()));
            this.topicAclsFacade.removeAclFromTopic(str, sharedWith);
        }
    }

    /**
     * Adds the ACL described by {@code aclDTO} to topic {@code str} of project {@code num},
     * delegating to the field-by-field overload.
     *
     * @param str topic name
     * @param num id of the project owning the topic
     * @param aclDTO project name, user e-mail, permission type, operation type, host and role
     * @return the ACL paired with {@code OK} if it already existed or {@code CREATED} if it was added
     * @throws ProjectException if a referenced project is not found
     * @throws KafkaException if the topic is not found or the user is the wildcard
     * @throws UserException if the user is not found
     */
    public Pair<TopicAcls, Response.Status> addAclsToTopic(String str, Integer num, AclDTO aclDTO) throws ProjectException, KafkaException, UserException {
        return addAclsToTopic(
            str,
            num,
            aclDTO.getProjectName(),
            aclDTO.getUserEmail(),
            aclDTO.getPermissionType(),
            aclDTO.getOperationType(),
            aclDTO.getHost(),
            aclDTO.getRole());
    }

    /**
     * Adds an ACL to a topic unless an identical one already exists.
     *
     * @param str topic name; must not be {@code null} or empty
     * @param num id of the project owning the topic
     * @param str2 name of the project the principal belongs to
     * @param str3 e-mail of the user; must not be {@code null} and must not be the wildcard
     * @param str4 permission type
     * @param str5 operation type
     * @param str6 host
     * @param str7 role
     * @return the existing ACL with {@code OK}, or the newly added ACL with {@code CREATED}
     * @throws IllegalArgumentException if the topic name is empty or the e-mail is {@code null}
     * @throws ProjectException if the owning or the principal project is not found
     * @throws KafkaException if the topic is not found or the user is the wildcard
     * @throws UserException if no user has the given e-mail
     */
    private Pair<TopicAcls, Response.Status> addAclsToTopic(String str, Integer num, String str2, String str3, String str4, String str5, String str6, String str7) throws ProjectException, KafkaException, UserException {
        if (Strings.isNullOrEmpty(str) || str3 == null) {
            throw new IllegalArgumentException("Topic and userEmail must be provided.");
        }
        Project owner = this.projectFacade.find(num);
        if (owner == null) {
            throw new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "projectId: " + num);
        }
        // The principal's project is only looked up when it differs from the owning project.
        boolean principalProjectIsOwner = owner.getName().equals(str2);
        if (!principalProjectIsOwner && this.projectFacade.findByName(str2) == null) {
            throw new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "The specified project for the topic" + str + " was not found");
        }
        Optional<ProjectTopics> topicLookup = this.projectTopicsFacade.findTopicByNameAndProject(owner, str);
        if (!topicLookup.isPresent()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "Topic: " + str);
        }
        ProjectTopics topic = topicLookup.get();
        if (str3.equals(Settings.KAFKA_ACL_WILDCARD)) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.ACL_FOR_ANY_USER, Level.FINE, "topic: " + str);
        }
        Users user = this.userFacade.findByEmail(str3);
        if (user == null) {
            throw new UserException(RESTCodes.UserErrorCode.USER_WAS_NOT_FOUND, Level.FINE, "user: " + str3);
        }
        String principal = KafkaConst.buildPrincipalName(str2, user.getUsername());
        Optional<TopicAcls> existing = this.topicAclsFacade.getTopicAcls(str, principal, str4, str5, str6, str7);
        if (existing.isPresent()) {
            return Pair.of(existing.get(), Response.Status.OK);
        }
        TopicAcls created = this.topicAclsFacade.addAclsToTopic(topic, user, str4, str5, str6, str7, principal);
        return Pair.of(created, Response.Status.CREATED);
    }

    /**
     * Grants a team member the ACLs matching their role on topic {@code str}: a single "allow"
     * ACL with wildcard operation for data owners, otherwise one "allow" ACL for the read
     * operation and one for the details operation. Host and role are always the wildcard.
     *
     * @param projectTeam membership supplying project name, user e-mail and team role
     * @param str topic name
     * @param num id of the project owning the topic
     * @throws ProjectException propagated from ACL creation
     * @throws KafkaException propagated from ACL creation
     * @throws UserException propagated from ACL creation
     */
    private void addMemberPermissionAclToTopic(ProjectTeam projectTeam, String str, Integer num) throws ProjectException, KafkaException, UserException {
        boolean isDataOwner = AllowedRoles.DATA_OWNER.equalsIgnoreCase(projectTeam.getTeamRole());
        if (isDataOwner) {
            addAclsToTopic(str, num, new AclDTO(projectTeam.getProject().getName(), projectTeam.getUser().getEmail(), "allow", Settings.KAFKA_ACL_WILDCARD, Settings.KAFKA_ACL_WILDCARD, Settings.KAFKA_ACL_WILDCARD));
            return;
        }
        AclDTO readAcl = new AclDTO(projectTeam.getProject().getName(), projectTeam.getUser().getEmail(), "allow", Settings.KAFKA_ACL_OPERATION_TYPE_READ, Settings.KAFKA_ACL_WILDCARD, Settings.KAFKA_ACL_WILDCARD);
        addAclsToTopic(str, num, readAcl);
        AclDTO detailsAcl = new AclDTO(projectTeam.getProject().getName(), projectTeam.getUser().getEmail(), "allow", Settings.KAFKA_ACL_OPERATION_TYPE_DETAILS, Settings.KAFKA_ACL_WILDCARD, Settings.KAFKA_ACL_WILDCARD);
        addAclsToTopic(str, num, detailsAcl);
    }

    /**
     * Grants every team member of {@code project} their role-based ACLs on topic {@code str}.
     *
     * @param project project whose members receive ACLs
     * @param str topic name
     * @param num id of the project owning the topic
     * @throws ProjectException propagated from ACL creation
     * @throws KafkaException propagated from ACL creation
     * @throws UserException propagated from ACL creation
     */
    public void addPermissionAclsToTopic(Project project, String str, Integer num) throws ProjectException, KafkaException, UserException {
        for (ProjectTeam member : project.getProjectTeamCollection()) {
            addMemberPermissionAclToTopic(member, str, num);
        }
    }

    /**
     * Grants a team member ACLs on every topic shared with their project and then on every topic
     * the project owns.
     *
     * @param projectTeam membership of the user to grant ACLs to
     * @throws KafkaException propagated from ACL creation
     * @throws ProjectException propagated from ACL creation
     * @throws UserException propagated from ACL creation
     */
    public void addProjectMemberToTopics(ProjectTeam projectTeam) throws KafkaException, ProjectException, UserException {
        // Owned topics are fetched up front, before any ACL is written.
        List<TopicDTO> ownedTopics = findTopicsByProject(projectTeam.getProject());
        List<SharedTopics> sharedWithProject = this.sharedTopicsFacade.findSharedTopicsByProject(projectTeam.getProject().getId());
        for (SharedTopics share : sharedWithProject) {
            addMemberPermissionAclToTopic(projectTeam, share.getSharedTopicsPK().getTopicName(), share.getProjectId());
        }
        for (TopicDTO owned : ownedTopics) {
            addMemberPermissionAclToTopic(projectTeam, owned.getName(), projectTeam.getProject().getId());
        }
    }

    /**
     * Removes a team member's ACLs: for each topic shared with their project, the ACLs whose
     * principal project is the one named by the share record, and finally the ACLs for the user
     * in their own project.
     *
     * @param projectTeam membership of the user whose ACLs are removed
     * @throws RuntimeException wrapping a {@link ProjectException} if a project referenced by a
     *         share record cannot be found
     */
    public void removeProjectMemberFromTopics(ProjectTeam projectTeam) {
        for (SharedTopics share : this.sharedTopicsFacade.findSharedTopicsByProject(projectTeam.getProject().getId())) {
            Project principalProject;
            try {
                principalProject = this.projectController.findProjectById(Integer.valueOf(share.getSharedTopicsPK().getProjectId()));
            } catch (ProjectException e) {
                throw new RuntimeException(e);
            }
            String principalProjectName = principalProject.getName();
            this.topicAclsFacade.removeAclsForUserAndPrincipalProject(projectTeam.getUser(), principalProjectName);
        }
        this.topicAclsFacade.removeAclsForUser(projectTeam.getUser(), projectTeam.getProject());
    }

    /**
     * Rebuilds a team member's topic ACLs by removing all of them and adding them again
     * according to the member's current role.
     *
     * @param projectTeam membership of the user whose ACLs are refreshed
     * @throws ProjectException propagated from ACL creation
     * @throws KafkaException propagated from ACL creation
     * @throws UserException propagated from ACL creation
     */
    public void updateProjectMemberTopics(ProjectTeam projectTeam) throws ProjectException, KafkaException, UserException {
        // Remove first so that ACLs from a previous role do not linger.
        this.removeProjectMemberFromTopics(projectTeam);
        this.addProjectMemberToTopics(projectTeam);
    }

    /**
     * Lists the topics shared with project {@code num}. Share records whose topic can no longer
     * be found by name are skipped.
     *
     * @param num id of the project the topics are shared with
     * @return one DTO per resolvable share, carrying topic name, the share's project id, the
     *         schema subject, version and content, a {@code true} flag, and the accepted flag
     */
    public List<TopicDTO> findSharedTopicsByProject(Integer num) {
        List<TopicDTO> sharedTopicDtos = new ArrayList<>();
        for (SharedTopics sharedTopics : this.sharedTopicsFacade.findSharedTopicsByProject(num)) {
            String topicName = sharedTopics.getSharedTopicsPK().getTopicName();
            this.projectTopicsFacade.findTopicByName(topicName)
                .map(topic -> new TopicDTO(
                    topicName,
                    sharedTopics.getProjectId(),
                    topic.getSubjects().getSubject(),
                    topic.getSubjects().getVersion(),
                    topic.getSubjects().getSchema().getSchema(),
                    Boolean.TRUE,
                    sharedTopics.getAccepted()))
                .ifPresent(sharedTopicDtos::add);
        }
        return sharedTopicDtos;
    }

    /**
     * Switches topic {@code str} of {@code project} to version {@code num} of the subject it is
     * currently attached to.
     *
     * @param project project owning the topic
     * @param str topic name
     * @param num schema version to switch to
     * @throws KafkaException if the topic is not found or the subject has no such version
     */
    public void updateTopicSchemaVersion(Project project, String str, Integer num) throws KafkaException {
        ProjectTopics topic = this.projectTopicsFacade.findTopicByNameAndProject(project, str)
            .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + str));
        String subject = topic.getSubjects().getSubject();
        Subjects targetVersion = this.subjectsFacade.findSubjectByNameAndVersion(project, subject, num)
            .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_VERSION_NOT_FOUND, Level.FINE, "schema: " + subject + ", version: " + num));
        this.projectTopicsFacade.updateTopicSchemaVersion(topic, targetVersion);
    }

    /**
     * Removes the ACLs of {@code users} in the project identified by {@code num}.
     *
     * @param users user whose ACLs are removed
     * @param num project id
     * @throws ProjectException with {@code PROJECT_NOT_FOUND} if no project has that id
     */
    public void removeAclsForUser(Users users, Integer num) throws ProjectException {
        Project project = Optional.ofNullable(this.projectFacade.find(num))
            .orElseThrow(() -> new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "projectId:" + num));
        this.topicAclsFacade.removeAclsForUser(users, project);
    }

    /**
     * Removes the ACLs belonging to the project identified by {@code num}.
     *
     * @param num project id
     * @throws ProjectException with {@code PROJECT_NOT_FOUND} if no project has that id
     */
    public void removeAclForProject(Integer num) throws ProjectException {
        Project project = Optional.ofNullable(this.projectFacade.find(num))
            .orElseThrow(() -> new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "projectId:" + num));
        this.topicAclsFacade.removeAclForProject(project);
    }

    /**
     * Replaces ACL {@code num} on topic {@code str} with the ACL described by {@code aclDTO}.
     * The existing ACL is removed before the replacement is added.
     *
     * @param project project owning the topic
     * @param str topic name
     * @param num id of the ACL to replace
     * @param aclDTO description of the replacement ACL
     * @return id of the ACL returned by the add step
     * @throws KafkaException if the topic or the ACL is not found, or the add step fails
     * @throws ProjectException propagated from the add step
     * @throws UserException propagated from the add step
     */
    public Integer updateTopicAcl(Project project, String str, Integer num, AclDTO aclDTO) throws KafkaException, ProjectException, UserException {
        boolean ownedByProject = this.projectTopicsFacade.findTopicByNameAndProject(project, str).isPresent();
        if (!ownedByProject) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, str);
        }
        TopicAcls current = this.topicAclsFacade.find(num);
        if (current == null) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.ACL_NOT_FOUND, Level.FINE, "topic: " + str);
        }
        this.topicAclsFacade.remove(current);
        Pair<TopicAcls, Response.Status> replacement = addAclsToTopic(str, project.getId(), aclDTO);
        return replacement.getLeft().getId();
    }

    /**
     * Finds ACL {@code num} and verifies that it belongs to topic {@code str}.
     *
     * @param str expected topic name
     * @param num ACL id
     * @return the ACL; never empty, since absence is reported by exception
     * @throws KafkaException with {@code ACL_NOT_FOUND} if no ACL has that id, or
     *         {@code ACL_NOT_FOR_TOPIC} if it belongs to a different topic
     */
    public Optional<TopicAcls> findAclByIdAndTopic(String str, Integer num) throws KafkaException {
        TopicAcls acl = this.topicAclsFacade.find(num);
        if (acl == null) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.ACL_NOT_FOUND, Level.FINE, "aclId: " + num);
        }
        String aclTopicName = acl.getProjectTopics().getTopicName();
        if (!str.equals(aclTopicName)) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.ACL_NOT_FOR_TOPIC, Level.FINE, "topic: " + str);
        }
        return Optional.of(acl);
    }

    /**
     * Removes ACL {@code num}, provided it belongs to topic {@code str}.
     *
     * @param str expected topic name
     * @param num ACL id
     * @throws KafkaException with {@code ACL_NOT_FOUND} if no ACL has that id, or
     *         {@code ACL_NOT_FOR_TOPIC} if it belongs to a different topic
     */
    public void removeAclFromTopic(String str, Integer num) throws KafkaException {
        TopicAcls acl = Optional.ofNullable(this.topicAclsFacade.find(num))
            .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.ACL_NOT_FOUND, Level.FINE, "topic: " + str));
        boolean belongsToTopic = acl.getProjectTopics().getTopicName().equals(str);
        if (belongsToTopic) {
            this.topicAclsFacade.remove(acl);
            return;
        }
        throw new KafkaException(RESTCodes.KafkaErrorCode.ACL_NOT_FOR_TOPIC, Level.FINE, "topic: " + str);
    }

    /**
     * Returns the schema subject attached to topic {@code str}, where the topic is either owned
     * by {@code project} or shared with it.
     *
     * @param project project requesting the subject
     * @param str topic name
     * @return DTO built from the topic's subject
     * @throws KafkaException with {@code TOPIC_NOT_SHARED} if the project neither owns the topic
     *         nor has it shared, or {@code TOPIC_NOT_FOUND} if the share's project has no such topic
     * @throws ProjectException with {@code PROJECT_NOT_FOUND} if the project referenced by the
     *         share record cannot be found
     */
    public SubjectDTO getSubjectForTopic(Project project, String str) throws KafkaException, ProjectException {
        Optional<ProjectTopics> topic = this.projectTopicsFacade.findTopicByNameAndProject(project, str);
        if (!topic.isPresent()) {
            // Not owned by the requester: resolve the topic through the share record instead.
            SharedTopics share = this.sharedTopicsFacade.findSharedTopicByProjectAndTopic(project.getId(), str)
                .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_SHARED, Level.FINE, "topic: " + str + ", project: " + project.getName()));
            // The message reports the id that was actually looked up (share.getProjectId());
            // it previously reported the id stored in the share's primary key.
            Project topicProject = this.projectFacade.findById(share.getProjectId())
                .orElseThrow(() -> new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "projectId: " + share.getProjectId()));
            topic = this.projectTopicsFacade.findTopicByNameAndProject(topicProject, str);
        }
        if (!topic.isPresent()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "project=" + project.getName() + ", topic=" + str);
        }
        return new SubjectDTO(topic.get().getSubjects());
    }

    /**
     * Attaches version {@code num} of subject {@code str2} to topic {@code str} of {@code project}.
     *
     * <p>NOTE(review): {@code str2} is not compared with the subject the topic currently uses, so
     * this call can move a topic to a different subject -- confirm that this is intended.
     *
     * @param project project owning the topic
     * @param str topic name
     * @param str2 subject name to look up
     * @param num subject version to look up
     * @throws KafkaException with {@code TOPIC_NOT_FOUND} if the project has no such topic
     * @throws SchemaException with {@code VERSION_NOT_FOUND} if the subject/version pair is not found
     */
    public void updateTopicSubjectVersion(Project project, String str, String str2, Integer num) throws KafkaException, SchemaException {
        ProjectTopics topic = this.projectTopicsFacade.findTopicByNameAndProject(project, str)
            .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + str));
        // The message names the subject that was actually looked up (str2); it previously named
        // the topic's current subject, which is misleading when the two differ.
        Subjects target = this.subjectsFacade.findSubjectByNameAndVersion(project, str2, num)
            .orElseThrow(() -> new SchemaException(RESTCodes.SchemaRegistryErrorCode.VERSION_NOT_FOUND, Level.FINE, "schema: " + str2 + ", version: " + num));
        this.projectTopicsFacade.updateTopicSchemaVersion(topic, target);
    }
}
