/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.dao.kafka;

import io.hops.hopsworks.common.dao.kafka.AclDTO;
import io.hops.hopsworks.common.dao.kafka.AclUserDTO;
import io.hops.hopsworks.common.dao.kafka.PartitionDetailsDTO;
import io.hops.hopsworks.common.dao.kafka.ProjectTopics;
import io.hops.hopsworks.common.dao.kafka.SchemaCompatiblityCheck;
import io.hops.hopsworks.common.dao.kafka.SchemaDTO;
import io.hops.hopsworks.common.dao.kafka.SchemaTopics;
import io.hops.hopsworks.common.dao.kafka.SchemaTopicsPK;
import io.hops.hopsworks.common.dao.kafka.SharedProjectDTO;
import io.hops.hopsworks.common.dao.kafka.SharedTopics;
import io.hops.hopsworks.common.dao.kafka.SharedTopicsPK;
import io.hops.hopsworks.common.dao.kafka.TopicAcls;
import io.hops.hopsworks.common.dao.kafka.TopicDTO;
import io.hops.hopsworks.common.dao.kafka.TopicDefaultValueDTO;
import io.hops.hopsworks.common.dao.project.Project;
import io.hops.hopsworks.common.dao.project.ProjectFacade;
import io.hops.hopsworks.common.dao.project.team.ProjectTeam;
import io.hops.hopsworks.common.dao.user.UserFacade;
import io.hops.hopsworks.common.dao.user.Users;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.security.BaseHadoopClientsService;
import io.hops.hopsworks.common.security.CertificateMaterializer;
import io.hops.hopsworks.common.util.HopsUtils;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.CryptoPasswordNotFoundException;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.persistence.EntityManager;
import javax.persistence.NoResultException;
import javax.persistence.PersistenceContext;
import javax.persistence.TypedQuery;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.common.TopicAlreadyMarkedForDeletionException;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaParseException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.zookeeper.KeeperException;
import org.elasticsearch.common.Strings;

@Stateless
public class KafkaFacade {
    // Class-wide logger, named after this class.
    private static final Logger LOGGER = Logger.getLogger(KafkaFacade.class.getName());
    // Entity manager for the "kthfsPU" persistence unit; every query in this facade goes through it.
    @PersistenceContext(unitName="kthfsPU")
    private EntityManager em;
    // Source of the ZooKeeper connect string, broker endpoints and Kafka defaults read by the methods below.
    @EJB
    Settings settings;
    // Used to resolve projects by id or by name when adding ACLs.
    @EJB
    private ProjectFacade projectsFacade;
    // Passed to HopsUtils.copyProjectUserCerts when reading topic details from the cluster.
    @EJB
    private CertificateMaterializer certificateMaterializer;
    // Provides the certificate password of a project-specific user.
    @EJB
    private BaseHadoopClientsService baseHadoopService;
    // Maps a (project, user) pair to its HDFS user name.
    @EJB
    private HdfsUsersController hdfsUsersController;
    // Used to look up users by e-mail when adding ACLs.
    @EJB
    private UserFacade userFacade;
    // Separator used to split "host:port" strings and broker endpoint strings.
    private static final String COLON_SEPARATOR = ":";
    // NOTE(review): not referenced in the visible part of this file; presumably the "//" of a URL — confirm against callers.
    public static final String SLASH_SEPARATOR = "//";
    // Value set for the Kafka client "security.protocol" property.
    public static final String KAFKA_SECURITY_PROTOCOL = "SSL";
    // Broker endpoints whose prefix (before the first colon) equals this value are filtered out before connecting.
    private static final String KAFKA_BROKER_EXTERNAL_PROTOCOL = "EXTERNAL";
    // Separates the project name from the username in an ACL principal ("<project>__<username>").
    private static final String PROJECT_DELIMITER = "__";
    // NOTE(review): not referenced in the visible part of this file; the value looks like a regex character class matching a double quote — confirm intent.
    public static final String DLIMITER = "[\"]";

    /**
     * Exposes the entity manager backing this facade.
     *
     * @return the injected {@link EntityManager}
     */
    protected EntityManager getEntityManager() {
        return em;
    }

    /**
     * Lists the topics owned by a project, each with the name and version of the schema it uses.
     *
     * @param project the owning project
     * @return one {@link TopicDTO} per topic; an empty list when the project has no topics
     */
    public List<TopicDTO> findTopicsByProject(Project project) {
        // Typed result list: a raw List cannot be iterated as ProjectTopics.
        List<ProjectTopics> res = this.em.createNamedQuery("ProjectTopics.findByProject", ProjectTopics.class)
            .setParameter("project", project)
            .getResultList();
        List<TopicDTO> topics = new ArrayList<>();
        if (res == null) {
            return topics;
        }
        for (ProjectTopics pt : res) {
            topics.add(new TopicDTO(
                pt.getTopicName(),
                pt.getSchemaTopics().getSchemaTopicsPK().getName(),
                pt.getSchemaTopics().getSchemaTopicsPK().getVersion()));
        }
        return topics;
    }

    /**
     * Looks up a single topic by its name within the given project.
     *
     * @param project   the project that owns the topic
     * @param topicName the topic name
     * @return the matching {@link ProjectTopics}, or {@code null} when the query yields no result
     */
    public ProjectTopics findTopicByNameAndProject(Project project, String topicName) {
        TypedQuery<ProjectTopics> lookup =
            this.em.createNamedQuery("ProjectTopics.findByProjectAndTopicName", ProjectTopics.class);
        lookup.setParameter("project", project);
        lookup.setParameter("topicName", topicName);
        try {
            return lookup.getSingleResult();
        } catch (NoResultException notFound) {
            // Absence is reported to the caller as null rather than as an exception.
            return null;
        }
    }

    /**
     * Runs the {@code SharedTopics.findByProjectId} named query for the given project id.
     *
     * @param projectId value bound to the query's {@code projectId} parameter
     * @return the query result list
     */
    public List<SharedTopics> findSharedTopicsByProject(Integer projectId) {
        return this.em.createNamedQuery("SharedTopics.findByProjectId", SharedTopics.class)
            .setParameter("projectId", projectId)
            .getResultList();
    }

