/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.kafka;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.dao.certificates.CertsFacade;
import io.hops.hopsworks.common.dao.kafka.AclDTO;
import io.hops.hopsworks.common.dao.kafka.AclUser;
import io.hops.hopsworks.common.dao.kafka.HopsKafkaAdminClient;
import io.hops.hopsworks.common.dao.kafka.KafkaConst;
import io.hops.hopsworks.common.dao.kafka.PartitionDetailsDTO;
import io.hops.hopsworks.common.dao.kafka.ProjectTopicsFacade;
import io.hops.hopsworks.common.dao.kafka.SharedProjectDTO;
import io.hops.hopsworks.common.dao.kafka.SharedTopicsDTO;
import io.hops.hopsworks.common.dao.kafka.SharedTopicsFacade;
import io.hops.hopsworks.common.dao.kafka.TopicAclsFacade;
import io.hops.hopsworks.common.dao.kafka.TopicDTO;
import io.hops.hopsworks.common.dao.kafka.TopicDefaultValueDTO;
import io.hops.hopsworks.common.dao.kafka.schemas.SubjectDTO;
import io.hops.hopsworks.common.dao.kafka.schemas.SubjectsFacade;
import io.hops.hopsworks.common.dao.project.ProjectFacade;
import io.hops.hopsworks.common.dao.user.UserFacade;
import io.hops.hopsworks.common.kafka.KafkaBrokers;
import io.hops.hopsworks.common.project.ProjectController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.SchemaException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.persistence.entity.certificates.UserCerts;
import io.hops.hopsworks.persistence.entity.kafka.ProjectTopics;
import io.hops.hopsworks.persistence.entity.kafka.SharedTopics;
import io.hops.hopsworks.persistence.entity.kafka.TopicAcls;
import io.hops.hopsworks.persistence.entity.kafka.schemas.Subjects;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.project.team.ProjectTeam;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.zookeeper.KeeperException;

