package io.hops.hopsworks.common.featurestore.featuregroup.ondemand;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.featurestore.FeaturestoreFacade;
import io.hops.hopsworks.common.featurestore.activity.FeaturestoreActivityFacade;
import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.FeatureGroupInputValidation;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupFacade;
import io.hops.hopsworks.common.featurestore.featuregroup.online.OnlineFeaturegroupController;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreConnectorFacade;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorDTO;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.hdfs.inode.InodeController;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.HopsSecurityException;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.SchemaException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.activity.FeaturestoreActivityMeta;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandFeature;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandFeaturegroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandOption;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnectorType;
import io.hops.hopsworks.persistence.entity.hdfs.inode.Inode;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.logging.Level;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.hadoop.fs.Path;

@TransactionAttribute(TransactionAttributeType.NEVER)
@Stateless
/* loaded from: input_file:io/hops/hopsworks/common/featurestore/featuregroup/ondemand/OnDemandFeaturegroupController.class */
public class OnDemandFeaturegroupController {

    @EJB
    private OnDemandFeaturegroupFacade onDemandFeaturegroupFacade;

    @EJB
    private FeaturestoreConnectorFacade featurestoreConnectorFacade;

    @EJB
    private FeaturestoreFacade featurestoreFacade;

    @EJB
    private DistributedFsService distributedFsService;

    @EJB
    private HdfsUsersController hdfsUsersController;

    @EJB
    private InodeController inodeController;

    @EJB
    private OnlineFeaturegroupController onlineFeatureGroupController;

    @EJB
    private FeatureGroupInputValidation featureGroupInputValidation;

    @EJB
    private FeaturegroupFacade featureGroupFacade;

    @EJB
    private FeaturestoreActivityFacade fsActivityFacade;