    /**
     * Returns partition details for a topic, but only if the topic belongs to the given project.
     * The topic name is matched case-insensitively against the project's topics.
     *
     * @param project   the project expected to own the topic
     * @param user      the user on whose behalf the cluster is queried
     * @param topicName the topic to describe
     * @return the partition details, or an empty list when the project does not own such a topic
     * @throws Exception whatever the cluster lookup raises
     */
    public List<PartitionDetailsDTO> getTopicDetails(Project project, Users user, String topicName) throws Exception {
        List<TopicDTO> ownedTopics = this.findTopicsByProject(project);
        ArrayList<PartitionDetailsDTO> details = new ArrayList<PartitionDetailsDTO>();
        boolean ownedByProject = ownedTopics != null
            && ownedTopics.stream().anyMatch(candidate -> candidate.getName().equalsIgnoreCase(topicName));
        if (ownedByProject) {
            details.addAll(this.getTopicDetailsfromKafkaCluster(project, user, topicName));
        }
        return details;
    }

    /**
     * Extracts the port from a colon-separated address string.
     *
     * @param zkIp address string; the second colon-separated field is parsed as the port
     * @return the parsed port number
     */
    private int getPort(String zkIp) {
        String[] addressParts = zkIp.split(COLON_SEPARATOR);
        String portText = addressParts[1];
        return Integer.parseInt(portText);
    }

    /**
     * Resolves the host part of a colon-separated address string to an {@link InetAddress}.
     *
     * @param zkIp address string; the part before the first colon is used as the host
     * @return the resolved address
     * @throws ServiceException with {@code ZOOKEEPER_SERVICE_UNAVAILABLE} when the host cannot be resolved
     */
    public InetAddress getIp(String zkIp) throws ServiceException {
        String[] addressParts = zkIp.split(COLON_SEPARATOR);
        String host = addressParts[0];
        InetAddress resolved;
        try {
            resolved = InetAddress.getByName(host);
        } catch (UnknownHostException ex) {
            throw new ServiceException(RESTCodes.ServiceErrorCode.ZOOKEEPER_SERVICE_UNAVAILABLE, Level.SEVERE, ex.getMessage());
        }
        return resolved;
    }

    /**
     * Finds the projects associated with a topic name.
     *
     * @param topicName the topic name to search for
     * @return the projects that could be resolved for the topic; empty when there are none
     */
    public List<Project> findProjectforTopic(String topicName) {
        // Typed query/result: a raw List cannot be iterated as ProjectTopics.
        List<ProjectTopics> resp = this.em.createNamedQuery("ProjectTopics.findByTopicName", ProjectTopics.class)
            .setParameter("topicName", topicName)
            .getResultList();
        List<Project> projects = new ArrayList<>();
        if (resp == null) {
            return projects;
        }
        for (ProjectTopics pt : resp) {
            // NOTE(review): pt.getProject() is passed as the primary key here; if it returns the
            // Project entity rather than its id this lookup is suspect — confirm against ProjectTopics.
            Project p = this.em.find(Project.class, pt.getProject());
            if (p != null) {
                projects.add(p);
            }
        }
        return projects;
    }

