/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.kafka;

import io.hops.hopsworks.common.dao.kafka.ProjectTopicsFacade;
import io.hops.hopsworks.common.dao.kafka.schemas.CompatibilityCheck;
import io.hops.hopsworks.common.dao.kafka.schemas.SchemaCompatibility;
import io.hops.hopsworks.common.dao.kafka.schemas.Schemas;
import io.hops.hopsworks.common.dao.kafka.schemas.SubjectDTO;
import io.hops.hopsworks.common.dao.kafka.schemas.Subjects;
import io.hops.hopsworks.common.dao.kafka.schemas.SubjectsCompatibility;
import io.hops.hopsworks.common.dao.kafka.schemas.SubjectsCompatibilityFacade;
import io.hops.hopsworks.common.dao.kafka.schemas.SubjectsFacade;
import io.hops.hopsworks.common.dao.project.Project;
import io.hops.hopsworks.common.kafka.SchemasController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.SchemaException;
import io.hops.hopsworks.restutils.RESTCodes;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;

@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class SubjectsController {
    private static final Logger LOGGER = Logger.getLogger(SubjectsController.class.getName());
    @EJB
    private SubjectsFacade subjectsFacade;
    @EJB
    private SubjectsCompatibilityFacade subjectsCompatibilityFacade;
    @EJB
    private SchemasController schemasController;
    @EJB
    private ProjectTopicsFacade projectTopicsFacade;

    public List<String> getSubjects(Project project) {
        return this.subjectsFacade.getListOfSubjects(project);
    }

    public List<Integer> getSubjectVersions(Project project, String subject) throws SchemaException {
        List<Integer> versions = this.subjectsFacade.findSubjectByName(project, subject).stream().map(Subjects::getVersion).sorted().collect(Collectors.toList());
        if (versions.isEmpty()) {
            throw new SchemaException(RESTCodes.SchemaRegistryErrorCode.SUBJECT_NOT_FOUND, Level.FINE, "subject=" + subject);
        }
        return versions;
    }

    public SubjectDTO getSubjectDetails(Project project, String subject, String version) throws SchemaException {
        this.validateVersion(version);
        if (this.subjectsFacade.findSubjectByName(project, subject).isEmpty()) {
            throw new SchemaException(RESTCodes.SchemaRegistryErrorCode.SUBJECT_NOT_FOUND, Level.FINE, "subject=" + subject);
        }
        Optional<Subjects> optional = version.equals("latest") ? this.subjectsFacade.findSubjectLatestVersion(project, subject) : this.subjectsFacade.findSubjectByNameAndVersion(project, subject, Integer.valueOf(version));
        if (optional.isPresent()) {
            Subjects res = optional.get();
            return new SubjectDTO(res.getSchema().getId(), res.getSubject(), res.getVersion(), res.getSchema().getSchema());
        }
        throw new SchemaException(RESTCodes.SchemaRegistryErrorCode.VERSION_NOT_FOUND, Level.FINE, "subject=" + subject + ", version=" + version);
    }

    public SubjectDTO registerNewSubject(Project project, String subject, String schemaContent, boolean isEnablingKafkaService) throws KafkaException, SchemaException {
        Schema schema;
        this.validateSubject(subject, isEnablingKafkaService);
        try {
            schema = new Schema.Parser().parse(schemaContent);
        }
        catch (SchemaParseException e) {
            throw new SchemaException(RESTCodes.SchemaRegistryErrorCode.INVALID_AVRO_SCHEMA, Level.FINE, "schema=" + schemaContent);
        }
        Optional<Subjects> optionalSubject = this.subjectsFacade.findSubjectByNameAndSchema(project, subject, schema.toString());
        if (optionalSubject.isPresent()) {
            return new SubjectDTO(optionalSubject.get().getSchema().getId());
        }
        if (!this.isCompatible(project, subject, schema)) {
            throw new SchemaException(RESTCodes.SchemaRegistryErrorCode.INCOMPATIBLE_AVRO_SCHEMA, Level.FINE, "Subject=" + subject + ", project=" + project.getName());
        }
        Integer latestVersion = this.subjectsFacade.findSubjectByName(project, subject).stream().map(Subjects::getVersion).max(Integer::compareTo).orElse(0);
        Schemas schemas = this.schemasController.addNewSchema(project, schema.toString());
        Integer id = this.subjectsFacade.insertNewSubject(project, subject, schemas, latestVersion + 1);
        return new SubjectDTO(id);
    }

    public void validateSubject(String subject, boolean isEnablingKafkaService) throws KafkaException {
        this.validateSubjectNameAgainstBlacklist(subject, isEnablingKafkaService);
    }

    private void validateSubjectNameAgainstBlacklist(String subject, boolean isEnablingKafkaService) throws KafkaException {
        if (!(!Settings.KAFKA_SUBJECT_BLACKLIST.contains(subject) || subject.equals("inferenceschema") && isEnablingKafkaService)) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.CREATE_SUBJECT_RESERVED_NAME, Level.FINE, "subject=" + subject);
        }
    }

    private boolean isCompatible(Project project, String subject, Schema schema) throws SchemaException {
        SchemaCompatibility sc = this.getSubjectOrProjectCompatibility(project, subject);
        if (sc.equals((Object)SchemaCompatibility.NONE)) {
            return true;
        }
        SchemaValidator validator = this.getSchemaValidator(sc);
        List previousSchemas = this.subjectsFacade.findSubjectByName(project, subject).stream().sorted(Comparator.comparing(Subjects::getVersion).reversed()).map(s -> new Schema.Parser().parse(s.getSchema().getSchema())).collect(Collectors.toList());
        try {
            validator.validate(schema, previousSchemas);
        }
        catch (SchemaValidationException e) {
            return false;
        }
        return true;
    }

    private boolean isCompatible(Schema previousSchema, Schema schemaToTest, SchemaCompatibility sc) {
        if (sc == SchemaCompatibility.NONE) {
            return true;
        }
        SchemaValidator validator = this.getSchemaValidator(sc);
        try {
            validator.validate(schemaToTest, Collections.singleton(previousSchema));
        }
        catch (SchemaValidationException e) {
            return false;
        }
        return true;
    }

    private SchemaCompatibility getSubjectOrProjectCompatibility(Project project, String subject) throws SchemaException {
        Optional<SubjectsCompatibility> optional = this.subjectsCompatibilityFacade.getSubjectCompatibility(project, subject);
        if (optional.isPresent()) {
            return optional.get().getCompatibility();
        }
        return this.subjectsCompatibilityFacade.getProjectCompatibility(project).orElseThrow(() -> new SchemaException(RESTCodes.SchemaRegistryErrorCode.SUBJECT_NOT_FOUND, Level.FINE, "Project compatibility not found for project " + project.getName())).getCompatibility();
    }

    private SchemaValidator getSchemaValidator(SchemaCompatibility sc) {
        switch (sc) {
            case BACKWARD: {
                return new SchemaValidatorBuilder().canReadStrategy().validateLatest();
            }
            case BACKWARD_TRANSITIVE: {
                new SchemaValidatorBuilder().canReadStrategy().validateAll();
            }
            case FORWARD: {
                return new SchemaValidatorBuilder().canBeReadStrategy().validateLatest();
            }
            case FORWARD_TRANSITIVE: {
                return new SchemaValidatorBuilder().canBeReadStrategy().validateAll();
            }
            case FULL: {
                return new SchemaValidatorBuilder().mutualReadStrategy().validateLatest();
            }
            case FULL_TRANSITIVE: {
                return new SchemaValidatorBuilder().mutualReadStrategy().validateAll();
            }
        }
        throw new IllegalArgumentException("Unknown schema compatibility " + sc.toString());
    }

    public SubjectDTO checkIfSchemaRegistered(Project project, String subject, String schemaContent) throws SchemaException {
        Schema schema;
        if (!this.subjectsFacade.getListOfSubjects(project).contains(subject)) {
            throw new SchemaException(RESTCodes.SchemaRegistryErrorCode.SUBJECT_NOT_FOUND, Level.FINE, "subject=" + subject);
        }
        try {
            schema = new Schema.Parser().parse(schemaContent);
        }
        catch (SchemaParseException e) {
            throw new SchemaException(RESTCodes.SchemaRegistryErrorCode.INVALID_AVRO_SCHEMA, Level.FINE, "schema=" + schemaContent);
        }
        Optional<Subjects> optional = this.subjectsFacade.findSubjectByNameAndSchema(project, subject, schema.toString());
        if (!optional.isPresent()) {
            throw new SchemaException(RESTCodes.SchemaRegistryErrorCode.SCHEMA_NOT_FOUND, Level.FINE, "schema=" + schema.toString());
        }
        return new SubjectDTO(optional.get());
    }

    public List<Integer> deleteSubject(Project project, String subject) throws SchemaException, KafkaException {
        this.validateSubject(subject, false);
        if (!this.projectTopicsFacade.findTopicsBySubject(project, subject).isEmpty()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_IN_USE, Level.FINE, "project=" + project.getName() + ", subject=" + subject);
        }
        List<Integer> versions = this.getSubjectVersions(project, subject);
        Integer deleted = this.subjectsFacade.deleteSubject(project, subject);
        if (versions.size() != deleted.intValue()) {
            throw new SchemaException(RESTCodes.SchemaRegistryErrorCode.INTERNAL_SERVER_ERROR, Level.FINE, "error deleting subject. versions=" + Arrays.toString(versions.toArray()) + ", but deleted " + deleted + " items.");
        }
        this.subjectsCompatibilityFacade.getSubjectCompatibility(project, subject).ifPresent(sc -> this.subjectsCompatibilityFacade.remove(sc));
        return versions;
    }

    public CompatibilityCheck checkIfSchemaCompatible(Project project, String subject, String version, String schemaToTest) throws SchemaException {
        Schema schema;
        this.validateVersion(version);
        try {
            schema = new Schema.Parser().parse(schemaToTest);
        }
        catch (SchemaParseException e) {
            throw new SchemaException(RESTCodes.SchemaRegistryErrorCode.INVALID_AVRO_SCHEMA, Level.FINE, "schema=" + schemaToTest);
        }
        if (!this.subjectsFacade.getListOfSubjects(project).contains(subject)) {
            throw new SchemaException(RESTCodes.SchemaRegistryErrorCode.SUBJECT_NOT_FOUND, Level.FINE, "subject=" + subject);
        }
        SchemaCompatibility sc = this.getSubjectOrProjectCompatibility(project, subject);
        Optional<Subjects> optional = version.equals("latest") ? this.subjectsFacade.findSubjectLatestVersion(project, subject) : this.subjectsFacade.findSubjectByNameAndVersion(project, subject, Integer.valueOf(version));
        if (!optional.isPresent()) {
            throw new SchemaException(RESTCodes.SchemaRegistryErrorCode.VERSION_NOT_FOUND, Level.FINE, "project=" + project.getName() + ", subject=" + subject + ", version=" + version);
        }
        boolean isCompatible = this.isCompatible(new Schema.Parser().parse(optional.get().getSchema().getSchema()), schema, sc);
        return new CompatibilityCheck(isCompatible);
    }

    public Integer deleteSubjectsVersion(Project project, String subject, String version) throws SchemaException, KafkaException {
        this.validateSubject(subject, false);
        this.validateVersion(version);
        if (!this.subjectsFacade.getListOfSubjects(project).contains(subject)) {
            throw new SchemaException(RESTCodes.SchemaRegistryErrorCode.SUBJECT_NOT_FOUND, Level.FINE, "subject=" + subject);
        }
        Optional<Subjects> optional = version.equals("latest") ? this.subjectsFacade.findSubjectLatestVersion(project, subject) : this.subjectsFacade.findSubjectByNameAndVersion(project, subject, Integer.valueOf(version));
        if (!optional.isPresent()) {
            throw new SchemaException(RESTCodes.SchemaRegistryErrorCode.VERSION_NOT_FOUND, Level.FINE, "project=" + project.getName() + ", subject=" + subject + ", version=" + version);
        }
        Integer versionToDelete = optional.get().getVersion();
        if (!this.projectTopicsFacade.findTopiscBySubjectAndVersion(project, subject, versionToDelete).isEmpty()) {
            throw new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_IN_USE, Level.FINE, "project=" + project.getName() + ", subject=" + subject + ", version=" + versionToDelete);
        }
        this.subjectsFacade.remove(optional.get());
        this.subjectsCompatibilityFacade.getSubjectCompatibility(project, subject).ifPresent(sc -> this.subjectsCompatibilityFacade.remove(sc));
        return versionToDelete;
    }

    private void validateVersion(String version) throws SchemaException {
        if (version.equals("latest")) {
            return;
        }
        try {
            new BigInteger(version);
        }
        catch (NumberFormatException e) {
            throw new SchemaException(RESTCodes.SchemaRegistryErrorCode.INVALID_VERSION, Level.FINE, "version=" + version);
        }
        if (new BigInteger(version).compareTo(BigInteger.ONE) < 0 || new BigInteger(version).compareTo(new BigInteger(String.valueOf(Integer.MAX_VALUE))) > 0) {
            throw new SchemaException(RESTCodes.SchemaRegistryErrorCode.INVALID_VERSION, Level.FINE, "version=" + version);
        }
    }
}

