/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.featurestore.featuregroup.ondemand;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.featurestore.activity.FeaturestoreActivityFacade;
import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.FeatureGroupInputValidation;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupFacade;
import io.hops.hopsworks.common.featurestore.featuregroup.ondemand.OnDemandFeaturegroupDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.ondemand.OnDemandFeaturegroupFacade;
import io.hops.hopsworks.common.featurestore.featuregroup.online.OnlineFeaturegroupController;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreConnectorFacade;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorDTO;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.HopsSecurityException;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.SchemaException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.activity.FeaturestoreActivityMeta;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandFeature;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandFeaturegroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandOption;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnectorType;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.logging.Level;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

/**
 * Stateless bean with the create / update / delete logic for on-demand feature groups
 * ({@link OnDemandFeaturegroup} entities) and the placeholder file that accompanies each
 * of them in the distributed file system.
 */
@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class OnDemandFeaturegroupController {
    @EJB
    private FeaturegroupController featuregroupController;
    @EJB
    private OnDemandFeaturegroupFacade onDemandFeaturegroupFacade;
    @EJB
    private FeaturestoreConnectorFacade featurestoreConnectorFacade;
    @EJB
    private DistributedFsService distributedFsService;
    @EJB
    private OnlineFeaturegroupController onlineFeatureGroupController;
    @EJB
    private FeatureGroupInputValidation featureGroupInputValidation;
    @EJB
    private FeaturegroupFacade featureGroupFacade;
    @EJB
    private FeaturestoreActivityFacade fsActivityFacade;

    /**
     * Validates the request, creates the placeholder file and persists a new on-demand
     * feature group backed by a storage connector.
     *
     * @param featurestore feature store used to compute the placeholder file location
     * @param onDemandFeaturegroupDTO user-supplied definition (connector, query, features, ...)
     * @param project project used to obtain the file system client
     * @param user user used to obtain the file system client
     * @return the persisted entity
     * @throws FeaturestoreException if the connector does not exist, is a KAFKA connector, the
     *     query / data format do not fit the connector type, or the placeholder file cannot be created
     * @throws IllegalArgumentException if no connector id or no features were provided
     */
    public OnDemandFeaturegroup createOnDemandFeaturegroup(Featurestore featurestore, OnDemandFeaturegroupDTO onDemandFeaturegroupDTO, Project project, Users user) throws FeaturestoreException {
        // A missing connector DTO is handled like a missing connector id instead of failing with an NPE.
        FeaturestoreStorageConnectorDTO connectorDTO = onDemandFeaturegroupDTO.getStorageConnector();
        FeaturestoreConnector connector = this.getStorageConnector(connectorDTO == null ? null : connectorDTO.getId());
        FeaturestoreConnectorType connectorType = connector.getConnectorType();

        if (connectorType == FeaturestoreConnectorType.KAFKA) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_ON_DEMAND_FEATUREGROUP, Level.FINE, connectorType + " storage connectors are not supported as source for on demand feature groups");
        }

        boolean isJDBCType = OnDemandFeaturegroupController.isQueryBasedConnector(connectorType);
        String query = onDemandFeaturegroupDTO.getQuery();
        boolean hasQuery = !Strings.isNullOrEmpty(query);
        if (!hasQuery && isJDBCType) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.INVALID_SQL_QUERY, Level.FINE, "SQL Query cannot be empty");
        }
        if (hasQuery && !isJDBCType) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.INVALID_SQL_QUERY, Level.FINE, "SQL query not supported when specifying " + connectorType + " storage connectors");
        }
        if (onDemandFeaturegroupDTO.getDataFormat() == null && !isJDBCType) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_ON_DEMAND_DATA_FORMAT, Level.FINE, "Data format required when specifying " + connectorType + " storage connectors");
        }

        // Build the entity before touching the file system so that an invalid feature list is
        // rejected without leaving an orphan placeholder file behind.
        OnDemandFeaturegroup onDemandFeaturegroup = new OnDemandFeaturegroup();
        onDemandFeaturegroup.setFeaturestoreConnector(connector);
        onDemandFeaturegroup.setQuery(query);
        onDemandFeaturegroup.setFeatures(this.convertOnDemandFeatures(onDemandFeaturegroupDTO, onDemandFeaturegroup));
        onDemandFeaturegroup.setDataFormat(onDemandFeaturegroupDTO.getDataFormat());
        onDemandFeaturegroup.setPath(onDemandFeaturegroupDTO.getPath());
        if (onDemandFeaturegroupDTO.getOptions() != null) {
            onDemandFeaturegroup.setOptions(onDemandFeaturegroupDTO.getOptions().stream()
                .map(o -> new OnDemandOption(onDemandFeaturegroup, o.getName(), o.getValue()))
                .collect(Collectors.toList()));
        }

        this.createFile(project, user, featurestore, onDemandFeaturegroupDTO);
        this.onDemandFeaturegroupFacade.persist(onDemandFeaturegroup);
        return onDemandFeaturegroup;
    }

    /**
     * Creates the placeholder file and persists a spine group, i.e. an on-demand feature
     * group without storage connector or query.
     *
     * @param featurestore feature store used to compute the placeholder file location
     * @param onDemandFeaturegroupDTO user-supplied definition; its spine flag is copied to the entity
     * @param project project used to obtain the file system client
     * @param user user used to obtain the file system client
     * @return the persisted entity
     * @throws FeaturestoreException if the placeholder file cannot be created
     * @throws IllegalArgumentException if no features were provided
     */
    public OnDemandFeaturegroup createSpineGroup(Featurestore featurestore, OnDemandFeaturegroupDTO onDemandFeaturegroupDTO, Project project, Users user) throws FeaturestoreException {
        // Same ordering as createOnDemandFeaturegroup: validate the features first, then create the file.
        OnDemandFeaturegroup onDemandFeaturegroup = new OnDemandFeaturegroup();
        onDemandFeaturegroup.setFeatures(this.convertOnDemandFeatures(onDemandFeaturegroupDTO, onDemandFeaturegroup));
        onDemandFeaturegroup.setSpine(onDemandFeaturegroupDTO.getSpine().booleanValue());

        this.createFile(project, user, featurestore, onDemandFeaturegroupDTO);
        this.onDemandFeaturegroupFacade.persist(onDemandFeaturegroup);
        return onDemandFeaturegroup;
    }

    /**
     * Builds the DTO for a feature group, attaching the topic name derived from the feature group.
     *
     * @param featureStoreName name of the owning feature store
     * @param featureGroup feature group to convert
     * @param storageConnectorDTO DTO of the storage connector to embed
     * @return the DTO
     */
    public OnDemandFeaturegroupDTO convertOnDemandFeatureGroupToDTO(String featureStoreName, Featuregroup featureGroup, FeaturestoreStorageConnectorDTO storageConnectorDTO) {
        String onlineTopicName = Utils.getFeatureGroupTopicName(featureGroup);
        return new OnDemandFeaturegroupDTO(featureStoreName, featureGroup, storageConnectorDTO, null, onlineTopicName);
    }

    /**
     * Returns the features of an on-demand feature group as DTOs, ordered by their stored index
     * and post-processed by the online feature group controller.
     *
     * @param featureGroup feature group whose on-demand features are read
     * @return the feature DTOs
     * @throws FeaturestoreException propagated from the online feature group controller
     */
    public List<FeatureGroupFeatureDTO> getFeaturesDTO(Featuregroup featureGroup) throws FeaturestoreException {
        List<FeatureGroupFeatureDTO> features = featureGroup.getOnDemandFeaturegroup().getFeatures().stream()
            .sorted(Comparator.comparing(OnDemandFeature::getIdx))
            .map(fgFeature -> new FeatureGroupFeatureDTO(fgFeature.getName(), fgFeature.getType(), fgFeature.getDescription(), fgFeature.getPrimary(), fgFeature.getDefaultValue(), featureGroup.getId()))
            .collect(Collectors.toList());
        return this.onlineFeatureGroupController.getFeaturegroupFeatures(featureGroup, features);
    }

    /**
     * Applies a metadata update: refreshes descriptions of existing features, appends new
     * features, alters the online schema when the group is online enabled, logs an activity
     * for appended features and stores the updated entity.
     *
     * @param project project passed on to the online schema alteration
     * @param user user performing the update
     * @param featuregroup feature group being updated
     * @param onDemandFeaturegroupDTO DTO carrying the full new feature list
     * @throws FeaturestoreException if the new schema removes or alters existing features
     * @throws SchemaException propagated from the online schema alteration
     * @throws SQLException propagated from the online schema alteration
     * @throws KafkaException propagated from the online schema alteration
     */
    public void updateOnDemandFeaturegroupMetadata(Project project, Users user, Featuregroup featuregroup, OnDemandFeaturegroupDTO onDemandFeaturegroupDTO) throws FeaturestoreException, SchemaException, SQLException, KafkaException {
        OnDemandFeaturegroup onDemandFeaturegroup = featuregroup.getOnDemandFeaturegroup();
        List<FeatureGroupFeatureDTO> requestedFeatures = onDemandFeaturegroupDTO.getFeatures();
        // Snapshot the schema before it is mutated below.
        List<FeatureGroupFeatureDTO> previousSchema = this.getFeaturesDTO(featuregroup);

        this.verifySchemaUnchangedAndValid(onDemandFeaturegroup.getFeatures(), requestedFeatures);
        List<FeatureGroupFeatureDTO> newFeatures = this.featureGroupInputValidation.verifyAndGetNewFeatures(previousSchema, requestedFeatures);
        this.updateOnDemandFeatures(onDemandFeaturegroup, requestedFeatures);

        if (!newFeatures.isEmpty()) {
            if (featuregroup.isOnlineEnabled()) {
                this.onlineFeatureGroupController.alterOnlineFeatureGroupSchema(featuregroup, newFeatures, requestedFeatures, project, user);
            }
            String newFeaturesStr = "New features: " + newFeatures.stream().map(FeatureGroupFeatureDTO::getName).collect(Collectors.joining(","));
            this.fsActivityFacade.logMetadataActivity(user, featuregroup, FeaturestoreActivityMeta.FG_ALTERED, newFeaturesStr);
        }
        this.onDemandFeaturegroupFacade.updateMetadata(onDemandFeaturegroup);
    }

    /**
     * Merges the DTO features into the entity: a feature whose name already exists (ignoring
     * case) only gets its description updated, any other feature is appended with the next index.
     */
    private void updateOnDemandFeatures(OnDemandFeaturegroup onDemandFeaturegroup, List<FeatureGroupFeatureDTO> featureGroupFeatureDTOs) {
        Collection<OnDemandFeature> storedFeatures = onDemandFeaturegroup.getFeatures();
        for (FeatureGroupFeatureDTO feature : featureGroupFeatureDTOs) {
            Optional<OnDemandFeature> existing = this.getOnDemandFeature(storedFeatures, feature.getName());
            if (existing.isPresent()) {
                existing.get().setDescription(feature.getDescription());
            } else {
                // The current size is the index of the element about to be appended.
                storedFeatures.add(new OnDemandFeature(onDemandFeaturegroup, feature.getName(), feature.getType(), feature.getDescription(), feature.getPrimary(), storedFeatures.size(), feature.getDefaultValue()));
            }
        }
    }

    /** Finds a feature by name, ignoring case. */
    private Optional<OnDemandFeature> getOnDemandFeature(Collection<OnDemandFeature> onDemandFeatures, String featureName) {
        return onDemandFeatures.stream().filter(feature -> feature.getName().equalsIgnoreCase(featureName)).findAny();
    }

    /**
     * Checks that every previously stored feature is still present in the new schema with the
     * same primary-key flag and type, and that no default value is supplied for it.
     *
     * @param previousSchema features currently stored for the feature group
     * @param newSchema features supplied by the update request
     * @throws FeaturestoreException with {@code ILLEGAL_FEATUREGROUP_UPDATE} if a feature is
     *     missing, its primary flag or type changed, or a default value was provided
     */
    public void verifySchemaUnchangedAndValid(Collection<OnDemandFeature> previousSchema, List<FeatureGroupFeatureDTO> newSchema) throws FeaturestoreException {
        for (OnDemandFeature feature : previousSchema) {
            FeatureGroupFeatureDTO newFeature = newSchema.stream()
                .filter(newFeat -> feature.getName().equals(newFeat.getName()))
                .findAny()
                .orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, "Feature " + feature.getName() + " was not found in new schema. It is only possible to append features."));

            // Compare by value: the primary flags may be boxed, in which case != would compare references.
            boolean primaryChanged = !Objects.equals(newFeature.getPrimary(), feature.getPrimary());
            // A missing type cannot match the stored one; report it as a change rather than failing with an NPE.
            String newType = newFeature.getType();
            boolean typeChanged = newType == null || !newType.equalsIgnoreCase(feature.getType());
            if (primaryChanged || typeChanged) {
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, "Primary key or type information of feature " + feature.getName() + " changed. Primary key and type cannot be changed when updating features.");
            }
            if (newFeature.getDefaultValue() != null) {
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, "Default values for features appended to external feature groups are not supported. Feature: " + feature.getName() + ", provided default value: " + newFeature.getDefaultValue() + " must be null instead.");
            }
        }
    }

    /**
     * Removes the on-demand feature group entity and then deletes its placeholder file.
     *
     * @param featuregroup feature group whose on-demand part is removed
     * @param project project used to obtain the file system client
     * @param user user used to obtain the file system client
     * @throws FeaturestoreException if the placeholder file cannot be removed
     */
    public void removeOnDemandFeaturegroup(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException {
        this.onDemandFeaturegroupFacade.remove(featuregroup.getOnDemandFeaturegroup());
        DistributedFileSystemOps udfso = null;
        try {
            udfso = this.distributedFsService.getDfsOps(project, user);
            udfso.rm(this.featuregroupController.getFeatureGroupLocation(featuregroup), false);
        } catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_DELETE_ON_DEMAND_FEATUREGROUP, Level.SEVERE, "Error removing the placeholder file", e.getMessage(), e);
        } finally {
            this.distributedFsService.closeDfsClient(udfso);
        }
    }

    /**
     * Converts the DTO features into entities attached to {@code onDemandFeaturegroup},
     * numbering them in iteration order starting at 0.
     *
     * @throws IllegalArgumentException if the DTO carries no features
     */
    private List<OnDemandFeature> convertOnDemandFeatures(OnDemandFeaturegroupDTO onDemandFeaturegroupDTO, OnDemandFeaturegroup onDemandFeaturegroup) {
        List<FeatureGroupFeatureDTO> featureDTOs = onDemandFeaturegroupDTO.getFeatures();
        // A missing list is rejected the same way as an empty one.
        if (featureDTOs == null || featureDTOs.isEmpty()) {
            throw new IllegalArgumentException("No features were provided for on demand feature group");
        }
        List<OnDemandFeature> features = new ArrayList<>(featureDTOs.size());
        int idx = 0;
        for (FeatureGroupFeatureDTO f : featureDTOs) {
            features.add(new OnDemandFeature(onDemandFeaturegroup, f.getName(), f.getType(), f.getDescription(), f.getPrimary(), idx++, f.getDefaultValue()));
        }
        return features;
    }

    /**
     * Creates the empty placeholder file for the feature group at the location computed from
     * the feature store, name and version.
     *
     * @throws FeaturestoreException if creating the file fails with an {@link IOException}
     */
    private void createFile(Project project, Users user, Featurestore featurestore, OnDemandFeaturegroupDTO onDemandFeaturegroupDTO) throws FeaturestoreException {
        DistributedFileSystemOps udfso = null;
        try {
            String path = this.featuregroupController.getFeatureGroupLocation(featurestore, onDemandFeaturegroupDTO.getName(), onDemandFeaturegroupDTO.getVersion());
            udfso = this.distributedFsService.getDfsOps(project, user);
            udfso.touchz(path);
        } catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_ON_DEMAND_FEATUREGROUP, Level.SEVERE, "Error creating the placeholder file", e.getMessage(), e);
        } finally {
            // Always release the client, including when an unchecked exception escapes the try block.
            this.distributedFsService.closeDfsClient(udfso);
        }
    }

    /**
     * Looks up a storage connector by id.
     *
     * @throws IllegalArgumentException if {@code connectorId} is null
     * @throws FeaturestoreException with {@code CONNECTOR_NOT_FOUND} if no such connector exists
     */
    private FeaturestoreConnector getStorageConnector(Integer connectorId) throws FeaturestoreException {
        if (connectorId == null) {
            throw new IllegalArgumentException(RESTCodes.FeaturestoreErrorCode.CONNECTOR_ID_NOT_PROVIDED.getMessage());
        }
        return this.featurestoreConnectorFacade.findById(connectorId).orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.CONNECTOR_NOT_FOUND, Level.FINE, "Connector with id: " + connectorId + " was not found"));
    }

    /** Tells whether on-demand feature groups on this connector type are defined by a SQL query. */
    private static boolean isQueryBasedConnector(FeaturestoreConnectorType connectorType) {
        return connectorType == FeaturestoreConnectorType.JDBC
            || connectorType == FeaturestoreConnectorType.REDSHIFT
            || connectorType == FeaturestoreConnectorType.SNOWFLAKE
            || connectorType == FeaturestoreConnectorType.BIGQUERY;
    }

    /**
     * Marks the feature group as online enabled and sets up its online counterpart if it is
     * not online enabled yet, then stores the feature group metadata.
     *
     * @param featurestore feature store passed to the online setup
     * @param featuregroup feature group to enable
     * @param project project passed to the online setup
     * @param user user passed to the online setup
     */
    public void enableFeatureGroupOnline(Featurestore featurestore, Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, SQLException, ServiceException, KafkaException, SchemaException, ProjectException, UserException, IOException, HopsSecurityException {
        List<FeatureGroupFeatureDTO> features = this.getFeaturesDTO(featuregroup);
        if (!featuregroup.isOnlineEnabled()) {
            featuregroup.setOnlineEnabled(true);
            this.onlineFeatureGroupController.setupOnlineFeatureGroup(featurestore, featuregroup, features, project, user);
        }
        this.featureGroupFacade.updateFeaturegroupMetadata(featuregroup);
    }
}

