/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.featurestore.featuregroup.ondemand;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.featurestore.FeaturestoreFacade;
import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.ondemand.OnDemandFeaturegroupDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.ondemand.OnDemandFeaturegroupFacade;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreConnectorFacade;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.inode.InodeController;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandFeature;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandFeaturegroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandOption;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnectorType;
import io.hops.hopsworks.persistence.entity.hdfs.inode.Inode;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.logging.Level;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.hadoop.fs.Path;

/**
 * Stateless EJB that creates, updates and removes on-demand feature groups.
 *
 * <p>Creating a feature group also creates a placeholder file on the distributed file system
 * (named {@code <name>_<version>} under the feature store's Hive database path); removing a
 * feature group deletes that file again.
 */
@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class OnDemandFeaturegroupController {
    @EJB
    private OnDemandFeaturegroupFacade onDemandFeaturegroupFacade;
    @EJB
    private FeaturestoreConnectorFacade featurestoreConnectorFacade;
    @EJB
    private FeaturestoreFacade featurestoreFacade;
    @EJB
    private DistributedFsService distributedFsService;
    @EJB
    private HdfsUsersController hdfsUsersController;
    @EJB
    private InodeController inodeController;

    /**
     * Validates the request, creates the placeholder file and persists a new on-demand
     * feature group.
     *
     * @param featurestore the feature store the feature group belongs to
     * @param onDemandFeaturegroupDTO the user-supplied description of the feature group
     * @param project the project used to resolve the HDFS user name
     * @param user the user used to resolve the HDFS user name
     * @return the persisted entity
     * @throws FeaturestoreException if the connector is not found, is a KAFKA connector, the
     *         query/data-format combination is invalid for the connector type, or the
     *         placeholder file cannot be created
     * @throws IllegalArgumentException if no storage connector (id) or no features are provided
     */
    public OnDemandFeaturegroup createOnDemandFeaturegroup(Featurestore featurestore, OnDemandFeaturegroupDTO onDemandFeaturegroupDTO, Project project, Users user) throws FeaturestoreException {
        // A missing connector reference is handled like a missing connector id, so the caller
        // gets the "connector id not provided" error instead of a bare NullPointerException.
        Integer connectorId = onDemandFeaturegroupDTO.getStorageConnector() == null
            ? null
            : onDemandFeaturegroupDTO.getStorageConnector().getId();
        FeaturestoreConnector connector = this.getStorageConnector(connectorId);
        FeaturestoreConnectorType connectorType = connector.getConnectorType();

        // Connector types that are driven by a SQL query rather than by a path + data format.
        boolean isJDBCType = connectorType == FeaturestoreConnectorType.JDBC
            || connectorType == FeaturestoreConnectorType.REDSHIFT
            || connectorType == FeaturestoreConnectorType.SNOWFLAKE
            || connectorType == FeaturestoreConnectorType.BIGQUERY;
        boolean queryMissing = Strings.isNullOrEmpty(onDemandFeaturegroupDTO.getQuery());

        if (connectorType == FeaturestoreConnectorType.KAFKA) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_ON_DEMAND_FEATUREGROUP, Level.FINE, connectorType + " storage connectors are not supported as source for on demand feature groups");
        }
        if (queryMissing && isJDBCType) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.INVALID_SQL_QUERY, Level.FINE, "SQL Query cannot be empty");
        }
        if (!queryMissing && !isJDBCType) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.INVALID_SQL_QUERY, Level.FINE, "SQL query not supported when specifying " + connectorType + " storage connectors");
        }
        if (onDemandFeaturegroupDTO.getDataFormat() == null && !isJDBCType) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_ON_DEMAND_DATA_FORMAT, Level.FINE, "Data format required when specifying " + connectorType + " storage connectors");
        }

        OnDemandFeaturegroup onDemandFeaturegroup = new OnDemandFeaturegroup();
        onDemandFeaturegroup.setDescription(onDemandFeaturegroupDTO.getDescription());
        onDemandFeaturegroup.setFeaturestoreConnector(connector);
        onDemandFeaturegroup.setQuery(onDemandFeaturegroupDTO.getQuery());
        // Features are converted before the file is created so that an invalid feature list
        // fails without leaving a placeholder file behind.
        onDemandFeaturegroup.setFeatures(this.convertOnDemandFeatures(onDemandFeaturegroupDTO, onDemandFeaturegroup));
        onDemandFeaturegroup.setInode(this.createFile(project, user, featurestore, onDemandFeaturegroupDTO));
        onDemandFeaturegroup.setDataFormat(onDemandFeaturegroupDTO.getDataFormat());
        onDemandFeaturegroup.setPath(onDemandFeaturegroupDTO.getPath());
        if (onDemandFeaturegroupDTO.getOptions() != null) {
            onDemandFeaturegroup.setOptions(onDemandFeaturegroupDTO.getOptions().stream()
                .map(o -> new OnDemandOption(onDemandFeaturegroup, o.getName(), o.getValue()))
                .collect(Collectors.toList()));
        }
        this.onDemandFeaturegroupFacade.persist(onDemandFeaturegroup);
        return onDemandFeaturegroup;
    }

    /**
     * Updates the description and feature metadata of an existing on-demand feature group.
     * The description is only overwritten when the DTO carries a non-null one.
     *
     * @param onDemandFeaturegroup the entity to update
     * @param onDemandFeaturegroupDTO the new metadata
     * @throws FeaturestoreException if the new schema drops an existing feature or changes its
     *         primary-key flag or type
     */
    public void updateOnDemandFeaturegroupMetadata(OnDemandFeaturegroup onDemandFeaturegroup, OnDemandFeaturegroupDTO onDemandFeaturegroupDTO) throws FeaturestoreException {
        this.verifySchemaUnchangedAndValid(onDemandFeaturegroup.getFeatures(), onDemandFeaturegroupDTO.getFeatures());
        if (onDemandFeaturegroupDTO.getDescription() != null) {
            onDemandFeaturegroup.setDescription(onDemandFeaturegroupDTO.getDescription());
        }
        this.updateOnDemandFeatures(onDemandFeaturegroup, onDemandFeaturegroupDTO.getFeatures());
        this.onDemandFeaturegroupFacade.updateMetadata(onDemandFeaturegroup);
    }

    /**
     * Applies the given feature DTOs to the entity: features that already exist (matched by
     * name, ignoring case) get their description replaced; unknown features are appended with
     * an index equal to the current number of features.
     */
    private void updateOnDemandFeatures(OnDemandFeaturegroup onDemandFeaturegroup, List<FeatureGroupFeatureDTO> featureGroupFeatureDTOs) {
        for (FeatureGroupFeatureDTO feature : featureGroupFeatureDTOs) {
            Optional<OnDemandFeature> previousOnDemandFeature = this.getOnDemandFeature(onDemandFeaturegroup.getFeatures(), feature.getName());
            if (previousOnDemandFeature.isPresent()) {
                previousOnDemandFeature.get().setDescription(feature.getDescription());
            } else {
                // The current size is the next free position at the end of the feature list.
                Integer nextIndex = onDemandFeaturegroup.getFeatures().size();
                onDemandFeaturegroup.getFeatures().add(new OnDemandFeature(onDemandFeaturegroup, feature.getName(), feature.getType(), feature.getDescription(), feature.getPrimary(), nextIndex));
            }
        }
    }

    /** Finds a feature by name (case-insensitive) in the given collection. */
    private Optional<OnDemandFeature> getOnDemandFeature(Collection<OnDemandFeature> onDemandFeatures, String featureName) {
        return onDemandFeatures.stream()
            .filter(feature -> feature.getName().equalsIgnoreCase(featureName))
            .findAny();
    }

    /**
     * Checks that every feature of the previous schema is still present in the new schema
     * (matched by exact name) with the same primary-key flag and the same type (compared
     * ignoring case). Additional features in the new schema are allowed.
     *
     * @param previousSchema the features currently stored
     * @param newSchema the features requested by the update
     * @throws FeaturestoreException if a previous feature is missing from the new schema, or
     *         its primary-key flag or type differs
     */
    public void verifySchemaUnchangedAndValid(Collection<OnDemandFeature> previousSchema, List<FeatureGroupFeatureDTO> newSchema) throws FeaturestoreException {
        for (OnDemandFeature feature : previousSchema) {
            FeatureGroupFeatureDTO newFeature = newSchema.stream()
                .filter(newFeat -> feature.getName().equals(newFeat.getName()))
                .findAny()
                .orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, "Feature " + feature.getName() + " was not found in new schema. It is only possible to append features."));
            // Objects.equals compares by value; '==' would compare references if the
            // primary-key flags are boxed Booleans.
            boolean primaryUnchanged = Objects.equals(newFeature.getPrimary(), feature.getPrimary());
            boolean typeUnchanged = newFeature.getType().equalsIgnoreCase(feature.getType());
            if (!primaryUnchanged || !typeUnchanged) {
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, "Primary key or type information of feature " + feature.getName() + " changed. Primary key and type cannot be changed when updating features.");
            }
        }
    }

    /**
     * Removes the on-demand feature group entity and then deletes its placeholder file.
     *
     * @param featurestore the feature store used to resolve the placeholder file path
     * @param featuregroup the feature group whose on-demand part is removed
     * @param project the project used to resolve the HDFS user name
     * @param user the user used to resolve the HDFS user name
     * @throws FeaturestoreException if the placeholder file cannot be removed
     */
    public void removeOnDemandFeaturegroup(Featurestore featurestore, Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException {
        String username = this.hdfsUsersController.getHdfsUserName(project, user);
        DistributedFileSystemOps udfso = null;
        this.onDemandFeaturegroupFacade.remove(featuregroup.getOnDemandFeaturegroup());
        try {
            udfso = this.distributedFsService.getDfsOps(username);
            udfso.rm(this.getFilePath(featurestore, featuregroup.getName(), featuregroup.getVersion()), false);
        } catch (IOException | URISyntaxException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_DELETE_ON_DEMAND_FEATUREGROUP, Level.SEVERE, "Error removing the placeholder file", e.getMessage(), e);
        } finally {
            this.distributedFsService.closeDfsClient(udfso);
        }
    }

    /**
     * Converts the DTO's features into entities, numbering them from 0 in iteration order.
     *
     * @throws IllegalArgumentException if the DTO carries no features (null or empty list)
     */
    private List<OnDemandFeature> convertOnDemandFeatures(OnDemandFeaturegroupDTO onDemandFeaturegroupDTO, OnDemandFeaturegroup onDemandFeaturegroup) {
        List<FeatureGroupFeatureDTO> featureDTOs = onDemandFeaturegroupDTO.getFeatures();
        // A null list is reported the same way as an empty one rather than as an NPE.
        if (featureDTOs == null || featureDTOs.isEmpty()) {
            throw new IllegalArgumentException("No features were provided for on demand feature group");
        }
        int i = 0;
        List<OnDemandFeature> features = new ArrayList<>();
        for (FeatureGroupFeatureDTO f : featureDTOs) {
            features.add(new OnDemandFeature(onDemandFeaturegroup, f.getName(), f.getType(), f.getDescription(), f.getPrimary(), Integer.valueOf(i++)));
        }
        return features;
    }

    /**
     * Creates the empty placeholder file for the feature group and returns its inode.
     *
     * @throws FeaturestoreException if the path cannot be built or the file cannot be created
     */
    private Inode createFile(Project project, Users user, Featurestore featurestore, OnDemandFeaturegroupDTO onDemandFeaturegroupDTO) throws FeaturestoreException {
        String username = this.hdfsUsersController.getHdfsUserName(project, user);
        Path path;
        DistributedFileSystemOps udfso = null;
        try {
            path = this.getFilePath(featurestore, onDemandFeaturegroupDTO.getName(), onDemandFeaturegroupDTO.getVersion());
            udfso = this.distributedFsService.getDfsOps(username);
            udfso.touchz(path);
        } catch (IOException | URISyntaxException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_ON_DEMAND_FEATUREGROUP, Level.SEVERE, "Error creating the placeholder file", e.getMessage(), e);
        } finally {
            // Release the client on every exit path, including unchecked exceptions.
            this.distributedFsService.closeDfsClient(udfso);
        }
        return this.inodeController.getInodeAtPath(path.toString());
    }

    /**
     * Builds the placeholder file path: {@code <hive db path>/<name>_<version>}, where the
     * Hive database path is the path component of the URI returned by the feature store facade.
     */
    private Path getFilePath(Featurestore featurestore, String name, Integer version) throws URISyntaxException {
        String hiveDbPath = new URI(this.featurestoreFacade.getHiveDbHdfsPath(featurestore.getHiveDbId())).getPath();
        return new Path(hiveDbPath, name + "_" + version);
    }

    /**
     * Looks up a storage connector by id.
     *
     * @throws IllegalArgumentException if {@code connectorId} is null
     * @throws FeaturestoreException if no connector with that id exists
     */
    private FeaturestoreConnector getStorageConnector(Integer connectorId) throws FeaturestoreException {
        if (connectorId == null) {
            throw new IllegalArgumentException(RESTCodes.FeaturestoreErrorCode.CONNECTOR_ID_NOT_PROVIDED.getMessage());
        }
        return this.featurestoreConnectorFacade.findById(connectorId)
            .orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.CONNECTOR_NOT_FOUND, Level.FINE, "Connector with id: " + connectorId + " was not found"));
    }
}