@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class KafkaController {
    private static final Logger LOGGER = Logger.getLogger(KafkaController.class.getName());
    @EJB
    private CertsFacade userCerts;
    @EJB
    private Settings settings;
    @EJB
    private ProjectFacade projectFacade;
    @EJB
    private UserFacade userFacade;
    @EJB
    private ProjectTopicsFacade projectTopicsFacade;
    @EJB
    private SharedTopicsFacade sharedTopicsFacade;
    @EJB
    private HopsKafkaAdminClient hopsKafkaAdminClient;
    @EJB
    private SubjectsFacade subjectsFacade;
    @EJB
    private TopicAclsFacade topicAclsFacade;
    @EJB
    private ProjectController projectController;
    @EJB
    private KafkaBrokers kafkaBrokers;

    public void createTopic(Project project, TopicDTO topicDto) throws KafkaException, ProjectException, UserException {
        if (topicDto == null) {
            throw new IllegalArgumentException("topicDto was not provided.");
        }
        String topicName = topicDto.getName();
        if (this.projectTopicsFacade.findTopicByName(topicDto.getName()).isPresent()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS, Level.FINE, "topic name: " + topicName);
        }
        if (this.projectTopicsFacade.findTopicsByProject(project).size() > project.getKafkaMaxNumTopics()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_LIMIT_REACHED, Level.FINE, "topic name: " + topicName + ", project: " + project.getName());
        }
        try {
            Set<String> brokerEndpoints = this.kafkaBrokers.getBrokerEndpoints();
            if (brokerEndpoints.size() < topicDto.getNumOfReplicas()) {
                throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_REPLICATION_ERROR, Level.FINE, "maximum: " + brokerEndpoints.size());
            }
        }
        catch (IOException | InterruptedException | KeeperException ex) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.KAFKA_GENERIC_ERROR, Level.SEVERE, "project: " + project.getName(), ex.getMessage(), ex);
        }
        this.createTopicInProject(project, topicDto);
        this.addFullPermissionAclsToTopic(project.getName(), topicDto.getName(), project.getId());
    }

    public void removeTopicFromProject(Project project, String topicName) throws KafkaException {
        ProjectTopics pt = this.projectTopicsFacade.findTopicByNameAndProject(project, topicName).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + topicName));
        this.projectTopicsFacade.remove(pt);
        this.hopsKafkaAdminClient.deleteTopics(Collections.singleton(pt.getTopicName()));
    }

    public boolean projectTopicExists(Project project, String topicName) {
        return this.projectTopicsFacade.findTopicByNameAndProject(project, topicName).isPresent();
    }

    public List<TopicDTO> findTopicsByProject(Project project) {
        List<ProjectTopics> ptList = this.projectTopicsFacade.findTopicsByProject(project);
        ArrayList<TopicDTO> topics = new ArrayList<TopicDTO>();
        for (ProjectTopics pt : ptList) {
            topics.add(new TopicDTO(pt.getTopicName(), pt.getSubjects().getSubject(), pt.getSubjects().getVersion(), false));
        }
        return topics;
    }

    public ProjectTopics createTopicInProject(Project project, TopicDTO topicDto) throws KafkaException {
        Subjects schema = this.subjectsFacade.findSubjectByNameAndVersion(project, topicDto.getSchemaName(), topicDto.getSchemaVersion()).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_NOT_FOUND, Level.FINE, "topic: " + topicDto.getName()));
        try {
            if (this.createTopicInKafka(topicDto).get(3000L, TimeUnit.MILLISECONDS) == null) {
                throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS_IN_ZOOKEEPER, Level.INFO, "topic name: " + topicDto.getName());
            }
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_FETCH_FAILED, Level.WARNING, "Topic name: " + topicDto.getName(), e.getMessage(), (Throwable)e);
        }
        ProjectTopics pt = new ProjectTopics(topicDto.getName(), project, schema);
        this.projectTopicsFacade.save(pt);
        return pt;
    }

    private KafkaFuture<CreateTopicsResult> createTopicInKafka(TopicDTO topicDTO) {
        return this.hopsKafkaAdminClient.listTopics().names().thenApply(set -> {
            if (set.contains(topicDTO.getName())) {
                return null;
            }
            NewTopic newTopic = new NewTopic(topicDTO.getName(), topicDTO.getNumOfPartitions().intValue(), topicDTO.getNumOfReplicas().shortValue());
            try {
                return this.hopsKafkaAdminClient.createTopics(Collections.singleton(newTopic));
            }
            catch (Exception e) {
                LOGGER.log(Level.WARNING, e.getMessage(), e);
                return null;
            }
        });
    }

    private KafkaFuture<List<PartitionDetailsDTO>> getTopicDetailsFromKafkaCluster(String topicName) {
        return this.hopsKafkaAdminClient.describeTopics(Collections.singleton(topicName)).all().thenApply(map -> map.getOrDefault(topicName, null)).thenApply(td -> {
            if (td != null) {
                ArrayList<PartitionDetailsDTO> partitionDetails = new ArrayList<PartitionDetailsDTO>();
                List partitions = td.partitions();
                for (TopicPartitionInfo partition : partitions) {
                    int id = partition.partition();
                    List<String> replicas = partition.replicas().stream().map(Node::host).collect(Collectors.toList());
                    List<String> inSyncReplicas = partition.isr().stream().map(Node::host).collect(Collectors.toList());
                    partitionDetails.add(new PartitionDetailsDTO(id, partition.leader().host(), replicas, inSyncReplicas));
                }
                partitionDetails.sort(Comparator.comparing(PartitionDetailsDTO::getId));
                return partitionDetails;
            }
            return Collections.emptyList();
        });
    }

    public KafkaFuture<List<PartitionDetailsDTO>> getTopicDetails(Project project, String topicName) throws KafkaException {
        this.projectTopicsFacade.findTopicByNameAndProject(project, topicName).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + topicName));
        return this.getTopicDetailsFromKafkaCluster(topicName);
    }

    public SharedTopicsDTO shareTopicWithProject(Project project, String topicName, Integer destProjectId) throws ProjectException, KafkaException, UserException {
        if (project.getId().equals(destProjectId)) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.DESTINATION_PROJECT_IS_TOPIC_OWNER, Level.FINE);
        }
        if (!this.projectTopicsFacade.findTopicByNameAndProject(project, topicName).isPresent()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.PROJECT_IS_NOT_THE_OWNER_OF_THE_TOPIC, Level.FINE);
        }
        if (!this.projectTopicsFacade.findTopicByNameAndProject(project, topicName).isPresent()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + topicName);
        }
        if (!Optional.ofNullable(this.projectFacade.find(destProjectId)).isPresent()) {
            throw new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "Could not find project: " + destProjectId);
        }
        this.sharedTopicsFacade.shareTopic(project, topicName, destProjectId);
        Optional<SharedTopics> optionalSt = this.sharedTopicsFacade.findSharedTopicByTopicAndProjectIds(topicName, project.getId(), destProjectId);
        SharedTopicsDTO dto = new SharedTopicsDTO();
        optionalSt.ifPresent(st -> {
            dto.setProjectId(st.getProjectId());
            dto.setSharedTopicsPK(st.getSharedTopicsPK());
        });
        return dto;
    }

    public void acceptSharedTopic(Project project, String topicName) throws KafkaException, ProjectException, UserException {
        ProjectTopics pt = this.projectTopicsFacade.findTopicByName(topicName).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topicName: " + topicName));
        if (!this.sharedTopicsFacade.findSharedTopicByProjectAndTopic(project.getId(), topicName).isPresent()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_SHARED, Level.FINE, "topic: " + topicName + ", project: " + project.getName());
        }
        this.addFullPermissionAclsToTopic(project.getName(), topicName, pt.getProject().getId());
        this.sharedTopicsFacade.acceptSharedTopic(pt.getProject().getId(), topicName, project.getId());
    }

    public Optional<TopicAcls> getTopicAclsByDto(String topicName, AclDTO dto) throws UserException {
        Users user = Optional.ofNullable(this.userFacade.findByEmail(dto.getUserEmail())).orElseThrow(() -> new UserException(RESTCodes.UserErrorCode.USER_WAS_NOT_FOUND, Level.FINE, "user: " + dto.getUserEmail()));
        String principalName = KafkaConst.buildPrincipalName(dto.getProjectName(), user.getUsername());
        return this.topicAclsFacade.getTopicAcls(topicName, dto, principalName);
    }

    private void addFullPermissionAclsToTopic(String aclProjectName, String topicName, Integer projectId) throws ProjectException, KafkaException, UserException {
        Project p = this.projectFacade.findByName(aclProjectName);
        if (p == null) {
            throw new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "Could not find project: " + aclProjectName);
        }
        List acls = p.getProjectTeamCollection().stream().map(member -> member.getUser().getEmail()).map(email -> new AclDTO(p.getName(), (String)email, "allow", "*", "*", "*")).collect(Collectors.toList());
        for (AclDTO acl : acls) {
            this.addAclsToTopic(topicName, projectId, acl);
        }
    }

    public List<AclUser> getTopicAclUsers(Project project, String topicName) {
        if (project == null || Strings.isNullOrEmpty((String)topicName)) {
            throw new IllegalArgumentException("ProjectId must be non-null, topic must be provided");
        }
        ArrayList<AclUser> aclUsers = new ArrayList<AclUser>();
        ArrayList<String> teamMembers = new ArrayList<String>();
        for (ProjectTeam pt : project.getProjectTeamCollection()) {
            teamMembers.add(pt.getUser().getEmail());
        }
        teamMembers.add("*");
        HashMap projectMemberCollections = new HashMap();
        projectMemberCollections.put(project.getName(), teamMembers);
        List<SharedTopics> sharedTopicsList = this.sharedTopicsFacade.findSharedTopicsByTopicName(topicName);
        for (SharedTopics sharedTopics : sharedTopicsList) {
            ArrayList<String> sharedMembers = new ArrayList<String>();
            Project p = this.projectFacade.find(sharedTopics.getSharedTopicsPK().getProjectId());
            for (ProjectTeam pt : p.getProjectTeamCollection()) {
                sharedMembers.add(pt.getUser().getEmail());
            }
            sharedMembers.add("*");
            projectMemberCollections.put(p.getName(), sharedMembers);
        }
        for (Map.Entry entry : projectMemberCollections.entrySet()) {
            aclUsers.add(new AclUser((String)entry.getKey(), new HashSet<String>((Collection)entry.getValue())));
        }
        return aclUsers;
    }

    public Pair<TopicAcls, Response.Status> addAclsToTopic(String topicName, Integer projectId, AclDTO dto) throws ProjectException, KafkaException, UserException {
        return this.addAclsToTopic(topicName, projectId, dto.getProjectName(), dto.getUserEmail(), dto.getPermissionType(), dto.getOperationType(), dto.getHost(), dto.getRole());
    }

    private Pair<TopicAcls, Response.Status> addAclsToTopic(String topicName, Integer projectId, String selectedProjectName, String userEmail, String permissionType, String operationType, String host, String role) throws ProjectException, KafkaException, UserException {
        if (Strings.isNullOrEmpty((String)topicName) || userEmail == null) {
            throw new IllegalArgumentException("Topic and userEmail must be provided.");
        }
        Project topicOwnerProject = Optional.ofNullable(this.projectFacade.find(projectId)).orElseThrow(() -> new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "projectId: " + projectId));
        if (!topicOwnerProject.getName().equals(selectedProjectName) && this.projectFacade.findByName(selectedProjectName) == null) {
            throw new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "The specified project for the topic" + topicName + " was not found");
        }
        ProjectTopics pt = this.projectTopicsFacade.findTopicByNameAndProject(topicOwnerProject, topicName).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "Topic: " + topicName));
        if (userEmail.equals("*")) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.ACL_FOR_ANY_USER, Level.FINE, "topic: " + topicName);
        }
        Users user = Optional.ofNullable(this.userFacade.findByEmail(userEmail)).orElseThrow(() -> new UserException(RESTCodes.UserErrorCode.USER_WAS_NOT_FOUND, Level.FINE, "user: " + userEmail));
        String principalName = KafkaConst.buildPrincipalName(selectedProjectName, user.getUsername());
        Optional<TopicAcls> optionalAcl = this.topicAclsFacade.getTopicAcls(topicName, principalName, permissionType, operationType, host, role);
        if (optionalAcl.isPresent()) {
            return Pair.of((Object)optionalAcl.get(), (Object)Response.Status.OK);
        }
        TopicAcls acl = this.topicAclsFacade.addAclsToTopic(pt, user, permissionType, operationType, host, role, principalName);
        return Pair.of((Object)acl, (Object)Response.Status.CREATED);
    }

    public TopicDefaultValueDTO topicDefaultValues() throws KafkaException {
        try {
            Set<String> brokers = this.kafkaBrokers.getBrokerEndpoints();
            return new TopicDefaultValueDTO(this.settings.getKafkaDefaultNumReplicas(), this.settings.getKafkaDefaultNumPartitions(), brokers.size());
        }
        catch (IOException | InterruptedException | KeeperException ex) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.KAFKA_GENERIC_ERROR, Level.SEVERE, "", ex.getMessage(), ex);
        }
    }

    public List<SharedProjectDTO> getTopicSharedProjects(String topicName, Integer ownerProjectId) {
        List<SharedTopics> projectIds = this.sharedTopicsFacade.findSharedTopicsByTopicAndOwnerProject(topicName, ownerProjectId);
        ArrayList<SharedProjectDTO> shareProjectDtos = new ArrayList<SharedProjectDTO>();
        for (SharedTopics st : projectIds) {
            Project project = this.projectFacade.find(st.getSharedTopicsPK().getProjectId());
            if (project == null) continue;
            shareProjectDtos.add(new SharedProjectDTO(project.getName(), project.getId()));
        }
        return shareProjectDtos;
    }

    public void unshareTopic(Project requesterProject, String topicName, String destProjectName) throws ProjectException, KafkaException {
        SharedTopics st;
        ArrayList<SharedTopics> list = new ArrayList<SharedTopics>();
        Project destProject = this.projectFacade.findByName(destProjectName);
        ProjectTopics topic = this.projectTopicsFacade.findTopicByName(topicName).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "Topic:" + topicName));
        if (topic.getProject().equals((Object)requesterProject)) {
            if (destProject == null) {
                list.addAll(this.sharedTopicsFacade.findSharedTopicsByTopicName(topicName));
            } else {
                st = this.sharedTopicsFacade.findSharedTopicByProjectAndTopic(destProject.getId(), topicName).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_SHARED, Level.FINE, "topic: " + topicName + ", project: " + destProject.getName()));
                list.add(st);
            }
        } else if (destProject == null) {
            st = this.sharedTopicsFacade.findSharedTopicByProjectAndTopic(requesterProject.getId(), topicName).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_SHARED, Level.FINE, "topic: " + topicName + ", project: " + requesterProject.getId()));
            list.add(st);
        }
        for (SharedTopics st2 : list) {
            this.sharedTopicsFacade.remove(st2);
            Project projectACLs = this.projectFacade.findById(st2.getSharedTopicsPK().getProjectId()).orElseThrow(() -> new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "project: " + st2.getSharedTopicsPK().getProjectId()));
            this.topicAclsFacade.removeAclFromTopic(topicName, projectACLs);
        }
    }

    public String getKafkaCertPaths(Project project) {
        UserCerts userCert = this.userCerts.findUserCert(project.getName(), project.getOwner().getUsername());
        if (userCert.getUserCert() != null && userCert.getUserCert().length > 0 && userCert.getUserKey() != null && userCert.getUserKey().length > 0) {
            File certDir = new File(this.settings.getHopsworksTrueTempCertDir() + "/" + project.getName());
            if (!certDir.exists()) {
                try {
                    certDir.mkdirs();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
            try {
                FileOutputStream fos = new FileOutputStream(certDir.getAbsolutePath() + "/keystore.jks");
                fos.write(userCert.getUserKey());
                fos.close();
                fos = new FileOutputStream(certDir.getAbsolutePath() + "/truststore.jks");
                fos.write(userCert.getUserCert());
                fos.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
            return certDir.getAbsolutePath();
        }
        return null;
    }

    public void addProjectMemberToTopics(Project project, String member) throws KafkaException, ProjectException, UserException {
        List<TopicDTO> topics = this.findTopicsByProject(project);
        List<SharedTopics> sharedTopics = this.sharedTopicsFacade.findSharedTopicsByProject(project.getId());
        for (SharedTopics sharedTopic : sharedTopics) {
            this.addAclsToTopic(sharedTopic.getSharedTopicsPK().getTopicName(), sharedTopic.getProjectId(), new AclDTO(project.getName(), member, "allow", "*", "*", "*"));
        }
        for (TopicDTO topic : topics) {
            this.addAclsToTopic(topic.getName(), project.getId(), new AclDTO(project.getName(), member, "allow", "*", "*", "*"));
        }
    }

    public void removeProjectMemberFromTopics(Project project, Users user) throws ProjectException {
        this.sharedTopicsFacade.findSharedTopicsByProject(project.getId()).stream().map(st -> {
            try {
                return this.projectController.findProjectById(st.getSharedTopicsPK().getProjectId());
            }
            catch (ProjectException ex) {
                throw new RuntimeException(ex);
            }
        }).map(Project::getName).forEach(name -> this.topicAclsFacade.removeAclsForUserAndPrincipalProject(user, (String)name));
        this.topicAclsFacade.removeAclsForUser(user, project);
    }

    public List<TopicDTO> findSharedTopicsByProject(Integer projectId) {
        List<SharedTopics> res = this.sharedTopicsFacade.findSharedTopicsByProject(projectId);
        ArrayList<TopicDTO> topics = new ArrayList<TopicDTO>();
        for (SharedTopics pt : res) {
            this.projectTopicsFacade.findTopicByName(pt.getSharedTopicsPK().getTopicName()).map(topic -> new TopicDTO(pt.getSharedTopicsPK().getTopicName(), pt.getProjectId(), topic.getSubjects().getSubject(), topic.getSubjects().getVersion(), topic.getSubjects().getSchema().getSchema(), true, pt.getAccepted())).ifPresent(topics::add);
        }
        return topics;
    }

    public void updateTopicSchemaVersion(Project project, String topicName, Integer schemaVersion) throws KafkaException {
        ProjectTopics pt = this.projectTopicsFacade.findTopicByNameAndProject(project, topicName).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + topicName));
        String schemaName = pt.getSubjects().getSubject();
        Subjects st = this.subjectsFacade.findSubjectByNameAndVersion(project, schemaName, schemaVersion).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_VERSION_NOT_FOUND, Level.FINE, "schema: " + schemaName + ", version: " + schemaVersion));
        this.projectTopicsFacade.updateTopicSchemaVersion(pt, st);
    }

    public void removeAclsForUser(Users user, Integer projectId) throws ProjectException {
        Project project = this.projectFacade.find(projectId);
        if (project == null) {
            throw new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "projectId:" + projectId);
        }
        this.topicAclsFacade.removeAclsForUser(user, project);
    }

    public void removeAclForProject(Integer projectId) throws ProjectException {
        Project project = this.projectFacade.find(projectId);
        if (project == null) {
            throw new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "projectId:" + projectId);
        }
        this.topicAclsFacade.removeAclForProject(project);
    }

    public Integer updateTopicAcl(Project project, String topicName, Integer aclId, AclDTO aclDto) throws KafkaException, ProjectException, UserException {
        if (!this.projectTopicsFacade.findTopicByNameAndProject(project, topicName).isPresent()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, topicName);
        }
        TopicAcls ta = (TopicAcls)this.topicAclsFacade.find(aclId);
        if (ta == null) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.ACL_NOT_FOUND, Level.FINE, "topic: " + topicName);
        }
        this.topicAclsFacade.remove(ta);
        Pair<TopicAcls, Response.Status> aclTuple = this.addAclsToTopic(topicName, project.getId(), aclDto);
        return ((TopicAcls)aclTuple.getLeft()).getId();
    }

    public Optional<TopicAcls> findAclByIdAndTopic(String topicName, Integer aclId) throws KafkaException {
        Optional<TopicAcls> acl = Optional.ofNullable(this.topicAclsFacade.find(aclId));
        if (acl.isPresent()) {
            if (!topicName.equals(((TopicAcls)acl.get()).getProjectTopics().getTopicName())) {
                throw new KafkaException(RESTCodes.KafkaErrorCode.ACL_NOT_FOR_TOPIC, Level.FINE, "topic: " + topicName);
            }
        } else {
            throw new KafkaException(RESTCodes.KafkaErrorCode.ACL_NOT_FOUND, Level.FINE, "aclId: " + aclId);
        }
        return acl;
    }

    public void removeAclFromTopic(String topicName, Integer aclId) throws KafkaException {
        TopicAcls ta = (TopicAcls)this.topicAclsFacade.find(aclId);
        if (ta == null) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.ACL_NOT_FOUND, Level.FINE, "topic: " + topicName);
        }
        if (!ta.getProjectTopics().getTopicName().equals(topicName)) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.ACL_NOT_FOR_TOPIC, Level.FINE, "topic: " + topicName);
        }
        this.topicAclsFacade.remove(ta);
    }

    public SubjectDTO getSubjectForTopic(Project project, String topic) throws KafkaException, ProjectException {
        Optional<ProjectTopics> pt = this.projectTopicsFacade.findTopicByNameAndProject(project, topic);
        if (!pt.isPresent()) {
            SharedTopics sharedTopic = this.sharedTopicsFacade.findSharedTopicByProjectAndTopic(project.getId(), topic).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_SHARED, Level.FINE, "topic: " + topic + ", project: " + project.getName()));
            Project sharedWithProject = this.projectFacade.findById(sharedTopic.getProjectId()).orElseThrow(() -> new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "projectId: " + sharedTopic.getSharedTopicsPK().getProjectId()));
            pt = this.projectTopicsFacade.findTopicByNameAndProject(sharedWithProject, topic);
        }
        if (!pt.isPresent()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "project=" + project.getName() + ", topic=" + topic);
        }
        return new SubjectDTO(pt.get().getSubjects());
    }

    public void updateTopicSubjectVersion(Project project, String topic, String subject, Integer version) throws KafkaException, SchemaException {
        ProjectTopics pt = this.projectTopicsFacade.findTopicByNameAndProject(project, topic).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + topic));
        String topicSubject = pt.getSubjects().getSubject();
        Subjects st = this.subjectsFacade.findSubjectByNameAndVersion(project, subject, version).orElseThrow(() -> new SchemaException(RESTCodes.SchemaRegistryErrorCode.VERSION_NOT_FOUND, Level.FINE, "schema: " + topicSubject + ", version: " + version));
        this.projectTopicsFacade.updateTopicSchemaVersion(pt, st);
    }
}

