/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.featurestore.featuregroup.cached;

import com.google.common.base.Strings;
import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.featurestore.FeaturestoreController;
import io.hops.hopsworks.common.featurestore.feature.FeatureDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupFacade;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeaturegroupPreview;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.OfflineFeatureGroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.online.OnlineFeaturegroupController;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController;
import io.hops.hopsworks.common.hive.HiveController;
import io.hops.hopsworks.common.security.CertificateMaterializer;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.CryptoPasswordNotFoundException;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.HopsSecurityException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.CachedFeaturegroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.HiveColumns;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.HiveTableParams;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.HiveTbls;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.Storage;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.commons.lang3.StringUtils;
import org.javatuples.Pair;

@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class CachedFeaturegroupController {
    @EJB
    private CachedFeaturegroupFacade cachedFeaturegroupFacade;
    @EJB
    private CertificateMaterializer certificateMaterializer;
    @EJB
    private Settings settings;
    @EJB
    private FeaturestoreController featurestoreController;
    @EJB
    private OnlineFeaturegroupController onlineFeaturegroupController;
    @EJB
    private OnlineFeaturestoreController onlineFeaturestoreController;
    @EJB
    private OfflineFeatureGroupController offlineFeatureGroupController;
    @EJB
    private HiveController hiveController;
    private static final Logger LOGGER = Logger.getLogger(CachedFeaturegroupController.class.getName());
    private static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";

    /**
     * Loads the Hive JDBC driver class once the bean has been constructed.
     * A missing driver class is logged at SEVERE level and not rethrown.
     */
    @PostConstruct
    public void init() {
        try {
            Class.forName(HIVE_DRIVER);
        } catch (ClassNotFoundException driverMissing) {
            // HIVE_DRIVER is a compile-time constant, so the logged text is unchanged.
            LOGGER.log(Level.SEVERE, "Could not load the Hive driver: " + HIVE_DRIVER, driverMissing);
        }
    }

    /**
     * Opens a JDBC connection to HiveServer2 for the given database.
     *
     * <p>The user's certificates are materialized locally first; the resulting trust store,
     * key store and their password are embedded in the JDBC URL.
     *
     * @param databaseName name of the Hive database to connect to
     * @param project project on whose behalf the connection is made
     * @param user user on whose behalf the connection is made
     * @return an open JDBC connection; closing it is the caller's responsibility
     * @throws FeaturestoreException with {@code CERTIFICATES_NOT_FOUND} when service discovery or
     *     the certificate material lookup fails, or with {@code COULD_NOT_INITIATE_HIVE_CONNECTION}
     *     on an I/O or SQL failure (in which case the local certificates are removed again)
     */
    private Connection initConnection(String databaseName, Project project, Users user) throws FeaturestoreException {
        try {
            String endpoint = this.hiveController.getHiveServerInternalEndpoint();
            this.certificateMaterializer.materializeCertificatesLocal(user.getUsername(), project.getName());
            char[] passwordChars =
                this.certificateMaterializer.getUserMaterial(user.getUsername(), project.getName()).getPassword();
            String password = String.copyValueOf(passwordChars);

            StringBuilder jdbcUrl = new StringBuilder("jdbc:hive2://");
            jdbcUrl.append(endpoint).append("/").append(databaseName);
            jdbcUrl.append(";auth=noSasl;ssl=true;twoWay=true;sslTrustStore=");
            jdbcUrl.append(this.certificateMaterializer.getUserTransientTruststorePath(project, user));
            jdbcUrl.append(";trustStorePassword=").append(password);
            jdbcUrl.append(";sslKeyStore=");
            jdbcUrl.append(this.certificateMaterializer.getUserTransientKeystorePath(project, user));
            jdbcUrl.append(";keyStorePassword=").append(password);
            return DriverManager.getConnection(jdbcUrl.toString());
        } catch (ServiceDiscoveryException | CryptoPasswordNotFoundException | FileNotFoundException e) {
            String context = "project: " + project.getName() + ", hive database: " + databaseName;
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.CERTIFICATES_NOT_FOUND, Level.SEVERE,
                context, e.getMessage(), e);
        } catch (IOException | SQLException e) {
            // Certificates were materialized but the connection could not be established: clean up.
            this.certificateMaterializer.removeCertificatesLocal(user.getUsername(), project.getName());
            String context = "project: " + project.getName() + ", hive database: " + databaseName;
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_HIVE_CONNECTION,
                Level.SEVERE, context, e.getMessage(), e);
        }
    }

    /**
     * Returns the {@code SHOW CREATE TABLE} output of a cached feature group as a single string.
     *
     * @param featuregroup feature group whose Hive DDL is requested
     * @param project project used to open the Hive connection
     * @param user user used to open the Hive connection
     * @return the DDL, one result row per line
     * @throws FeaturestoreException if the schema query fails
     * @throws HopsSecurityException propagated from the underlying Hive read
     */
    public String getDDLSchema(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, HopsSecurityException {
        final FeaturegroupPreview showCreateResult;
        try {
            showCreateResult = this.getSQLSchemaForFeaturegroup(featuregroup, project, user);
        } catch (SQLException e) {
            throw new FeaturestoreException(
                RESTCodes.FeaturestoreErrorCode.COULD_NOT_FETCH_FEATUREGROUP_SHOW_CREATE_SCHEMA,
                Level.SEVERE,
                "Internal error fetching the schema of the feature group",
                e.getMessage(),
                e);
        }
        return this.parseSqlSchemaResult(showCreateResult);
    }

    /**
     * Joins the second tuple element of the first column of every preview row with newlines.
     *
     * @param preview result of a schema query
     * @return the collected values separated by {@code "\n"}
     */
    private String parseSqlSchemaResult(FeaturegroupPreview preview) {
        List<String> ddlLines = new ArrayList<>();
        for (FeaturegroupPreview.Row row : preview.getPreview()) {
            ddlLines.add((String) row.getValues().get(0).getValue1());
        }
        return StringUtils.join(ddlLines, "\n");
    }

    /**
     * Builds the Hive table name of a feature group as {@code <name>_<version>}.
     *
     * @param featuregroupName feature group name
     * @param version feature group version; must not be {@code null}
     * @return the table name
     */
    private String getTblName(String featuregroupName, Integer version) {
        String versionSuffix = version.toString();
        return String.join("_", featuregroupName, versionSuffix);
    }

    /**
     * Returns a preview of the feature group's rows from either the online or the offline storage.
     *
     * @param featuregroup feature group to preview
     * @param project project used to open the connection
     * @param user user used to open the connection
     * @param partition partition specification forwarded to the offline preview; unused for online previews
     * @param online whether to read from the online storage
     * @param limit maximum number of rows
     * @return the preview rows
     * @throws FeaturestoreException with {@code FEATUREGROUP_NOT_ONLINE} if an online preview is
     *     requested for a feature group that is not online enabled, or on read failure
     * @throws SQLException propagated from the underlying read
     * @throws HopsSecurityException propagated from the underlying read
     */
    public FeaturegroupPreview getFeaturegroupPreview(Featuregroup featuregroup, Project project, Users user, String partition, boolean online, int limit) throws SQLException, FeaturestoreException, HopsSecurityException {
        if (!online) {
            return this.getOfflineFeaturegroupPreview(featuregroup, project, user, partition, limit);
        }
        if (!featuregroup.getCachedFeaturegroup().isOnlineEnabled()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATUREGROUP_NOT_ONLINE, Level.FINE);
        }
        return this.onlineFeaturegroupController.getFeaturegroupPreview(featuregroup, project, user, limit);
    }

    /**
     * Previews the offline (Hive) data of a feature group with a {@code SELECT * ... LIMIT} query.
     *
     * <p>The query is attempted twice: if the first attempt fails for any reason it is retried
     * once, and a failure of the retry is propagated to the caller. The first failure is logged
     * so that it is not lost.
     *
     * @param featuregroup feature group to preview
     * @param project project used to open the Hive connection
     * @param user user used to open the Hive connection
     * @param partition optional partition specification, see {@link #getWhereCondition(String)}
     * @param limit maximum number of rows
     * @return the preview rows
     * @throws FeaturestoreException if the retry fails to read from Hive
     * @throws HopsSecurityException if the retry is denied access
     * @throws SQLException propagated from the underlying read
     */
    public FeaturegroupPreview getOfflineFeaturegroupPreview(Featuregroup featuregroup, Project project, Users user, String partition, int limit) throws FeaturestoreException, HopsSecurityException, SQLException {
        String where = this.getWhereCondition(partition);
        String tbl = this.getTblName(featuregroup.getName(), featuregroup.getVersion());
        String query = "SELECT * FROM " + tbl + " " + where + " LIMIT " + limit;
        String db = this.featurestoreController.getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject());
        try {
            return this.executeReadHiveQuery(query, db, project, user);
        } catch (Exception firstFailure) {
            // Keep the single retry, but record why the first attempt failed instead of dropping it.
            LOGGER.log(Level.WARNING, "Offline feature group preview query failed, retrying once. Query: " + query,
                firstFailure);
            return this.executeReadHiveQuery(query, db, project, user);
        }
    }

    /**
     * Converts a partition specification of the form {@code col1=val1/col2=val2} into a SQL
     * {@code WHERE} clause of the form {@code WHERE col1='val1' AND col2='val2'}.
     *
     * <p>Each segment is split at its first {@code '='}; everything after it is treated as the
     * value and emitted as a single-quoted string literal. Backslashes and single quotes inside the
     * value are backslash-escaped so the value cannot terminate the literal early.
     *
     * <p>NOTE(review): the column part is emitted verbatim and unquoted; confirm that callers
     * validate it or that it only ever originates from Hive partition names.
     *
     * @param partition partition specification; may be {@code null} or empty
     * @return the {@code WHERE} clause, or an empty string when no partition is given
     */
    public String getWhereCondition(String partition) {
        if (partition == null || partition.isEmpty()) {
            return "";
        }
        List<String> conditions = new ArrayList<>();
        for (String split : partition.split("/")) {
            int eq = split.indexOf('=');
            if (eq < 0) {
                // Segment without '=': keep the historical output unchanged (the query will be rejected by Hive).
                conditions.add(split + "'");
                continue;
            }
            String column = split.substring(0, eq);
            // Escape the backslash first so the escapes added for quotes are not doubled.
            String value = split.substring(eq + 1).replace("\\", "\\\\").replace("'", "\\'");
            conditions.add(column + "='" + value + "'");
        }
        return "WHERE " + String.join(" AND ", conditions);
    }

    /**
     * Creates a cached feature group: the Hive table, optionally the online MySQL table, and the
     * feature group metadata row.
     *
     * <p>A feature that is both a primary key and a partition column is rejected before any table
     * is created. Flags that are {@code null} on the DTO are treated as {@code false}.
     *
     * @param featurestore feature store the feature group belongs to
     * @param cachedFeaturegroupDTO user-supplied description of the feature group
     * @param project project on whose behalf the tables are created
     * @param user user on whose behalf the tables are created
     * @return the persisted cached feature group entity
     * @throws FeaturestoreException with {@code COULD_NOT_CREATE_FEATUREGROUP} if a primary key is
     *     also a partition column, or if the Hive table cannot be found in the metastore afterwards
     * @throws ServiceException propagated from table creation
     * @throws IOException propagated from table creation
     * @throws SQLException propagated from table creation
     */
    public CachedFeaturegroup createCachedFeaturegroup(Featurestore featurestore, CachedFeaturegroupDTO cachedFeaturegroupDTO, Project project, Users user) throws FeaturestoreException, ServiceException, IOException, SQLException {
        // Iterate with the element type spelled out; a raw List here does not compile.
        for (FeatureDTO feature : cachedFeaturegroupDTO.getFeatures()) {
            boolean primary = Boolean.TRUE.equals(feature.getPrimary());
            boolean partition = Boolean.TRUE.equals(feature.getPartition());
            if (primary && partition) {
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_FEATUREGROUP, Level.FINE, "The primary key column: " + feature.getName() + " was specified as a partition column, which is not allowed. Primary key columns can not be partitioned; Ignoring this partition request.");
            }
        }
        String tableName = this.getTblName(cachedFeaturegroupDTO.getName(), cachedFeaturegroupDTO.getVersion());
        this.offlineFeatureGroupController.createHiveTable(featurestore, tableName, cachedFeaturegroupDTO.getDescription(), cachedFeaturegroupDTO.getFeatures(), project, user);
        boolean onlineEnabled = false;
        if (this.settings.isOnlineFeaturestore().booleanValue() && Boolean.TRUE.equals(cachedFeaturegroupDTO.getOnlineEnabled())) {
            this.onlineFeaturegroupController.createMySQLTable(featurestore, tableName, cachedFeaturegroupDTO.getFeatures(), project, user);
            onlineEnabled = true;
        }
        // The metadata row references the metastore entry of the table that was just created.
        HiveTbls hiveTbls = this.cachedFeaturegroupFacade.getHiveTableByNameAndDB(tableName, featurestore.getHiveDbId()).orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_FEATUREGROUP, Level.WARNING, "", "Table created correctly but not in the metastore"));
        return this.persistCachedFeaturegroupMetadata(hiveTbls, onlineEnabled, cachedFeaturegroupDTO.getDefaultStorage());
    }

    /**
     * Builds the DTO representation of a cached feature group.
     *
     * <p>Features are read from the Hive table metadata. When the online feature store is enabled
     * and the feature group is online enabled, each feature additionally receives the type of the
     * online feature with the same name (compared case-insensitively).
     *
     * @param featuregroup feature group entity to convert
     * @return the populated DTO
     */
    public CachedFeaturegroupDTO convertCachedFeaturegroupToDTO(Featuregroup featuregroup) {
        CachedFeaturegroupDTO dto = new CachedFeaturegroupDTO(featuregroup);
        HiveTbls hiveTable = featuregroup.getCachedFeaturegroup().getHiveTbls();
        List<FeatureDTO> offlineFeatures = this.getFeaturesDTO(hiveTable);

        boolean servedOnline = this.settings.isOnlineFeaturestore().booleanValue()
            && featuregroup.getCachedFeaturegroup().isOnlineEnabled();
        if (servedOnline) {
            dto.setOnlineEnabled(true);
            // Copy the online type onto the offline feature with the same name.
            for (FeatureDTO online : this.onlineFeaturegroupController.getFeaturegroupFeatures(featuregroup)) {
                for (FeatureDTO offline : offlineFeatures) {
                    if (offline.getName().equalsIgnoreCase(online.getName())) {
                        offline.setOnlineType(online.getType());
                    }
                }
            }
        }

        dto.setFeatures(offlineFeatures);
        dto.setName(featuregroup.getName());
        dto.setDefaultStorage(featuregroup.getCachedFeaturegroup().getDefaultStorage());
        dto.setHudiEnabled(
            featuregroup.getCachedFeaturegroup().getHiveTbls().getSdId().getInputFormat()
                .equals(OfflineFeatureGroupController.Formats.HUDI.getInputFormat()));
        // The description is the first table parameter whose key is COMMENT, or "" if there is none.
        dto.setDescription(
            hiveTable.getHiveTableParamsCollection().stream()
                .filter(param -> param.getHiveTableParamsPK().getParamKey().equalsIgnoreCase("COMMENT"))
                .map(HiveTableParams::getParamValue)
                .findFirst()
                .orElse(""));
        dto.setLocation(hiveTable.getSdId().getLocation());
        return dto;
    }

    /**
     * Builds the feature list of a Hive table from its metastore metadata.
     *
     * <p>Regular columns come first, each flagged as primary when a key constraint of type
     * {@code 0} references the same column descriptor id and column index. Partition keys are
     * appended afterwards, flagged as non-primary partition features.
     *
     * @param hiveTable metastore entry of the table
     * @return the features of the table, columns first and partition keys last
     */
    public List<FeatureDTO> getFeaturesDTO(HiveTbls hiveTable) {
        List<FeatureDTO> featureDTOS = new ArrayList<>();
        for (HiveColumns hc : hiveTable.getSdId().getCdId().getHiveColumnsCollection()) {
            // Stream the typed collection directly: a raw intermediate List makes the lambda parameter
            // an Object and the constraint getters unresolvable.
            // NOTE(review): the index comparison uses ==; confirm both getters return primitives.
            boolean primary = hiveTable.getHiveKeyConstraintsCollection().stream()
                .anyMatch(constraint -> constraint.getConstraintType() == 0
                    && constraint.getParentCdId().getCdId().equals(hc.getHiveColumnsPK().getCdId())
                    && constraint.getParentIntegerIdx() == hc.getIntegerIdx());
            featureDTOS.add(new FeatureDTO(hc.getHiveColumnsPK().getColumnName(), hc.getTypeName(), hc.getComment(), primary));
        }
        // NOTE(review): the third argument is the key's type, whereas regular columns pass their comment
        // in that position; confirm whether the partition key's comment was intended here.
        hiveTable.getHivePartitionKeysCollection().forEach(pk ->
            featureDTOS.add(new FeatureDTO(pk.getHivePartitionKeysPK().getPkeyName(), pk.getPkeyType(), pk.getPkeyType(), false, Boolean.TRUE)));
        return featureDTOS;
    }

    /**
     * Runs {@code SHOW CREATE TABLE} for the feature group's Hive table.
     *
     * @param featuregroup feature group whose table is inspected
     * @param project project used to open the Hive connection
     * @param user user used to open the Hive connection
     * @return the raw query result
     * @throws SQLException propagated from the Hive read
     * @throws FeaturestoreException propagated from the Hive read
     * @throws HopsSecurityException propagated from the Hive read
     */
    private FeaturegroupPreview getSQLSchemaForFeaturegroup(Featuregroup featuregroup, Project project, Users user) throws SQLException, FeaturestoreException, HopsSecurityException {
        String showCreate = "SHOW CREATE TABLE " + this.getTblName(featuregroup.getName(), featuregroup.getVersion());
        String offlineDb = this.featurestoreController.getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject());
        return this.executeReadHiveQuery(showCreate, offlineDb, project, user);
    }

    /**
     * Drops the Hive table backing a feature group.
     *
     * @param featuregroup feature group whose table is dropped
     * @param project project on whose behalf the table is dropped
     * @param user user on whose behalf the table is dropped
     * @throws FeaturestoreException propagated from the offline feature group controller
     * @throws IOException propagated from the offline feature group controller
     * @throws ServiceException propagated from the offline feature group controller
     */
    public void dropHiveFeaturegroup(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, IOException, ServiceException {
        String offlineDb = this.featurestoreController.getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject());
        String hiveTable = this.getTblName(featuregroup.getName(), featuregroup.getVersion());
        this.offlineFeatureGroupController.dropFeatureGroup(offlineDb, hiveTable, project, user);
    }

    /**
     * Drops the online (MySQL) table of a feature group. Does nothing unless the online feature
     * store is enabled and the feature group is online enabled.
     *
     * @param featuregroup feature group whose online table is dropped
     * @param project project on whose behalf the table is dropped
     * @param user user on whose behalf the table is dropped
     * @throws SQLException propagated from the online feature group controller
     * @throws FeaturestoreException propagated from the online feature group controller
     */
    public void dropMySQLFeaturegroup(Featuregroup featuregroup, Project project, Users user) throws SQLException, FeaturestoreException {
        if (!this.settings.isOnlineFeaturestore().booleanValue()) {
            return;
        }
        if (!featuregroup.getCachedFeaturegroup().isOnlineEnabled()) {
            return;
        }
        this.onlineFeaturegroupController.dropMySQLTable(featuregroup, project, user);
    }

    /**
     * Converts a JDBC result set into a {@link FeaturegroupPreview}.
     *
     * <p>Every row becomes one preview row holding a (column label, value) pair per column. Values
     * are rendered with {@code toString()}; SQL {@code NULL} is kept as {@code null}. Column labels
     * are passed through {@code parseColumnLabel} once, before the rows are read.
     *
     * @param rs result set positioned before its first row; it is not closed by this method
     * @return the preview containing all rows of the result set
     * @throws SQLException if reading the metadata or the rows fails
     */
    public FeaturegroupPreview parseResultset(ResultSet rs) throws SQLException {
        ResultSetMetaData rsmd = rs.getMetaData();
        // Column count and labels are the same for every row, so resolve them once up front.
        int columnCount = rsmd.getColumnCount();
        String[] labels = new String[columnCount];
        for (int i = 0; i < columnCount; i++) {
            // JDBC columns are 1-based.
            labels[i] = this.parseColumnLabel(rsmd.getColumnLabel(i + 1));
        }

        FeaturegroupPreview featuregroupPreview = new FeaturegroupPreview();
        while (rs.next()) {
            FeaturegroupPreview.Row row = new FeaturegroupPreview.Row();
            for (int i = 0; i < columnCount; i++) {
                Object columnValue = rs.getObject(i + 1);
                String rendered = columnValue == null ? null : columnValue.toString();
                row.addValue(new Pair<>(labels[i], rendered));
            }
            featuregroupPreview.addRow(row);
        }
        return featuregroupPreview;
    }

    /**
     * Strips the qualifier from a dotted column label.
     *
     * <p>For a label such as {@code tbl.col} the second dot-separated segment ({@code col}) is
     * returned. Labels without a dot, and labels that have no second segment (for example
     * {@code "tbl."} or {@code "."}), are returned unchanged instead of failing.
     *
     * @param columnLabel label reported by the JDBC driver; may be {@code null}
     * @return the unqualified label, or {@code null} if the input is {@code null}
     */
    private String parseColumnLabel(String columnLabel) {
        if (columnLabel == null) {
            return null;
        }
        // split() drops trailing empty segments, so "tbl." yields a single element.
        String[] segments = columnLabel.split("\\.");
        return segments.length > 1 ? segments[1] : columnLabel;
    }

    /**
     * Runs a read-only query against Hive and returns the parsed result.
     *
     * <p>The statement and result set are closed automatically. The connection is closed and the
     * user's local certificates are removed in all cases, including when opening the connection
     * fails.
     *
     * @param query SQL text to execute
     * @param databaseName Hive database to connect to
     * @param project project used to open the connection
     * @param user user used to open the connection
     * @return the parsed query result
     * @throws HopsSecurityException if Hive reports a "permission denied" error (case-insensitive)
     * @throws FeaturestoreException with {@code HIVE_READ_QUERY_ERROR} for any other SQL failure,
     *     or as thrown while opening the connection
     * @throws SQLException declared for interface compatibility with existing callers
     */
    private FeaturegroupPreview executeReadHiveQuery(String query, String databaseName, Project project, Users user) throws SQLException, FeaturestoreException, HopsSecurityException {
        Connection conn = null;
        try {
            conn = this.initConnection(databaseName, project, user);
            // try-with-resources closes the result set and then the statement, even if parsing fails,
            // and a failure while closing cannot skip the connection cleanup below.
            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(query)) {
                return this.parseResultset(rs);
            }
        } catch (SQLException e) {
            String context = "project: " + project.getName() + ", hive database: " + databaseName + " hive query: " + query;
            String message = e.getMessage();
            // The message may be null; Locale.ROOT keeps the match independent of the JVM's default locale.
            if (message != null && message.toLowerCase(Locale.ROOT).contains("permission denied")) {
                throw new HopsSecurityException(RESTCodes.SecurityErrorCode.HDFS_ACCESS_CONTROL, Level.FINE, context, message, e);
            }
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.HIVE_READ_QUERY_ERROR, Level.SEVERE, context, message, e);
        } finally {
            this.closeConnection(conn, user, project);
        }
    }

    /**
     * Closes a Hive JDBC connection and removes the user's locally materialized certificates.
     *
     * <p>A failure to close the connection is logged as a warning and not rethrown. The
     * certificates are removed regardless of whether a connection was given or closing succeeded.
     *
     * @param conn connection to close; may be {@code null}
     * @param user user whose certificates are removed
     * @param project project whose certificates are removed
     */
    private void closeConnection(Connection conn, Users user, Project project) {
        try {
            if (conn == null) {
                // Nothing to close; the finally block still removes the certificates.
                return;
            }
            conn.close();
        } catch (SQLException closeFailure) {
            LOGGER.log(Level.WARNING, "Error closing Hive JDBC connection: " + closeFailure);
        } finally {
            this.certificateMaterializer.removeCertificatesLocal(user.getUsername(), project.getName());
        }
    }

    /**
     * Registers an already existing Hive table as a cached feature group.
     *
     * <p>The resulting feature group is persisted as not online enabled with
     * {@code Storage.OFFLINE} as its default storage.
     *
     * @param featurestore feature store whose Hive database contains the table
     * @param cachedFeaturegroupDTO DTO supplying the feature group name and version
     * @return the persisted cached feature group entity
     * @throws FeaturestoreException with {@code SYNC_TABLE_NOT_FOUND} if the table is not in the
     *     Hive metastore
     */
    public CachedFeaturegroup syncHiveTableWithFeaturestore(Featurestore featurestore, CachedFeaturegroupDTO cachedFeaturegroupDTO) throws FeaturestoreException {
        String tableName = this.getTblName(cachedFeaturegroupDTO.getName(), cachedFeaturegroupDTO.getVersion());
        HiveTbls existingTable = this.cachedFeaturegroupFacade
            .getHiveTableByNameAndDB(tableName, featurestore.getHiveDbId())
            .orElseThrow(() -> {
                String notFound = ", tried to sync hive table with name: " + tableName + " with the feature store, but the table was not found in the Hive metastore";
                return new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.SYNC_TABLE_NOT_FOUND, Level.FINE, notFound);
            });
        return this.persistCachedFeaturegroupMetadata(existingTable, false, Storage.OFFLINE);
    }

    /**
     * Creates and persists the metadata entity of a cached feature group.
     *
     * @param hiveTable metastore entry of the backing Hive table
     * @param onlineEnabled whether the feature group is online enabled
     * @param defaultStorage default storage of the feature group
     * @return the persisted entity
     */
    private CachedFeaturegroup persistCachedFeaturegroupMetadata(HiveTbls hiveTable, boolean onlineEnabled, Storage defaultStorage) {
        CachedFeaturegroup entity = new CachedFeaturegroup();
        entity.setHiveTbls(hiveTable);
        entity.setOnlineEnabled(onlineEnabled);
        entity.setDefaultStorage(defaultStorage);

        this.cachedFeaturegroupFacade.persist(entity);
        return entity;
    }

    /**
     * Enables online serving for a cached feature group.
     *
     * <p>The online MySQL table is created only when the feature group is not online enabled yet;
     * the metadata is marked online enabled and updated in either case.
     *
     * @param featurestore feature store the feature group belongs to
     * @param featuregroup feature group to enable
     * @param project project on whose behalf the online table is created
     * @param user user on whose behalf the online table is created
     * @return the DTO of the updated feature group
     * @throws FeaturestoreException with {@code FEATURESTORE_ONLINE_NOT_ENABLED} if the online
     *     feature store is disabled for the cluster or its database does not exist for the project
     * @throws SQLException propagated from the online feature store
     */
    public FeaturegroupDTO enableFeaturegroupOnline(Featurestore featurestore, Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, SQLException {
        boolean clusterSupportsOnline = this.settings.isOnlineFeaturestore().booleanValue();
        if (!clusterSupportsOnline) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, Level.FINE, "Online Featurestore is not enabled for this Hopsworks cluster.");
        }
        boolean projectHasOnlineDb = this.onlineFeaturestoreController
            .checkIfDatabaseExists(this.onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject()))
            .booleanValue();
        if (!projectHasOnlineDb) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, Level.FINE, "Online Featurestore is not enabled for this project. To enable online feature store, talk to an administrator.");
        }

        CachedFeaturegroup cached = featuregroup.getCachedFeaturegroup();
        String onlineTable = this.getTblName(featuregroup.getName(), featuregroup.getVersion());
        List<FeatureDTO> schema = this.getFeaturesDTO(cached.getHiveTbls());
        boolean alreadyOnline = cached.isOnlineEnabled();
        if (!alreadyOnline) {
            this.onlineFeaturegroupController.createMySQLTable(featurestore, onlineTable, schema, project, user);
        }
        cached.setOnlineEnabled(true);
        this.cachedFeaturegroupFacade.updateMetadata(cached);
        return this.convertCachedFeaturegroupToDTO(featuregroup);
    }

    /**
     * Disables online serving for a cached feature group.
     *
     * <p>If the feature group is online enabled, its online MySQL table is dropped and the
     * metadata is updated to mark it as not online enabled; otherwise nothing is changed.
     *
     * @param featuregroup feature group to disable
     * @param project project on whose behalf the online table is dropped
     * @param user user on whose behalf the online table is dropped
     * @return the DTO of the (possibly updated) feature group
     * @throws FeaturestoreException with {@code FEATURESTORE_ONLINE_NOT_ENABLED} if the online
     *     feature store is disabled for the cluster or its database does not exist for the project
     * @throws SQLException propagated from the online feature store
     */
    public FeaturegroupDTO disableFeaturegroupOnline(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, SQLException {
        if (!this.settings.isOnlineFeaturestore().booleanValue()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, Level.FINE, "Online Featurestore is not enabled for this Hopsworks cluster.");
        }
        if (!this.onlineFeaturestoreController.checkIfDatabaseExists(this.onlineFeaturestoreController.getOnlineFeaturestoreDbName(featuregroup.getFeaturestore().getProject())).booleanValue()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, Level.FINE, "Online Featurestore is not enabled for this project. To enable online feature store, talk to an administrator.");
        }
        CachedFeaturegroup cachedFeaturegroup = featuregroup.getCachedFeaturegroup();
        if (cachedFeaturegroup.isOnlineEnabled()) {
            this.dropMySQLFeaturegroup(featuregroup, project, user);
            cachedFeaturegroup.setOnlineEnabled(false);
            // The entity already exists, so update it the same way enableFeaturegroupOnline does
            // rather than persisting it as if it were new.
            this.cachedFeaturegroupFacade.updateMetadata(cachedFeaturegroup);
        }
        return this.convertCachedFeaturegroupToDTO(featuregroup);
    }
}