    /**
     * Resolves the project by id and delegates to {@link #createTopicInProject(Project, TopicDTO)}.
     *
     * @param projectId id of the project that will own the topic
     * @param topicDto  description of the topic to create
     * @return the persisted topic record
     * @throws IllegalArgumentException when either argument is {@code null}
     * @throws ProjectException         when no project exists with the given id
     * @throws KafkaException           propagated from the delegate
     * @throws ServiceException         propagated from the delegate
     */
    public ProjectTopics createTopicInProject(Integer projectId, TopicDTO topicDto) throws ProjectException, KafkaException, ServiceException {
        boolean argumentMissing = projectId == null || topicDto == null;
        if (argumentMissing) {
            throw new IllegalArgumentException("projectId or topicDto were not provided.");
        }
        Project owner = this.em.find(Project.class, projectId);
        if (owner != null) {
            return this.createTopicInProject(owner, topicDto);
        }
        throw new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE);
    }

    /**
     * Creates a Kafka topic for a project and records it in the database.
     *
     * <p>Checks run in this order: the topic name must not already be registered, the project must be
     * below its topic quota, the requested replication factor must not exceed the number of broker
     * endpoints, and the referenced schema must exist. Only then is the topic created through
     * ZooKeeper (if it is not already there) and the {@link ProjectTopics} row persisted.
     *
     * @param project  the project that will own the topic
     * @param topicDto name, partition/replica counts and schema reference of the new topic
     * @return the persisted topic record
     * @throws KafkaException   when any of the checks above fails or the broker list cannot be read
     * @throws ServiceException when the ZooKeeper host cannot be resolved
     */
    public ProjectTopics createTopicInProject(Project project, TopicDTO topicDto) throws KafkaException, ServiceException {
        String topicName = topicDto.getName();
        List<ProjectTopics> res = this.em.createNamedQuery("ProjectTopics.findByTopicName", ProjectTopics.class)
            .setParameter("topicName", topicName)
            .getResultList();
        if (!res.isEmpty()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS, Level.FINE, "topic name: " + topicName);
        }
        List<ProjectTopics> topicsInProject = this.em.createNamedQuery("ProjectTopics.findByProject", ProjectTopics.class)
            .setParameter("project", project)
            .getResultList();
        if (topicsInProject != null && topicsInProject.size() >= project.getKafkaMaxNumTopics()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_LIMIT_REACHED, Level.FINE, "topic name: " + topicName + ", project: " + project.getName());
        }
        try {
            Set<String> brokerEndpoints = this.settings.getBrokerEndpoints();
            if (brokerEndpoints.size() < topicDto.getNumOfReplicas()) {
                throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_REPLICATION_ERROR, Level.FINE, "maximum: " + brokerEndpoints.size());
            }
        }
        catch (IOException | InterruptedException | KeeperException ex) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.KAFKA_GENERIC_ERROR, Level.SEVERE, "project: " + project.getName(), ex.getMessage(), ex);
        }
        // Resolve the schema BEFORE touching ZooKeeper: failing afterwards would leave a topic in the
        // cluster with no matching database record.
        SchemaTopics schema = this.em.find(SchemaTopics.class, new SchemaTopicsPK(topicDto.getSchemaName(), topicDto.getSchemaVersion()));
        if (schema == null) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_NOT_FOUND, Level.FINE, "topic: " + topicName);
        }
        ZkClient zkClient = new ZkClient(this.getIp(this.settings.getZkConnectStr()).getHostName(), 30000, 30000, (ZkSerializer)ZKStringSerializer$.MODULE$);
        ZkConnection zkConnection = new ZkConnection(this.settings.getZkConnectStr());
        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
        try {
            if (!AdminUtils.topicExists(zkUtils, topicName)) {
                AdminUtils.createTopic(zkUtils, topicName, topicDto.getNumOfPartitions(), topicDto.getNumOfReplicas(), new Properties(), (RackAwareMode)RackAwareMode.Enforced$.MODULE$);
            }
        }
        catch (TopicExistsException ex) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS_IN_ZOOKEEPER, Level.INFO, "topic name: " + topicName, ex.getMessage(), ex);
        }
        finally {
            zkClient.close();
            try {
                zkConnection.close();
            }
            catch (InterruptedException ex) {
                // Restore the interrupt status for callers and log the exception itself.
                Thread.currentThread().interrupt();
                LOGGER.log(Level.SEVERE, "Interrupted while closing the ZooKeeper connection", ex);
            }
        }
        ProjectTopics pt = new ProjectTopics(topicName, project, schema);
        this.em.persist(pt);
        this.em.flush();
        return pt;
    }

    /**
     * Removes a topic's database record and then deletes the topic through ZooKeeper.
     *
     * @param project   the project that owns the topic
     * @param topicName the topic to remove
     * @throws KafkaException   with {@code TOPIC_NOT_FOUND} when the project has no such topic
     * @throws ServiceException when the ZooKeeper host cannot be resolved
     */
    public void removeTopicFromProject(Project project, String topicName) throws KafkaException, ServiceException {
        ProjectTopics pt;
        try {
            pt = this.em.createNamedQuery("ProjectTopics.findByProjectAndTopicName", ProjectTopics.class)
                .setParameter("project", project)
                .setParameter("topicName", topicName)
                .getSingleResult();
        }
        catch (NoResultException e) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + topicName);
        }
        this.em.remove(pt);
        ZkClient zkClient = new ZkClient(this.getIp(this.settings.getZkConnectStr()).getHostName(), 30000, 30000, (ZkSerializer)ZKStringSerializer$.MODULE$);
        ZkConnection zkConnection = new ZkConnection(this.settings.getZkConnectStr());
        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
        try {
            AdminUtils.deleteTopic(zkUtils, topicName);
        }
        finally {
            zkClient.close();
            try {
                zkConnection.close();
            }
            catch (InterruptedException ex) {
                // Restore the interrupt status instead of silently dropping it; use the shared logger.
                Thread.currentThread().interrupt();
                LOGGER.log(Level.SEVERE, "Interrupted while closing the ZooKeeper connection", ex);
            }
        }
    }

    /**
     * Removes every topic of a project: deletes each database record and requests deletion of the
     * topic through ZooKeeper. Topics already marked for deletion are skipped silently.
     *
     * @param project the project whose topics are removed
     * @throws InterruptedException when closing the ZooKeeper connection is interrupted
     * @throws ServiceException     when the ZooKeeper host cannot be resolved
     */
    public void removeAllTopicsFromProject(Project project) throws InterruptedException, ServiceException {
        // Typed result list: a raw List cannot be iterated as ProjectTopics.
        List<ProjectTopics> topics = this.em.createNamedQuery("ProjectTopics.findByProject", ProjectTopics.class)
            .setParameter("project", project)
            .getResultList();
        if (topics == null || topics.isEmpty()) {
            return;
        }
        ZkClient zkClient = null;
        ZkConnection zkConnection = null;
        try {
            zkClient = new ZkClient(this.getIp(this.settings.getZkConnectStr()).getHostName(), 30000, 30000, (ZkSerializer)ZKStringSerializer$.MODULE$);
            zkConnection = new ZkConnection(this.settings.getZkConnectStr());
            // One ZkUtils wrapper serves every deletion; it only depends on the client and connection.
            ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
            for (ProjectTopics topic : topics) {
                this.em.remove(topic);
                try {
                    AdminUtils.deleteTopic(zkUtils, topic.getTopicName());
                }
                catch (TopicAlreadyMarkedForDeletionException ignored) {
                    // Deletion is already pending for this topic; nothing more to do.
                }
            }
        }
        finally {
            // Nested finally so the connection is closed even if closing the client fails.
            try {
                if (zkClient != null) {
                    zkClient.close();
                }
            }
            finally {
                if (zkConnection != null) {
                    zkConnection.close();
                }
            }
        }
    }

    /**
     * Resolves the project by id and removes the user's ACLs in it.
     *
     * @param user      the user whose ACLs are removed
     * @param projectId id of the project
     * @throws ProjectException with {@code PROJECT_NOT_FOUND} when no project has the given id
     */
    public void removeAclsForUser(Users user, Integer projectId) throws ProjectException {
        Project target = this.em.find(Project.class, projectId);
        if (target != null) {
            this.removeAclsForUser(user, target);
            return;
        }
        throw new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "projectId:" + projectId);
    }

    /**
     * Executes the {@code TopicAcls.deleteByUser} named update for the given user and project.
     *
     * @param user    value bound to the query's {@code user} parameter
     * @param project value bound to the query's {@code project} parameter
     */
    public void removeAclsForUser(Users user, Project project) {
        TypedQuery<TopicAcls> bulkDelete = this.em.createNamedQuery("TopicAcls.deleteByUser", TopicAcls.class);
        bulkDelete.setParameter("user", user);
        bulkDelete.setParameter("project", project);
        bulkDelete.executeUpdate();
    }

    /**
     * Resolves the project by id and removes the ACLs whose principal belongs to it.
     *
     * @param projectId id of the project
     * @throws ProjectException with {@code PROJECT_NOT_FOUND} when no project has the given id
     */
    public void removeAclForProject(Integer projectId) throws ProjectException {
        Project target = this.em.find(Project.class, projectId);
        if (target != null) {
            this.removeAclForProject(target);
            return;
        }
        throw new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "projectId:" + projectId);
    }

    /**
     * Removes every ACL whose principal starts with the given project's name, i.e. whose
     * principal prefix (the part before the first {@code "__"}) equals {@code project.getName()}.
     *
     * @param project the project whose ACLs are removed
     */
    public void removeAclForProject(Project project) {
        List<TopicAcls> everyAcl = this.em.createNamedQuery("TopicAcls.findAll", TopicAcls.class).getResultList();
        for (TopicAcls candidate : everyAcl) {
            String principalProject = candidate.getPrincipal().split(PROJECT_DELIMITER)[0];
            if (principalProject.equals(project.getName())) {
                this.em.remove(candidate);
            }
        }
    }

    /**
     * Builds the default topic settings from {@link Settings}: default replica count, default
     * partition count and the number of broker endpoints currently known.
     *
     * @return the default values DTO
     * @throws InterruptedException propagated from reading the broker endpoints
     * @throws IOException          propagated from reading the broker endpoints
     * @throws KeeperException      propagated from reading the broker endpoints
     */
    public TopicDefaultValueDTO topicDefaultValues() throws InterruptedException, IOException, KeeperException {
        Set<String> endpoints = this.settings.getBrokerEndpoints();
        return new TopicDefaultValueDTO(
            this.settings.getKafkaDefaultNumReplicas(),
            this.settings.getKafkaDefaultNumPartitions(),
            endpoints.size());
    }

    /**
     * Shares a topic owned by one project with another project.
     *
     * @param owningProject the project that owns the topic
     * @param topicName     the topic to share
     * @param projectId     id of the project the topic is shared with
     * @throws KafkaException when the destination is the owner itself, the owner has no such topic,
     *                        or the topic is already shared with the destination
     */
    public void shareTopic(Project owningProject, String topicName, Integer projectId) throws KafkaException {
        boolean sharingWithSelf = owningProject.getId().equals(projectId);
        if (sharingWithSelf) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.DESTINATION_PROJECT_IS_TOPIC_OWNER, Level.FINE);
        }
        // The lookup result is not used; it only proves that the owner really has this topic.
        TypedQuery<ProjectTopics> ownership =
            this.em.createNamedQuery("ProjectTopics.findByProjectAndTopicName", ProjectTopics.class);
        try {
            ownership.setParameter("project", owningProject)
                .setParameter("topicName", topicName)
                .getSingleResult();
        } catch (NoResultException e) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + topicName);
        }
        SharedTopicsPK shareKey = new SharedTopicsPK(topicName, projectId);
        SharedTopics existingShare = this.em.find(SharedTopics.class, shareKey);
        if (existingShare != null) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_SHARED, Level.FINE, "topic: " + topicName);
        }
        this.em.persist(new SharedTopics(topicName, owningProject.getId(), projectId));
        this.em.flush();
    }

    /**
     * Removes the sharing record identified by the topic name and project id.
     *
     * @param topicName      the shared topic
     * @param ownerProjectId project id used as the second component of the {@link SharedTopicsPK}
     * @throws KafkaException with {@code TOPIC_NOT_SHARED} when no such sharing record exists
     */
    public void unShareTopic(String topicName, Integer ownerProjectId) throws KafkaException {
        SharedTopicsPK shareKey = new SharedTopicsPK(topicName, ownerProjectId);
        SharedTopics share = this.em.find(SharedTopics.class, shareKey);
        if (share != null) {
            this.em.remove(share);
            return;
        }
        throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_SHARED, Level.FINE, "topic: " + topicName);
    }

    /**
     * Lists the projects found through the sharing records matching a topic name and project id.
     * Sharing records whose project can no longer be resolved are skipped.
     *
     * @param topicName value bound to the query's {@code topicName} parameter
     * @param projectId value bound to the query's {@code projectId} parameter
     * @return name and id of each resolved project; empty when there are none
     */
    public List<SharedProjectDTO> topicIsSharedTo(String topicName, Integer projectId) {
        // Typed query/result: a raw List cannot be iterated as SharedTopics.
        List<SharedTopics> shares = this.em.createNamedQuery("SharedTopics.findByTopicNameAndProjectId", SharedTopics.class)
            .setParameter("topicName", topicName)
            .setParameter("projectId", projectId)
            .getResultList();
        List<SharedProjectDTO> shareProjectDtos = new ArrayList<>();
        for (SharedTopics st : shares) {
            Project project = this.em.find(Project.class, st.getSharedTopicsPK().getProjectId());
            if (project != null) {
                shareProjectDtos.add(new SharedProjectDTO(project.getName(), project.getId()));
            }
        }
        return shareProjectDtos;
    }

    /**
     * Lists, per project, the users that can be given ACLs on a topic: the team of the given
     * project plus the team of every project the topic is shared with. Each list ends with the
     * wildcard entry {@code "*"}.
     *
     * @param projectId id of the project; must be non-null and non-negative
     * @param topicName the topic; must be non-empty
     * @return one {@link AclUserDTO} per project
     * @throws IllegalArgumentException when an argument is invalid or no project has the given id
     */
    public List<AclUserDTO> aclUsers(Integer projectId, String topicName) {
        if (projectId == null || projectId < 0 || Strings.isNullOrEmpty(topicName)) {
            throw new IllegalArgumentException("ProjectId must be non-null non-negative number, topic must be provided");
        }
        Project project = this.em.find(Project.class, projectId);
        if (project == null) {
            // Previously this surfaced as a NullPointerException a few lines further down.
            throw new IllegalArgumentException("No project found for projectId: " + projectId);
        }
        Map<String, List<String>> projectMemberCollections = new HashMap<>();
        projectMemberCollections.put(project.getName(), this.teamEmailsWithWildcard(project));

        List<SharedTopics> shares = this.em.createNamedQuery("SharedTopics.findByTopicName", SharedTopics.class)
            .setParameter("topicName", topicName)
            .getResultList();
        for (SharedTopics share : shares) {
            Project sharedProject = this.em.find(Project.class, share.getSharedTopicsPK().getProjectId());
            if (sharedProject == null) {
                continue;
            }
            // A fresh list per project: reusing one list made every shared project report the
            // accumulated members of all shared projects.
            projectMemberCollections.put(sharedProject.getName(), this.teamEmailsWithWildcard(sharedProject));
        }

        List<AclUserDTO> aclUsers = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : projectMemberCollections.entrySet()) {
            aclUsers.add(new AclUserDTO(entry.getKey(), entry.getValue()));
        }
        return aclUsers;
    }

    /** Collects the e-mail of every team member of the project, followed by the wildcard "*". */
    private List<String> teamEmailsWithWildcard(Project project) {
        List<String> emails = new ArrayList<>();
        for (ProjectTeam member : project.getProjectTeamCollection()) {
            emails.add(member.getUser().getEmail());
        }
        emails.add("*");
        return emails;
    }

    /**
     * Adds the ACL described by {@code dto} to a topic by unpacking the DTO and delegating to the
     * private overload.
     *
     * @param topicName the topic receiving the ACL
     * @param projectId id of the project that owns the topic
     * @param dto       project name, user e-mail, permission, operation, host and role of the ACL
     * @throws KafkaException   propagated from the delegate
     * @throws ProjectException propagated from the delegate
     * @throws UserException    propagated from the delegate
     */
    public void addAclsToTopic(String topicName, Integer projectId, AclDTO dto) throws KafkaException, ProjectException, UserException {
        this.addAclsToTopic(
            topicName,
            projectId,
            dto.getProjectName(),
            dto.getUserEmail(),
            dto.getPermissionType(),
            dto.getOperationType(),
            dto.getHost(),
            dto.getRole());
    }

    /**
     * Adds an ACL on a topic for one user, or for every team member of the selected project when
     * {@code userEmail} is the wildcard {@code "*"}.
     *
     * <p>The ACL principal is {@code <selectedProjectName>__<username>}. For a single user an
     * already existing identical ACL is an error; in wildcard mode members that already have the
     * ACL are skipped.
     *
     * @param topicName           the topic receiving the ACL; must be non-empty
     * @param projectId           id of the project owning the topic; non-null and non-negative
     * @param selectedProjectName name of the project the user(s) belong to
     * @param userEmail           e-mail of the user, or {@code "*"} for all team members
     * @param permission_type     ACL permission type
     * @param operation_type      ACL operation type
     * @param host                ACL host
     * @param role                ACL role
     * @throws ProjectException when the owner project or the selected project does not exist
     * @throws KafkaException   when the owner has no such topic or the ACL already exists
     * @throws UserException    when no user has the given e-mail
     */
    private void addAclsToTopic(String topicName, Integer projectId, String selectedProjectName, String userEmail, String permission_type, String operation_type, String host, String role) throws ProjectException, KafkaException, UserException {
        if (Strings.isNullOrEmpty(topicName) || projectId == null || projectId < 0 || userEmail == null) {
            throw new IllegalArgumentException("Topic, userEmail and projectId must be provided. ProjectId must be a non-negative number");
        }
        Project topicOwnerProject = this.projectsFacade.find(projectId);
        if (topicOwnerProject == null) {
            // Report a missing owner explicitly instead of failing with a NullPointerException below.
            throw new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "projectId: " + projectId);
        }
        Project project = topicOwnerProject;
        if (!topicOwnerProject.getName().equals(selectedProjectName)) {
            project = this.projectsFacade.findByName(selectedProjectName);
            if (project == null) {
                throw new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND, Level.FINE, "The specified project for the topic " + topicName + " was not found");
            }
        }
        ProjectTopics pt;
        try {
            pt = this.em.createNamedQuery("ProjectTopics.findByProjectAndTopicName", ProjectTopics.class)
                .setParameter("project", topicOwnerProject)
                .setParameter("topicName", topicName)
                .getSingleResult();
        }
        catch (NoResultException e) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "Topic: " + topicName);
        }
        if (!userEmail.equals("*")) {
            Users user = this.userFacade.findByEmail(userEmail);
            if (user == null) {
                throw new UserException(RESTCodes.UserErrorCode.USER_WAS_NOT_FOUND, Level.FINE, "user: " + userEmail);
            }
            String principalName = selectedProjectName + PROJECT_DELIMITER + user.getUsername();
            TopicAcls topicAcl = this.getTopicAcl(topicName, principalName, permission_type, operation_type, host, role);
            if (topicAcl != null && topicAcl.getProjectTopics().getTopicName().equals(topicName) && topicAcl.getHost().equals(host) && topicAcl.getOperationType().equalsIgnoreCase(operation_type) && topicAcl.getPermissionType().equalsIgnoreCase(permission_type) && topicAcl.getRole().equalsIgnoreCase(role)) {
                throw new KafkaException(RESTCodes.KafkaErrorCode.ACL_ALREADY_EXISTS, Level.FINE, "topicAcl:" + topicAcl.toString());
            }
            this.em.persist(new TopicAcls(pt, user, permission_type, operation_type, host, role, principalName));
            this.em.flush();
            return;
        }
        // Wildcard: grant the ACL to every team member that does not have it yet.
        for (ProjectTeam p : project.getProjectTeamCollection()) {
            Users selectedUser = p.getUser();
            String principalName = selectedProjectName + PROJECT_DELIMITER + selectedUser.getUsername();
            if (this.getTopicAcl(topicName, principalName, permission_type, operation_type, host, role) != null) {
                continue;
            }
            this.em.persist(new TopicAcls(pt, selectedUser, permission_type, operation_type, host, role, principalName));
            this.em.flush();
        }
    }

    /**
     * Replaces an existing ACL of a topic: the ACL with the given id is removed and the ACL
     * described by {@code aclDto} is added in its place.
     *
     * @param project   the project that owns the topic
     * @param topicName the topic whose ACL is updated
     * @param aclId     id of the ACL to replace
     * @param aclDto    description of the replacement ACL
     * @throws KafkaException   when the project has no such topic, the ACL does not exist, the ACL
     *                          belongs to a different topic, or adding the replacement fails
     * @throws ProjectException propagated from adding the replacement ACL
     * @throws UserException    propagated from adding the replacement ACL
     */
    public void updateTopicAcl(Project project, String topicName, Integer aclId, AclDTO aclDto) throws KafkaException, ProjectException, UserException {
        // The result is not used; the lookup only proves that the project owns the topic.
        try {
            this.em.createNamedQuery("ProjectTopics.findByProjectAndTopicName", ProjectTopics.class)
                .setParameter("project", project)
                .setParameter("topicName", topicName)
                .getSingleResult();
        }
        catch (NoResultException e) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, topicName);
        }
        TopicAcls ta = this.em.find(TopicAcls.class, aclId);
        if (ta == null) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.ACL_NOT_FOUND, Level.FINE, "topic: " + topicName);
        }
        // Same guard as removeAclFromTopic(String, Integer): never remove an ACL that belongs to
        // another topic just because its id was supplied.
        if (!ta.getProjectTopics().getTopicName().equals(topicName)) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.ACL_NOT_FOR_TOPIC, Level.FINE, "topic: " + topicName);
        }
        this.em.remove(ta);
        this.addAclsToTopic(topicName, project.getId(), aclDto);
    }

    /**
     * Removes a single ACL, provided it belongs to the named topic.
     *
     * @param topicName the topic the ACL is expected to belong to
     * @param aclId     id of the ACL to remove
     * @throws KafkaException with {@code ACL_NOT_FOUND} when the id is unknown, or
     *                        {@code ACL_NOT_FOR_TOPIC} when the ACL belongs to another topic
     */
    public void removeAclFromTopic(String topicName, Integer aclId) throws KafkaException {
        TopicAcls acl = this.em.find(TopicAcls.class, aclId);
        if (acl == null) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.ACL_NOT_FOUND, Level.FINE, "topic: " + topicName);
        }
        String aclTopic = acl.getProjectTopics().getTopicName();
        boolean belongsToTopic = aclTopic.equals(topicName);
        if (!belongsToTopic) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.ACL_NOT_FOR_TOPIC, Level.FINE, "topic: " + topicName);
        }
        this.em.remove(acl);
    }

    /**
     * Removes the ACLs of a topic whose principal prefix (the part before the first {@code "__"})
     * equals the given project's name.
     *
     * @param topicName the topic whose ACLs are inspected
     * @param project   the project whose ACLs are removed
     */
    public void removeAclFromTopic(String topicName, Project project) {
        TypedQuery<TopicAcls> byTopic = this.em.createNamedQuery("TopicAcls.findByTopicName", TopicAcls.class);
        byTopic.setParameter("topicName", topicName);
        for (TopicAcls candidate : byTopic.getResultList()) {
            String principalProject = candidate.getPrincipal().split(PROJECT_DELIMITER)[0];
            if (principalProject.equals(project.getName())) {
                this.em.remove(candidate);
            }
        }
    }

    /**
     * Finds the ACL matching all of the given attributes.
     *
     * @param topicName       topic the ACL applies to
     * @param principal       ACL principal
     * @param permission_type ACL permission type
     * @param operation_type  ACL operation type
     * @param host            ACL host
     * @param role            ACL role
     * @return the ACL when exactly one row matches; {@code null} when none or several match
     */
    public TopicAcls getTopicAcl(String topicName, String principal, String permission_type, String operation_type, String host, String role) {
        // Run the query once and reuse the list; each getResultList() call executes the query again.
        List<TopicAcls> matches = this.em.createNamedQuery("TopicAcls.findAcl", TopicAcls.class)
            .setParameter("topicName", topicName)
            .setParameter("principal", principal)
            .setParameter("role", role)
            .setParameter("host", host)
            .setParameter("operationType", operation_type)
            .setParameter("permissionType", permission_type)
            .getResultList();
        if (matches != null && matches.size() == 1) {
            return matches.get(0);
        }
        return null;
    }

    /**
     * Lists the ACLs of a topic owned by the given project.
     *
     * @param topicName the topic whose ACLs are listed
     * @param project   the project that owns the topic
     * @return one {@link AclDTO} per ACL; the project name in each DTO is the principal prefix
     *         before the first {@code "__"}
     * @throws KafkaException with {@code TOPIC_NOT_FOUND} when the project has no such topic
     */
    public List<AclDTO> getTopicAcls(String topicName, Project project) throws KafkaException {
        // The result is not used; the lookup only proves that the project owns the topic.
        try {
            this.em.createNamedQuery("ProjectTopics.findByProjectAndTopicName", ProjectTopics.class)
                .setParameter("project", project)
                .setParameter("topicName", topicName)
                .getSingleResult();
        }
        catch (NoResultException e) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + topicName);
        }
        // Typed result list: a raw List cannot be iterated as TopicAcls.
        List<TopicAcls> acls = this.em.createNamedQuery("TopicAcls.findByTopicName", TopicAcls.class)
            .setParameter("topicName", topicName)
            .getResultList();
        List<AclDTO> aclDtos = new ArrayList<>();
        for (TopicAcls ta : acls) {
            String projectName = ta.getPrincipal().split(PROJECT_DELIMITER)[0];
            aclDtos.add(new AclDTO(ta.getId(), projectName, ta.getUser().getEmail(), ta.getPermissionType(), ta.getOperationType(), ta.getHost(), ta.getRole()));
        }
        return aclDtos;
    }

    /**
     * Validates a schema before creation by checking its name against the reserved-name list.
     *
     * @param schemaDTO the schema to validate
     * @throws IllegalArgumentException when {@code schemaDTO} is {@code null}
     * @throws KafkaException           with {@code CREATE_SCHEMA_RESERVED_NAME} when the name is reserved
     */
    public void validateSchema(SchemaDTO schemaDTO) throws KafkaException {
        if (schemaDTO != null) {
            this.validateSchemaNameAgainstBlacklist(schemaDTO.getName(), RESTCodes.KafkaErrorCode.CREATE_SCHEMA_RESERVED_NAME);
            return;
        }
        throw new IllegalArgumentException("No schema provided");
    }

    /**
     * Rejects schema names contained in {@code Settings.KAFKA_SCHEMA_BLACKLIST}.
     *
     * @param schemaName the name to check
     * @param restCode   error code to raise when the name is blacklisted
     * @throws KafkaException carrying {@code restCode} when the name is blacklisted
     */
    public void validateSchemaNameAgainstBlacklist(String schemaName, RESTCodes.KafkaErrorCode restCode) throws KafkaException {
        boolean reserved = Settings.KAFKA_SCHEMA_BLACKLIST.contains(schemaName);
        if (!reserved) {
            return;
        }
        throw new KafkaException(restCode, Level.FINE);
    }

    /**
     * Checks whether a new schema (as reader) is compatible with every stored schema of the same
     * name (as writers).
     *
     * @param schemaDto the candidate schema; its name selects the stored versions to compare against
     * @return {@code INVALID} when the contents are missing or any schema fails to parse,
     *         {@code INCOMPATIBLE} on the first incompatible stored version, otherwise {@code COMPATIBLE}
     */
    public SchemaCompatiblityCheck schemaBackwardCompatibility(SchemaDTO schemaDto) {
        String schemaContent = schemaDto.getContents();
        if (schemaContent == null) {
            // Nothing to parse; report it as invalid instead of failing inside the parser.
            return SchemaCompatiblityCheck.INVALID;
        }
        // Typed query: a raw result list cannot be iterated as SchemaTopics.
        TypedQuery<SchemaTopics> query = this.em.createNamedQuery("SchemaTopics.findByName", SchemaTopics.class);
        query.setParameter("name", schemaDto.getName());
        try {
            Schema reader = new Schema.Parser().parse(schemaContent);
            for (SchemaTopics schemaTopic : query.getResultList()) {
                // A fresh parser per schema, as in the original code.
                Schema writer = new Schema.Parser().parse(schemaTopic.getContents());
                SchemaCompatibility.SchemaCompatibilityType outcome =
                    SchemaCompatibility.checkReaderWriterCompatibility(reader, writer).getType();
                if (outcome == SchemaCompatibility.SchemaCompatibilityType.INCOMPATIBLE) {
                    return SchemaCompatiblityCheck.INCOMPATIBLE;
                }
            }
        }
        catch (SchemaParseException ex) {
            return SchemaCompatiblityCheck.INVALID;
        }
        return SchemaCompatiblityCheck.COMPATIBLE;
    }

    /**
     * Stores a schema as a new version: version 1 when no schema of that name exists, otherwise
     * one more than the highest stored version.
     *
     * @param schemaDto name and contents of the schema to store
     */
    public void addSchemaForTopics(SchemaDTO schemaDto) {
        // Typed result list: a raw List cannot be iterated as SchemaTopics.
        List<SchemaTopics> schemaTopics = this.em.createNamedQuery("SchemaTopics.findByName", SchemaTopics.class)
            .setParameter("name", schemaDto.getName())
            .getResultList();
        int newVersion = 1;
        if (schemaTopics != null && !schemaTopics.isEmpty()) {
            for (SchemaTopics schemaTopic : schemaTopics) {
                newVersion = Math.max(newVersion, schemaTopic.getSchemaTopicsPK().getVersion());
            }
            ++newVersion;
        }
        SchemaTopics schema = new SchemaTopics(schemaDto.getName(), newVersion, schemaDto.getContents(), new Date());
        this.em.persist(schema);
        this.em.flush();
    }

    /**
     * Returns the contents of the schema used by a topic. When several topic records share the
     * name, the first one returned by the query is used.
     *
     * @param topicName the topic whose schema is requested
     * @return a {@link SchemaDTO} holding the schema contents, or {@code null} when the topic or
     *         its schema cannot be found
     */
    public SchemaDTO getSchemaForTopic(String topicName) {
        List<ProjectTopics> matches = this.em.createNamedQuery("ProjectTopics.findByTopicName", ProjectTopics.class)
            .setParameter("topicName", topicName)
            .getResultList();
        if (matches == null || matches.isEmpty()) {
            return null;
        }
        ProjectTopics first = matches.get(0);
        SchemaTopicsPK schemaKey = new SchemaTopicsPK(
            first.getSchemaTopics().getSchemaTopicsPK().getName(),
            first.getSchemaTopics().getSchemaTopicsPK().getVersion());
        SchemaTopics stored = this.em.find(SchemaTopics.class, schemaKey);
        if (stored == null) {
            return null;
        }
        return new SchemaDTO(stored.getContents());
    }

    /**
     * Fetches a schema by name and version via the {@code SchemaTopics.findByNameAndVersion} query.
     * Uses {@code getSingleResult()}, so its exceptions propagate to the caller.
     *
     * @param schemaName    the schema name
     * @param schemaVersion the schema version
     * @return the matching schema entity
     */
    public SchemaTopics getSchema(String schemaName, Integer schemaVersion) {
        TypedQuery<SchemaTopics> byNameAndVersion =
            this.em.createNamedQuery("SchemaTopics.findByNameAndVersion", SchemaTopics.class);
        byNameAndVersion.setParameter("name", schemaName);
        byNameAndVersion.setParameter("version", schemaVersion);
        return byNameAndVersion.getSingleResult();
    }

    /**
     * Lists all stored schemas, grouping the available versions under each schema name.
     *
     * @return one {@link SchemaDTO} per schema name, carrying that name's versions
     */
    public List<SchemaDTO> listSchemasForTopics() {
        // Typed collections: a raw result list cannot be iterated as SchemaTopics.
        Map<String, List<Integer>> schemas = new HashMap<>();
        List<SchemaTopics> stored = this.em.createNamedQuery("SchemaTopics.findAll", SchemaTopics.class).getResultList();
        for (SchemaTopics schemaTopics : stored) {
            String schemaName = schemaTopics.getSchemaTopicsPK().getName();
            schemas.computeIfAbsent(schemaName, k -> new ArrayList<>())
                .add(schemaTopics.getSchemaTopicsPK().getVersion());
        }
        List<SchemaDTO> schemaDtos = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : schemas.entrySet()) {
            schemaDtos.add(new SchemaDTO(entry.getKey(), entry.getValue()));
        }
        return schemaDtos;
    }

    /**
     * Returns the contents of a specific schema version.
     *
     * @param schemaName    the schema name
     * @param schemaVersion the schema version
     * @return a {@link SchemaDTO} holding the schema contents
     * @throws KafkaException with {@code SCHEMA_NOT_FOUND} when no such schema version exists
     */
    public SchemaDTO getSchemaContent(String schemaName, Integer schemaVersion) throws KafkaException {
        SchemaTopicsPK schemaKey = new SchemaTopicsPK(schemaName, schemaVersion);
        SchemaTopics stored = this.em.find(SchemaTopics.class, schemaKey);
        if (stored != null) {
            return new SchemaDTO(stored.getContents());
        }
        throw new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_NOT_FOUND, Level.FINE, "Schema: " + schemaName);
    }

    /**
     * Deletes one version of a schema, unless the name is reserved or a topic still uses it.
     *
     * @param schemaName the schema name
     * @param version    the schema version to delete
     * @throws KafkaException with {@code DELETE_RESERVED_SCHEMA} for reserved names,
     *                        {@code SCHEMA_IN_USE} when a topic references the version, or
     *                        {@code SCHEMA_NOT_FOUND} when the version does not exist
     */
    public void deleteSchema(String schemaName, Integer version) throws KafkaException {
        this.validateSchemaNameAgainstBlacklist(schemaName, RESTCodes.KafkaErrorCode.DELETE_RESERVED_SCHEMA);
        List<ProjectTopics> topics = this.em.createNamedQuery("ProjectTopics.findBySchemaVersion", ProjectTopics.class)
            .setParameter("schema_name", schemaName)
            .setParameter("schema_version", version)
            .getResultList();
        if (topics != null && !topics.isEmpty()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_IN_USE, Level.FINE);
        }
        SchemaTopics schema = this.em.find(SchemaTopics.class, new SchemaTopicsPK(schemaName, version));
        if (schema == null) {
            // em.remove(null) would fail with an IllegalArgumentException; report the real cause.
            throw new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_NOT_FOUND, Level.FINE, "Schema: " + schemaName);
        }
        this.em.remove(schema);
        this.em.flush();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queries a Kafka broker for the partition layout of a topic.
     *
     * <p>The project user's certificates are copied into the Hopsworks temporary certificate
     * directory, a short-lived consumer is opened against the first broker endpoint whose
     * protocol prefix is not {@code KAFKA_BROKER_EXTERNAL_PROTOCOL}, and the leader, replica
     * hosts and in-sync replica hosts of every partition are collected. The local certificates
     * are removed again in all cases once materialization has been attempted.
     *
     * @param project project whose certificates and name are used to build the SSL configuration
     * @param user user whose certificates are used to authenticate against the broker
     * @param topicName topic whose partitions are described
     * @return partition details sorted by ascending partition id
     * @throws KafkaException with {@code BROKER_METADATA_ERROR} when no non-external broker
     *         endpoint is configured or when creating/querying the consumer fails
     * @throws CryptoPasswordNotFoundException propagated from the certificate/password lookups
     *         performed by this method
     */
    public List<PartitionDetailsDTO> getTopicDetailsfromKafkaCluster(Project project, Users user, String topicName) throws KafkaException, CryptoPasswordNotFoundException {
        // Select the first non-external endpoint by scanning only. The set handed out by Settings
        // may be shared state, so it must not be modified here (removing entries from it would
        // drop external brokers for every other caller).
        String internalBroker = null;
        for (String seed : this.settings.getKafkaBrokers()) {
            if (!seed.split(COLON_SEPARATOR)[0].equalsIgnoreCase(KAFKA_BROKER_EXTERNAL_PROTOCOL)) {
                internalBroker = seed;
                break;
            }
        }
        if (internalBroker == null) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.BROKER_METADATA_ERROR, Level.SEVERE, "No internal Kafka broker endpoint is available");
        }
        // Endpoints have the form <protocol>://<host:port>; the consumer only needs host:port.
        String brokerAddress = internalBroker.split("://")[1];

        List<PartitionDetailsDTO> partitionDetails = new ArrayList<PartitionDetailsDTO>();
        try {
            HopsUtils.copyProjectUserCerts(project, user.getUsername(), this.settings.getHopsworksTmpCertDir(), null, this.certificateMaterializer, this.settings.getHopsRpcTls());
            String projectSpecificUser = this.hdfsUsersController.getHdfsUserName(project, user);
            String certPassword = this.baseHadoopService.getProjectSpecificUserCertPassword(projectSpecificUser);
            String certDir = this.settings.getHopsworksTmpCertDir() + File.separator;

            Properties props = new Properties();
            props.put("bootstrap.servers", brokerAddress);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("security.protocol", KAFKA_SECURITY_PROTOCOL);
            props.setProperty("ssl.truststore.location", certDir + HopsUtils.getProjectTruststoreName(project.getName(), user.getUsername()));
            props.setProperty("ssl.truststore.password", certPassword);
            props.setProperty("ssl.keystore.location", certDir + HopsUtils.getProjectKeystoreName(project.getName(), user.getUsername()));
            props.setProperty("ssl.keystore.password", certPassword);
            props.setProperty("ssl.key.password", certPassword);
            props.setProperty("ssl.endpoint.identification.algorithm", "");

            try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(props)) {
                for (PartitionInfo partition : consumer.partitionsFor(topicName)) {
                    List<String> replicaHosts = new ArrayList<String>();
                    for (Node node : partition.replicas()) {
                        replicaHosts.add(node.host());
                    }
                    List<String> inSyncReplicaHosts = new ArrayList<String>();
                    for (Node node : partition.inSyncReplicas()) {
                        inSyncReplicaHosts.add(node.host());
                    }
                    // The fourth argument must be the in-sync replicas, not the full replica list.
                    partitionDetails.add(new PartitionDetailsDTO(partition.partition(), partition.leader().host(), replicaHosts, inSyncReplicaHosts));
                }
            }
            catch (Exception ex) {
                // Any failure while creating or querying the consumer is reported as a metadata error.
                throw new KafkaException(RESTCodes.KafkaErrorCode.BROKER_METADATA_ERROR, Level.SEVERE, "Broker: " + brokerAddress, ex.getMessage(), (Throwable)ex);
            }
        }
        finally {
            this.certificateMaterializer.removeCertificatesLocal(user.getUsername(), project.getName());
        }
        Collections.sort(partitionDetails, (left, right) -> Integer.compare(left.getId(), right.getId()));
        return partitionDetails;
    }

    /**
     * Finds a topic of a project by name using the
     * {@code ProjectTopics.findByProjectAndTopicName} named query.
     *
     * @param project value bound to the query's {@code project} parameter
     * @param topicName value bound to the query's {@code topicName} parameter
     * @return the matching topic, or an empty {@code Optional} when the query raises
     *         {@code NoResultException}
     */
    public Optional<ProjectTopics> getTopicByProjectAndTopicName(Project project, String topicName) {
        TypedQuery<ProjectTopics> lookup = this.em.createNamedQuery("ProjectTopics.findByProjectAndTopicName", ProjectTopics.class);
        lookup.setParameter("project", project);
        lookup.setParameter("topicName", topicName);
        try {
            ProjectTopics match = lookup.getSingleResult();
            return Optional.of(match);
        }
        catch (NoResultException noMatch) {
            // Absence of a row is an expected outcome, not an error.
            return Optional.empty();
        }
    }

    /**
     * Finds a schema by name and version using the {@code SchemaTopics.findByNameAndVersion}
     * named query.
     *
     * @param schemaName value bound to the query's {@code name} parameter
     * @param schemaVersion value bound to the query's {@code version} parameter
     * @return the matching schema, or an empty {@code Optional} when the query raises
     *         {@code NoResultException}
     */
    public Optional<SchemaTopics> getSchemaByNameAndVersion(String schemaName, Integer schemaVersion) {
        TypedQuery<SchemaTopics> lookup = this.em.createNamedQuery("SchemaTopics.findByNameAndVersion", SchemaTopics.class);
        lookup.setParameter("name", schemaName);
        lookup.setParameter("version", schemaVersion);
        try {
            SchemaTopics match = lookup.getSingleResult();
            return Optional.of(match);
        }
        catch (NoResultException noMatch) {
            // Absence of a row is an expected outcome, not an error.
            return Optional.empty();
        }
    }

    /**
     * Points a topic at the given schema version and persists the change.
     *
     * <p>A fresh {@link SchemaTopics} instance carrying only the name and version of {@code st}
     * is assigned to the topic, which is then merged and flushed through the entity manager.
     *
     * @param pt topic whose schema reference is replaced
     * @param st schema supplying the name and version to reference
     */
    public void updateTopicSchemaVersion(ProjectTopics pt, SchemaTopics st) {
        SchemaTopicsPK key = st.schemaTopicsPK;
        SchemaTopics schemaReference = new SchemaTopics(key.getName(), key.getVersion());
        pt.setSchemaTopics(schemaReference);
        this.em.merge(pt);
        this.em.flush();
    }
}