    /**
     * Validates the request, creates the placeholder file and persists a new on-demand feature group.
     *
     * <p>JDBC, REDSHIFT, SNOWFLAKE and BIGQUERY connectors require a query; every other supported
     * connector type rejects a query and requires a data format. KAFKA connectors are rejected.
     *
     * @param featurestore the feature store the feature group belongs to
     * @param onDemandFeaturegroupDTO the user supplied definition of the feature group
     * @param project the project used to resolve the HDFS user that creates the placeholder file
     * @param users the user used to resolve the HDFS user that creates the placeholder file
     * @return the persisted entity
     * @throws FeaturestoreException if the connector cannot be found, the connector/query/data format
     *     combination is invalid, or the placeholder file cannot be created
     * @throws IllegalArgumentException if no storage connector (or connector id) was provided, or if
     *     no features were provided
     */
    public OnDemandFeaturegroup createOnDemandFeaturegroup(Featurestore featurestore, OnDemandFeaturegroupDTO onDemandFeaturegroupDTO, Project project, Users users) throws FeaturestoreException {
        // A missing connector object is handled like a missing connector id, so the caller gets the
        // regular "connector id not provided" validation error instead of a NullPointerException.
        FeaturestoreStorageConnectorDTO connectorDTO = onDemandFeaturegroupDTO.getStorageConnector();
        FeaturestoreConnector storageConnector = getStorageConnector(connectorDTO == null ? null : connectorDTO.getId());

        FeaturestoreConnectorType connectorType = storageConnector.getConnectorType();
        if (connectorType == FeaturestoreConnectorType.KAFKA) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_ON_DEMAND_FEATUREGROUP, Level.FINE, connectorType + " storage connectors are not supported as source for on demand feature groups");
        }

        boolean sqlConnector = connectorType == FeaturestoreConnectorType.JDBC
            || connectorType == FeaturestoreConnectorType.REDSHIFT
            || connectorType == FeaturestoreConnectorType.SNOWFLAKE
            || connectorType == FeaturestoreConnectorType.BIGQUERY;
        boolean queryProvided = !Strings.isNullOrEmpty(onDemandFeaturegroupDTO.getQuery());

        if (sqlConnector && !queryProvided) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.INVALID_SQL_QUERY, Level.FINE, "SQL Query cannot be empty");
        }
        if (!sqlConnector && queryProvided) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.INVALID_SQL_QUERY, Level.FINE, "SQL query not supported when specifying " + connectorType + " storage connectors");
        }
        if (!sqlConnector && onDemandFeaturegroupDTO.getDataFormat() == null) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_ON_DEMAND_DATA_FORMAT, Level.FINE, "Data format required when specifying " + connectorType + " storage connectors");
        }

        OnDemandFeaturegroup onDemandFeaturegroup = new OnDemandFeaturegroup();
        onDemandFeaturegroup.setDescription(onDemandFeaturegroupDTO.getDescription());
        onDemandFeaturegroup.setFeaturestoreConnector(storageConnector);
        onDemandFeaturegroup.setQuery(onDemandFeaturegroupDTO.getQuery());
        // Features are converted (and validated) before the placeholder file is created, so an
        // invalid feature list does not leave a file behind.
        onDemandFeaturegroup.setFeatures(convertOnDemandFeatures(onDemandFeaturegroupDTO, onDemandFeaturegroup));
        // NOTE(review): the placeholder file is created before the entity is persisted; if persist
        // fails the file appears to stay behind - confirm whether callers clean it up.
        onDemandFeaturegroup.setInode(createFile(project, users, featurestore, onDemandFeaturegroupDTO));
        onDemandFeaturegroup.setDataFormat(onDemandFeaturegroupDTO.getDataFormat());
        onDemandFeaturegroup.setPath(onDemandFeaturegroupDTO.getPath());
        if (onDemandFeaturegroupDTO.getOptions() != null) {
            onDemandFeaturegroup.setOptions(onDemandFeaturegroupDTO.getOptions().stream()
                .map(option -> new OnDemandOption(onDemandFeaturegroup, option.getName(), option.getValue()))
                .collect(Collectors.toList()));
        }
        this.onDemandFeaturegroupFacade.persist(onDemandFeaturegroup);
        return onDemandFeaturegroup;
    }

    /**
     * Builds the DTO representation of an on-demand feature group.
     *
     * @param str value passed through as the first constructor argument of the DTO
     *     (presumably the feature store name - confirm against the DTO constructor)
     * @param featuregroup the feature group entity to convert
     * @param featurestoreStorageConnectorDTO the already converted storage connector
     * @return the DTO including the ordered features and the online topic name
     * @throws FeaturestoreException if the features cannot be resolved
     */
    public OnDemandFeaturegroupDTO convertOnDemandFeatureGroupToDTO(String str, Featuregroup featuregroup, FeaturestoreStorageConnectorDTO featurestoreStorageConnectorDTO) throws FeaturestoreException {
        // Features are resolved first, then the topic name, matching the argument evaluation order.
        List<FeatureGroupFeatureDTO> features = getFeaturesDTO(featuregroup);
        return new OnDemandFeaturegroupDTO(
            str,
            featuregroup,
            featurestoreStorageConnectorDTO,
            features,
            this.onlineFeatureGroupController.onlineFeatureGroupTopicName(
                featuregroup.getFeaturestore().getProject().getId(),
                featuregroup.getId(),
                Utils.getFeaturegroupName(featuregroup)));
    }

    /**
     * Converts the stored on-demand features of a feature group to DTOs, ordered by their index,
     * and hands them to the online feature group controller for final processing.
     *
     * @param featuregroup the feature group whose on-demand features are converted
     * @return the feature DTOs as returned by the online feature group controller
     * @throws FeaturestoreException if the online feature group controller fails
     */
    private List<FeatureGroupFeatureDTO> getFeaturesDTO(Featuregroup featuregroup) throws FeaturestoreException {
        List<FeatureGroupFeatureDTO> orderedFeatures = featuregroup.getOnDemandFeaturegroup().getFeatures().stream()
            .sorted(Comparator.comparing(OnDemandFeature::getIdx))
            .map(feature -> new FeatureGroupFeatureDTO(
                feature.getName(),
                feature.getType(),
                feature.getDescription(),
                feature.getPrimary(),
                feature.getDefaultValue(),
                featuregroup.getId()))
            .collect(Collectors.toList());
        return this.onlineFeatureGroupController.getFeaturegroupFeatures(featuregroup, orderedFeatures);
    }

    /**
     * Updates description and features of an existing on-demand feature group.
     *
     * <p>The existing schema must be unchanged; features can only be appended. When new features
     * are appended, the online schema is altered (if the feature group is online enabled) and a
     * metadata activity is logged before the entity is updated.
     *
     * @param project the project passed on to the online schema alteration
     * @param users the user performing the update
     * @param featuregroup the feature group to update
     * @param onDemandFeaturegroupDTO the requested new state
     * @throws FeaturestoreException if the schema change is not a pure append or is otherwise invalid
     * @throws SchemaException declared by the online schema alteration
     * @throws SQLException declared by the online schema alteration
     * @throws KafkaException declared by the online schema alteration
     */
    public void updateOnDemandFeaturegroupMetadata(Project project, Users users, Featuregroup featuregroup, OnDemandFeaturegroupDTO onDemandFeaturegroupDTO) throws FeaturestoreException, SchemaException, SQLException, KafkaException {
        OnDemandFeaturegroup entity = featuregroup.getOnDemandFeaturegroup();
        List<FeatureGroupFeatureDTO> previousSchema = getFeaturesDTO(featuregroup);

        // Validation happens before any state is modified.
        verifySchemaUnchangedAndValid(entity.getFeatures(), onDemandFeaturegroupDTO.getFeatures());
        List<FeatureGroupFeatureDTO> appendedFeatures =
            this.featureGroupInputValidation.verifyAndGetNewFeatures(previousSchema, onDemandFeaturegroupDTO.getFeatures());

        // A null description means "leave the current description untouched".
        String requestedDescription = onDemandFeaturegroupDTO.getDescription();
        if (requestedDescription != null) {
            entity.setDescription(requestedDescription);
        }

        updateOnDemandFeatures(entity, onDemandFeaturegroupDTO.getFeatures());

        boolean schemaExtended = !appendedFeatures.isEmpty();
        if (schemaExtended) {
            if (featuregroup.isOnlineEnabled()) {
                this.onlineFeatureGroupController.alterOnlineFeatureGroupSchema(featuregroup, appendedFeatures, onDemandFeaturegroupDTO.getFeatures(), project, users);
            }
            String appendedNames = appendedFeatures.stream()
                .map(FeatureGroupFeatureDTO::getName)
                .collect(Collectors.joining(","));
            this.fsActivityFacade.logMetadataActivity(users, featuregroup, FeaturestoreActivityMeta.FG_ALTERED, "New features: " + appendedNames);
        }

        this.onDemandFeaturegroupFacade.updateMetadata(entity);
    }

    /**
     * Applies the requested features to the entity: features that already exist (matched by name,
     * ignoring case) get their description replaced, unknown features are appended with the
     * current feature count as their index.
     *
     * @param onDemandFeaturegroup the entity whose feature collection is modified in place
     * @param list the requested features
     */
    private void updateOnDemandFeatures(OnDemandFeaturegroup onDemandFeaturegroup, List<FeatureGroupFeatureDTO> list) {
        for (FeatureGroupFeatureDTO requested : list) {
            Optional<OnDemandFeature> existing = getOnDemandFeature(onDemandFeaturegroup.getFeatures(), requested.getName());
            if (!existing.isPresent()) {
                // New feature: its index is the number of features stored so far.
                Integer nextIdx = Integer.valueOf(onDemandFeaturegroup.getFeatures().size());
                OnDemandFeature appended = new OnDemandFeature(
                    onDemandFeaturegroup,
                    requested.getName(),
                    requested.getType(),
                    requested.getDescription(),
                    requested.getPrimary(),
                    nextIdx,
                    requested.getDefaultValue());
                onDemandFeaturegroup.getFeatures().add(appended);
                continue;
            }
            existing.get().setDescription(requested.getDescription());
        }
    }

    /**
     * Looks up a feature by name, ignoring case.
     *
     * @param collection the features to search
     * @param str the feature name to look for
     * @return the first matching feature in iteration order, or an empty Optional if none matches
     */
    private Optional<OnDemandFeature> getOnDemandFeature(Collection<OnDemandFeature> collection, String str) {
        for (OnDemandFeature candidate : collection) {
            if (candidate.getName().equalsIgnoreCase(str)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /**
     * Verifies that every existing feature is still present in the new schema with the same
     * primary key flag and type, and that no default value is supplied for it.
     *
     * <p>Names are matched case-sensitively; types are compared ignoring case.
     *
     * @param collection the currently stored features
     * @param list the requested new schema
     * @throws FeaturestoreException if an existing feature is missing from the new schema, if its
     *     primary key flag or type differs (a missing type counts as a change), or if a default
     *     value is provided for it
     */
    public void verifySchemaUnchangedAndValid(Collection<OnDemandFeature> collection, List<FeatureGroupFeatureDTO> list) throws FeaturestoreException {
        for (OnDemandFeature existing : collection) {
            FeatureGroupFeatureDTO updated = list.stream()
                .filter(candidate -> existing.getName().equals(candidate.getName()))
                .findAny()
                .orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, "Feature " + existing.getName() + " was not found in new schema. It is only possible to append features."));

            // Compare by value: the primary flags may be boxed, in which case != would compare
            // object identity (or fail on unboxing a null) rather than the boolean values.
            boolean primaryChanged = !Objects.equals(updated.getPrimary(), existing.getPrimary());
            // A missing type in the request cannot match the stored type; report it as a change
            // instead of failing with a NullPointerException.
            boolean typeChanged = updated.getType() == null || !updated.getType().equalsIgnoreCase(existing.getType());
            if (primaryChanged || typeChanged) {
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, "Primary key or type information of feature " + existing.getName() + " changed. Primary key and type cannot be changed when updating features.");
            }

            // NOTE(review): this check only covers DTO entries that match an existing feature;
            // newly appended entries are not inspected here, although the message talks about
            // appended features - confirm whether they are validated elsewhere.
            if (updated.getDefaultValue() != null) {
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, "Default values for features appended to external feature groups are not supported. Feature: " + existing.getName() + ", provided default value: " + updated.getDefaultValue() + " must be null instead.");
            }
        }
    }

    /**
     * Removes the on-demand feature group entity and then deletes its placeholder file.
     *
     * @param featurestore the feature store used to resolve the placeholder file location
     * @param featuregroup the feature group to remove
     * @param project the project used to resolve the HDFS user performing the deletion
     * @param users the user used to resolve the HDFS user performing the deletion
     * @throws FeaturestoreException if the placeholder file cannot be deleted
     */
    public void removeOnDemandFeaturegroup(Featurestore featurestore, Featuregroup featuregroup, Project project, Users users) throws FeaturestoreException {
        String hdfsUserName = this.hdfsUsersController.getHdfsUserName(project, users);
        // The metadata is removed first; the file deletion below runs as the resolved HDFS user.
        this.onDemandFeaturegroupFacade.remove(featuregroup.getOnDemandFeaturegroup());

        DistributedFileSystemOps dfsOps = null;
        try {
            dfsOps = this.distributedFsService.getDfsOps(hdfsUserName);
            dfsOps.rm(getFilePath(featurestore, featuregroup.getName(), featuregroup.getVersion()), false);
        } catch (IOException | URISyntaxException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_DELETE_ON_DEMAND_FEATUREGROUP, Level.SEVERE, "Error removing the placeholder file", e.getMessage(), e);
        } finally {
            // Single cleanup point; dfsOps is still null here when getDfsOps itself failed.
            this.distributedFsService.closeDfsClient(dfsOps);
        }
    }

    /**
     * Converts the feature DTOs of a request into entities attached to the given feature group.
     * Each feature's index is its position in the request.
     *
     * @param onDemandFeaturegroupDTO the request holding the feature DTOs
     * @param onDemandFeaturegroup the feature group the new features are attached to
     * @return the converted features in request order
     * @throws IllegalArgumentException if the request contains no features (null or empty list)
     */
    private List<OnDemandFeature> convertOnDemandFeatures(OnDemandFeaturegroupDTO onDemandFeaturegroupDTO, OnDemandFeaturegroup onDemandFeaturegroup) {
        List<FeatureGroupFeatureDTO> featureDTOs = onDemandFeaturegroupDTO.getFeatures();
        // A missing list is reported like an empty one instead of failing with a NullPointerException.
        if (featureDTOs == null || featureDTOs.isEmpty()) {
            throw new IllegalArgumentException("No features were provided for on demand feature group");
        }
        List<OnDemandFeature> converted = new ArrayList<>(featureDTOs.size());
        for (FeatureGroupFeatureDTO featureDTO : featureDTOs) {
            // The number of features converted so far is the zero-based index of this feature.
            Integer idx = Integer.valueOf(converted.size());
            converted.add(new OnDemandFeature(onDemandFeaturegroup, featureDTO.getName(), featureDTO.getType(), featureDTO.getDescription(), featureDTO.getPrimary(), idx, featureDTO.getDefaultValue()));
        }
        return converted;
    }

    /**
     * Creates the empty placeholder file for an on-demand feature group and returns its inode.
     *
     * @param project the project used to resolve the HDFS user creating the file
     * @param users the user used to resolve the HDFS user creating the file
     * @param featurestore the feature store used to resolve the file location
     * @param onDemandFeaturegroupDTO supplies the name and version that make up the file name
     * @return the inode found at the path of the created file
     * @throws FeaturestoreException if the path cannot be built or the file cannot be created
     */
    private Inode createFile(Project project, Users users, Featurestore featurestore, OnDemandFeaturegroupDTO onDemandFeaturegroupDTO) throws FeaturestoreException {
        String hdfsUserName = this.hdfsUsersController.getHdfsUserName(project, users);
        DistributedFileSystemOps dfsOps = null;
        Path filePath;
        try {
            filePath = getFilePath(featurestore, onDemandFeaturegroupDTO.getName(), onDemandFeaturegroupDTO.getVersion());
            dfsOps = this.distributedFsService.getDfsOps(hdfsUserName);
            dfsOps.touchz(filePath);
        } catch (IOException | URISyntaxException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_ON_DEMAND_FEATUREGROUP, Level.SEVERE, "Error creating the placeholder file", e.getMessage(), e);
        } finally {
            // Single cleanup point; dfsOps is still null here when the path or client setup failed.
            this.distributedFsService.closeDfsClient(dfsOps);
        }
        // The inode lookup does not use the file system client, so it runs after the client is closed.
        return this.inodeController.getInodeAtPath(filePath.toString());
    }

    /**
     * Builds the path of the placeholder file: the path component of the feature store's Hive
     * database location as parent, and {@code <name>_<version>} as file name.
     *
     * @param featurestore the feature store whose Hive database location is used
     * @param str the feature group name
     * @param num the feature group version
     * @return the placeholder file path
     * @throws URISyntaxException if the Hive database location is not a valid URI
     */
    private Path getFilePath(Featurestore featurestore, String str, Integer num) throws URISyntaxException {
        String hiveDbLocation = this.featurestoreFacade.getHiveDbHdfsPath(featurestore.getHiveDbId());
        // Only the path component is kept; scheme and authority of the location are dropped.
        String parentDir = new URI(hiveDbLocation).getPath();
        String fileName = str + "_" + num;
        return new Path(parentDir, fileName);
    }

    /**
     * Resolves a storage connector by id.
     *
     * @param num the connector id
     * @return the connector with the given id
     * @throws IllegalArgumentException if no id was provided
     * @throws FeaturestoreException if no connector with the given id exists
     */
    private FeaturestoreConnector getStorageConnector(Integer num) throws FeaturestoreException {
        if (num != null) {
            Optional<FeaturestoreConnector> connector = this.featurestoreConnectorFacade.findById(num);
            return connector.orElseThrow(() -> new FeaturestoreException(
                RESTCodes.FeaturestoreErrorCode.CONNECTOR_NOT_FOUND,
                Level.FINE,
                "Connector with id: " + num + " was not found"));
        }
        throw new IllegalArgumentException(RESTCodes.FeaturestoreErrorCode.CONNECTOR_ID_NOT_PROVIDED.getMessage());
    }

    /**
     * Enables online storage for a feature group. The online feature group is only set up when
     * the feature group is not online enabled yet; the online flag is set and the metadata is
     * updated in either case.
     *
     * @param featurestore the feature store the feature group belongs to
     * @param featuregroup the feature group to enable
     * @param project the project passed on to the online setup
     * @param users the user passed on to the online setup
     * @throws FeaturestoreException if the features cannot be resolved or the setup fails
     * @throws SQLException declared by the online setup
     * @throws ServiceException declared by the online setup
     * @throws KafkaException declared by the online setup
     * @throws SchemaException declared by the online setup
     * @throws ProjectException declared by the online setup
     * @throws UserException declared by the online setup
     * @throws IOException declared by the online setup
     * @throws HopsSecurityException declared by the online setup
     */
    public void enableFeatureGroupOnline(Featurestore featurestore, Featuregroup featuregroup, Project project, Users users) throws FeaturestoreException, SQLException, ServiceException, KafkaException, SchemaException, ProjectException, UserException, IOException, HopsSecurityException {
        // Features are resolved up front, regardless of the current online state.
        List<FeatureGroupFeatureDTO> features = getFeaturesDTO(featuregroup);

        boolean alreadyOnline = featuregroup.isOnlineEnabled();
        if (!alreadyOnline) {
            this.onlineFeatureGroupController.setupOnlineFeatureGroup(featurestore, featuregroup, features, project, users);
        }

        featuregroup.setOnlineEnabled(true);
        this.featureGroupFacade.updateFeaturegroupMetadata(featuregroup);
    }
}
