/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.engine;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.logicalclocks.hsfs.FeatureStoreBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.FeatureViewBase;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.TrainingDatasetFeature;
import com.logicalclocks.hsfs.constructor.PreparedStatementParameter;
import com.logicalclocks.hsfs.constructor.ServingPreparedStatement;
import com.logicalclocks.hsfs.metadata.FeatureViewApi;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.HopsworksExternalClient;
import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
import com.logicalclocks.hsfs.metadata.Variable;
import com.logicalclocks.hsfs.metadata.VariablesApi;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.commons.text.WordUtils;

/**
 * Serves feature vectors for a feature view from the online feature store over JDBC.
 *
 * <p>The server has to be initialised for either single-vector or batch lookups (see the
 * {@code initServing} / {@code initPreparedStatement} overloads) before it is queried. The
 * {@code getFeatureVector*} overloads that take a {@link FeatureViewBase} initialise it lazily, and
 * re-initialise it when the mode it was prepared for does not match the request.
 *
 * <p>Call {@link #close()} to release the connection pool and the worker threads.
 */
public class VectorServer {
    /** JDBC connection pool for the online feature store; {@code null} until initialised. */
    private HikariDataSource hikariDataSource = null;
    /** Prepared statement index -> (parameter name -> parameter position in that statement). */
    private Map<Integer, TreeMap<String, Integer>> preparedStatementParameters;
    /** Serving statements keyed, and therefore iterated, by prepared statement index. */
    private TreeMap<Integer, ServingPreparedStatement> orderedServingPreparedStatements;
    /** Names of all parameters of the serving statements; an entry must supply exactly these keys. */
    private HashSet<String> servingKeys;
    private StorageConnectorApi storageConnectorApi = new StorageConnectorApi();
    private FeatureViewApi featureViewApi = new FeatureViewApi();
    /** Feature group id -> (feature name -> Avro reader) for the features flagged as complex. */
    private Map<Integer, Map<String, DatumReader<Object>>> featureGroupDatumReaders;
    /** Feature group id -> names of the features taken from that feature group. */
    private Map<Integer, Set<String>> featureGroupFeatures;
    /** Runs the per-statement lookups of a single-vector request concurrently. */
    private ExecutorService executorService = Executors.newCachedThreadPool();
    /** Whether the current statements were prepared for batch lookups. */
    private boolean isBatch = false;
    private VariablesApi variablesApi = new VariablesApi();

    /**
     * Retrieves a single feature vector, deciding whether the external MySQL endpoint is needed
     * from the type of the active Hopsworks HTTP client.
     *
     * @param featureViewBase feature view to serve
     * @param entry serving key name to value; the key set must equal the serving keys
     * @return the feature values of all serving statements, concatenated in statement order
     * @throws FeatureStoreException if initialisation or a lookup fails
     * @throws IOException if initialisation fails with an I/O error
     * @throws ClassNotFoundException if the MySQL JDBC driver is not on the classpath
     */
    public List<Object> getFeatureVector(FeatureViewBase featureViewBase, Map<String, Object> entry)
            throws FeatureStoreException, IOException, ClassNotFoundException {
        return this.getFeatureVector(featureViewBase, entry,
                HopsworksClient.getInstance().getHopsworksHttpClient() instanceof HopsworksExternalClient);
    }

    /**
     * Retrieves a single feature vector, (re-)initialising the server for single-vector serving
     * when it has no connection pool yet or is currently prepared for batch serving.
     *
     * @param featureViewBase feature view to serve
     * @param entry serving key name to value; the key set must equal the serving keys
     * @param external passed on to {@link #setupHikariPool(FeatureStoreBase, Boolean)}
     * @return the feature values of all serving statements, concatenated in statement order
     * @throws FeatureStoreException if initialisation or a lookup fails
     * @throws IOException if initialisation fails with an I/O error
     * @throws ClassNotFoundException if the MySQL JDBC driver is not on the classpath
     * @throws IllegalArgumentException if the keys of {@code entry} differ from the serving keys
     */
    public List<Object> getFeatureVector(FeatureViewBase featureViewBase, Map<String, Object> entry,
            boolean external) throws FeatureStoreException, IOException, ClassNotFoundException {
        if (this.hikariDataSource == null || this.isBatch) {
            this.initPreparedStatement(featureViewBase, false, external);
        }
        this.checkPrimaryKeys(entry.keySet());
        return this.getFeatureVector(entry);
    }

    /**
     * Runs every serving statement concurrently and concatenates their results in the order of the
     * prepared statement indices. The server must already be initialised.
     *
     * @param entry serving key name to value
     * @return the concatenated feature values
     * @throws FeatureStoreException if any lookup fails or the calling thread is interrupted
     */
    @VisibleForTesting
    public List<Object> getFeatureVector(Map<String, Object> entry) throws FeatureStoreException {
        List<Future<List<Object>>> queryFutures = new ArrayList<>();
        for (Integer preparedStatementIndex : this.orderedServingPreparedStatements.keySet()) {
            queryFutures.add(this.executorService.submit(() -> {
                try {
                    return this.processQuery(entry, preparedStatementIndex);
                } catch (FeatureStoreException | IOException | SQLException e) {
                    throw new RuntimeException(e);
                }
            }));
        }

        // Futures are consumed in submission order, which keeps the vector in statement order.
        List<Object> servingVector = new ArrayList<>();
        for (Future<List<Object>> queryFuture : queryFutures) {
            servingVector.addAll(awaitQuery(queryFuture));
        }
        return servingVector;
    }

    /**
     * Retrieves a single feature vector mapped onto a new instance of {@code returnType},
     * (re-)initialising the server for single-vector serving when required.
     *
     * @param featureViewBase feature view to serve
     * @param entry serving key name to value; the key set must equal the serving keys
     * @param external passed on to {@link #setupHikariPool(FeatureStoreBase, Boolean)}
     * @param returnType class to instantiate (through its no-argument constructor) and populate
     * @param <T> type of the returned object
     * @return the populated object
     * @throws FeatureStoreException if initialisation or a lookup fails
     * @throws IOException if initialisation fails with an I/O error
     * @throws ClassNotFoundException if the MySQL JDBC driver is not on the classpath
     * @throws IllegalAccessException if {@code returnType} cannot be instantiated
     * @throws InstantiationException if {@code returnType} cannot be instantiated
     * @throws IllegalArgumentException if the keys of {@code entry} differ from the serving keys
     */
    public <T> T getFeatureVectorObject(FeatureViewBase featureViewBase, Map<String, Object> entry,
            boolean external, Class<T> returnType) throws FeatureStoreException, IOException,
            ClassNotFoundException, IllegalAccessException, InstantiationException {
        if (this.hikariDataSource == null || this.isBatch) {
            this.initPreparedStatement(featureViewBase, false, external);
        }
        this.checkPrimaryKeys(entry.keySet());
        return this.getFeatureVectorObject(entry, returnType);
    }

    /**
     * Instantiates {@code returnType} and lets every serving statement populate it concurrently
     * through its setters. The server must already be initialised.
     *
     * @param entry serving key name to value
     * @param returnType class to instantiate (through its no-argument constructor) and populate
     * @param <T> type of the returned object
     * @return the populated object
     * @throws FeatureStoreException if any lookup fails or the calling thread is interrupted
     * @throws InstantiationException if {@code returnType} cannot be instantiated
     * @throws IllegalAccessException if {@code returnType} cannot be instantiated
     */
    public <T> T getFeatureVectorObject(Map<String, Object> entry, Class<T> returnType)
            throws FeatureStoreException, InstantiationException, IllegalAccessException {
        T returnObject = returnType.newInstance();
        List<Future<?>> queryFutures = new ArrayList<>();
        for (Integer preparedStatementIndex : this.orderedServingPreparedStatements.keySet()) {
            queryFutures.add(this.executorService.submit(() -> {
                try {
                    this.processQuery(entry, preparedStatementIndex, returnObject);
                } catch (FeatureStoreException | IOException | IllegalAccessException
                        | NoSuchMethodException | InvocationTargetException | SQLException e) {
                    throw new RuntimeException(e);
                }
            }));
        }
        for (Future<?> queryFuture : queryFutures) {
            awaitQuery(queryFuture);
        }
        return returnObject;
    }

    /**
     * Executes one serving statement and copies every returned column into {@code returnObject}
     * by invoking the setter named {@code "set" + capitalised column label} whose parameter type is
     * the runtime class of the column value.
     *
     * <p>Columns whose value is {@code null} are skipped, because no setter can be resolved from a
     * null value; the corresponding field keeps whatever value the object was created with.
     *
     * @param entry serving key name to value
     * @param preparedStatementIndex index of the serving statement to execute
     * @param returnObject object to populate
     * @param <T> type of the populated object
     * @throws SQLException if the query fails
     * @throws FeatureStoreException if the query returns no rows
     * @throws IOException if a complex feature cannot be decoded
     * @throws NoSuchMethodException if {@code returnObject} declares no matching setter
     * @throws InvocationTargetException if a setter throws
     * @throws IllegalAccessException if a setter is not accessible
     */
    public <T> void processQuery(Map<String, Object> entry, int preparedStatementIndex, T returnObject)
            throws SQLException, FeatureStoreException, IOException, NoSuchMethodException,
            InvocationTargetException, IllegalAccessException {
        ServingPreparedStatement servingPreparedStatement =
                this.orderedServingPreparedStatements.get(preparedStatementIndex);
        try (Connection connection = this.hikariDataSource.getConnection();
             PreparedStatement preparedStatement =
                     connection.prepareStatement(servingPreparedStatement.getQueryOnline())) {
            this.bindServingKeys(preparedStatement, preparedStatementIndex, entry);
            try (ResultSet results = preparedStatement.executeQuery()) {
                if (!results.isBeforeFirst()) {
                    throw new FeatureStoreException("No data was retrieved from online feature store.");
                }
                String[] columnLabels = columnLabels(results);
                Map<String, DatumReader<Object>> featuresDatumReaders =
                        this.featureGroupDatumReaders.get(servingPreparedStatement.getFeatureGroupId());
                while (results.next()) {
                    for (int i = 0; i < columnLabels.length; i++) {
                        Object columnValue =
                                this.readColumnValue(featuresDatumReaders, results, i + 1, columnLabels[i]);
                        if (columnValue == null) {
                            continue;
                        }
                        String methodName = "set" + WordUtils.capitalize(columnLabels[i]);
                        Method setter =
                                returnObject.getClass().getDeclaredMethod(methodName, columnValue.getClass());
                        setter.invoke(returnObject, columnValue);
                    }
                }
            }
        }
    }

    /**
     * Executes one serving statement and returns the values of all its columns, row by row.
     *
     * @param entry serving key name to value
     * @param preparedStatementIndex index of the serving statement to execute
     * @return the column values in result-set order
     * @throws SQLException if the query fails
     * @throws FeatureStoreException if the query returns no rows
     * @throws IOException if a complex feature cannot be decoded
     */
    private List<Object> processQuery(Map<String, Object> entry, int preparedStatementIndex)
            throws SQLException, FeatureStoreException, IOException {
        List<Object> servingVector = new ArrayList<>();
        ServingPreparedStatement servingPreparedStatement =
                this.orderedServingPreparedStatements.get(preparedStatementIndex);
        try (Connection connection = this.hikariDataSource.getConnection();
             PreparedStatement preparedStatement =
                     connection.prepareStatement(servingPreparedStatement.getQueryOnline())) {
            this.bindServingKeys(preparedStatement, preparedStatementIndex, entry);
            try (ResultSet results = preparedStatement.executeQuery()) {
                if (!results.isBeforeFirst()) {
                    throw new FeatureStoreException("No data was retrieved from online feature store.");
                }
                String[] columnLabels = columnLabels(results);
                Map<String, DatumReader<Object>> featuresDatumReaders =
                        this.featureGroupDatumReaders.get(servingPreparedStatement.getFeatureGroupId());
                while (results.next()) {
                    for (int i = 0; i < columnLabels.length; i++) {
                        servingVector.add(
                                this.readColumnValue(featuresDatumReaders, results, i + 1, columnLabels[i]));
                    }
                }
            }
        }
        return servingVector;
    }

    /**
     * Retrieves a batch of feature vectors, (re-)initialising the server for batch serving when it
     * has no connection pool yet or is currently prepared for single-vector serving.
     *
     * @param featureViewBase feature view to serve
     * @param entry serving key name to the list of values, one per requested vector
     * @param external passed on to {@link #setupHikariPool(FeatureStoreBase, Boolean)}
     * @return one feature vector per position of the key lists
     * @throws SQLException if a query fails
     * @throws FeatureStoreException if initialisation fails or a query returns no rows
     * @throws IOException if initialisation or decoding of a complex feature fails
     * @throws ClassNotFoundException if the MySQL JDBC driver is not on the classpath
     * @throws IllegalArgumentException if the keys of {@code entry} differ from the serving keys
     */
    public List<List<Object>> getFeatureVectors(FeatureViewBase featureViewBase,
            Map<String, List<Object>> entry, boolean external)
            throws SQLException, FeatureStoreException, IOException, ClassNotFoundException {
        if (this.hikariDataSource == null || !this.isBatch) {
            this.initPreparedStatement(featureViewBase, true, external);
        }
        return this.getFeatureVectors(entry);
    }

    /**
     * Retrieves a batch of feature vectors. For every serving statement the first {@code ?} of its
     * query is replaced by the tuple list built from the key values, with the keys ordered by their
     * parameter position. The server must already be initialised for batch serving.
     *
     * @param entry serving key name to the list of values, one per requested vector
     * @return one feature vector per position of the key lists
     * @throws SQLException if a query fails
     * @throws FeatureStoreException if a query returns no rows
     * @throws IOException if a complex feature cannot be decoded
     * @throws IllegalArgumentException if the keys of {@code entry} differ from the serving keys
     */
    public List<List<Object>> getFeatureVectors(Map<String, List<Object>> entry)
            throws SQLException, FeatureStoreException, IOException {
        this.checkPrimaryKeys(entry.keySet());
        List<String> queries = new ArrayList<>();
        for (Map.Entry<Integer, ServingPreparedStatement> statement
                : this.orderedServingPreparedStatements.entrySet()) {
            List<List<Object>> keyColumns = this.preparedStatementParameters.get(statement.getKey())
                    .entrySet()
                    .stream()
                    .sorted(Comparator.comparingInt(Map.Entry::getValue))
                    .map(parameter -> entry.get(parameter.getKey()))
                    .collect(Collectors.toList());
            String zippedTupleString = this.zipArraysToTupleString(keyColumns);
            queries.add(substituteFirstPlaceholder(statement.getValue().getQueryOnline(), zippedTupleString));
        }
        return this.getFeatureVectors(queries, entry);
    }

    /**
     * Executes the batch queries and distributes every returned row to the positions of the batch
     * whose key values match that row. A position that matches no row of a query receives no values
     * from that query.
     *
     * <p>The n-th query (counting from 0) is interpreted with the serving statement stored under
     * index n.
     */
    private List<List<Object>> getFeatureVectors(List<String> queries, Map<String, List<Object>> entry)
            throws SQLException, FeatureStoreException, IOException {
        int batchSize = entry.values().stream()
                .findAny()
                .orElseThrow(() -> new FeatureStoreException("No serving keys were provided."))
                .size();
        List<List<Object>> servingVectorArray = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            servingVectorArray.add(new ArrayList<>());
        }

        try (Connection connection = this.hikariDataSource.getConnection();
             Statement stmt = connection.createStatement()) {
            int statementOrder = 0;
            for (String query : queries) {
                try (ResultSet results = stmt.executeQuery(query)) {
                    if (!results.isBeforeFirst()) {
                        throw new FeatureStoreException("No data was retrieved from online feature store.");
                    }
                    String[] columnLabels = columnLabels(results);
                    ServingPreparedStatement servingPreparedStatement =
                            this.orderedServingPreparedStatements.get(statementOrder);
                    Map<String, DatumReader<Object>> featuresDatumReaders =
                            this.featureGroupDatumReaders.get(servingPreparedStatement.getFeatureGroupId());
                    Set<String> selectedFeatureNames =
                            this.featureGroupFeatures.get(servingPreparedStatement.getFeatureGroupId());
                    while (results.next()) {
                        List<Object> rowFeatures = new ArrayList<>();
                        for (int i = 0; i < columnLabels.length; i++) {
                            // Columns that are not selected features are only used to match the
                            // row against the batch, not returned.
                            if (!selectedFeatureNames.contains(columnLabels[i])) {
                                continue;
                            }
                            rowFeatures.add(
                                    this.readColumnValue(featuresDatumReaders, results, i + 1, columnLabels[i]));
                        }
                        for (Integer order : this.getOrderInBatch(results, entry, statementOrder, batchSize)) {
                            servingVectorArray.get(order).addAll(rowFeatures);
                        }
                    }
                }
                statementOrder++;
            }
        }
        return servingVectorArray;
    }

    /**
     * Initialises the server; equivalent to {@link #initPreparedStatement(FeatureViewBase, boolean)}.
     *
     * @param featureViewBase feature view to serve
     * @param batch {@code true} to prepare for batch lookups, {@code false} for single vectors
     * @throws FeatureStoreException if initialisation fails
     * @throws IOException if initialisation fails with an I/O error
     * @throws ClassNotFoundException if the MySQL JDBC driver is not on the classpath
     */
    public void initServing(FeatureViewBase featureViewBase, boolean batch)
            throws FeatureStoreException, IOException, ClassNotFoundException {
        this.initPreparedStatement(featureViewBase, batch);
    }

    /**
     * Initialises the server; equivalent to
     * {@link #initPreparedStatement(FeatureViewBase, boolean, boolean)}.
     *
     * @param featureViewBase feature view to serve
     * @param batch {@code true} to prepare for batch lookups, {@code false} for single vectors
     * @param external passed on to {@link #setupHikariPool(FeatureStoreBase, Boolean)}
     * @throws FeatureStoreException if initialisation fails
     * @throws IOException if initialisation fails with an I/O error
     * @throws SQLException declared for callers; not thrown by the current implementation
     * @throws ClassNotFoundException if the MySQL JDBC driver is not on the classpath
     */
    public void initServing(FeatureViewBase featureViewBase, boolean batch, boolean external)
            throws FeatureStoreException, IOException, SQLException, ClassNotFoundException {
        this.initPreparedStatement(featureViewBase, batch, external);
    }

    /**
     * Initialises the server, deciding whether the external MySQL endpoint is needed from the type
     * of the active Hopsworks HTTP client.
     *
     * @param featureViewBase feature view to serve
     * @param batch {@code true} to prepare for batch lookups, {@code false} for single vectors
     * @throws FeatureStoreException if initialisation fails
     * @throws IOException if initialisation fails with an I/O error
     * @throws ClassNotFoundException if the MySQL JDBC driver is not on the classpath
     */
    public void initPreparedStatement(FeatureViewBase featureViewBase, boolean batch)
            throws FeatureStoreException, IOException, ClassNotFoundException {
        this.initPreparedStatement(featureViewBase, batch,
                HopsworksClient.getInstance().getHopsworksHttpClient() instanceof HopsworksExternalClient);
    }

    /**
     * Fetches the serving statements of the feature view and initialises the server with them.
     *
     * @param featureViewBase feature view to serve
     * @param batch {@code true} to prepare for batch lookups, {@code false} for single vectors
     * @param external passed on to {@link #setupHikariPool(FeatureStoreBase, Boolean)}
     * @throws FeatureStoreException if initialisation fails
     * @throws IOException if initialisation fails with an I/O error
     * @throws ClassNotFoundException if the MySQL JDBC driver is not on the classpath
     */
    public void initPreparedStatement(FeatureViewBase featureViewBase, boolean batch, boolean external)
            throws FeatureStoreException, IOException, ClassNotFoundException {
        List<ServingPreparedStatement> servingPreparedStatements =
                this.featureViewApi.getServingPreparedStatement(featureViewBase, batch);
        this.initPreparedStatement((FeatureStoreBase) featureViewBase.getFeatureStore(),
                featureViewBase.getFeatures(), servingPreparedStatements, batch, external);
    }

    /**
     * Builds the lookup tables for the given serving statements and features and creates the
     * connection pool.
     *
     * <p>All state is computed first and published only after every step succeeded, so a failed
     * re-initialisation leaves the previous configuration, including the batch flag, untouched.
     *
     * @param featureStoreBase feature store whose online storage connector is used
     * @param features features of the feature view
     * @param servingPreparedStatements statements to serve
     * @param batch whether the statements were prepared for batch lookups
     * @param external passed on to {@link #setupHikariPool(FeatureStoreBase, Boolean)}
     * @throws FeatureStoreException if initialisation fails
     * @throws IOException if initialisation fails with an I/O error
     * @throws ClassNotFoundException if the MySQL JDBC driver is not on the classpath
     */
    @VisibleForTesting
    public void initPreparedStatement(FeatureStoreBase featureStoreBase, List<TrainingDatasetFeature> features,
            List<ServingPreparedStatement> servingPreparedStatements, boolean batch, boolean external)
            throws FeatureStoreException, IOException, ClassNotFoundException {
        Map<Integer, TreeMap<String, Integer>> parametersByStatement = new HashMap<>();
        TreeMap<Integer, ServingPreparedStatement> statementsByIndex = new TreeMap<>();
        HashSet<String> servingVectorKeys = new HashSet<>();
        Map<Integer, String> prefixMap = new HashMap<>();
        for (ServingPreparedStatement servingPreparedStatement : servingPreparedStatements) {
            prefixMap.put(servingPreparedStatement.getFeatureGroupId(), servingPreparedStatement.getPrefix());
            statementsByIndex.put(servingPreparedStatement.getPreparedStatementIndex(), servingPreparedStatement);
            TreeMap<String, Integer> parameterIndices = new TreeMap<>();
            servingPreparedStatement.getPreparedStatementParameters().forEach(preparedStatementParameter -> {
                servingVectorKeys.add(preparedStatementParameter.getName());
                parameterIndices.put(preparedStatementParameter.getName(), preparedStatementParameter.getIndex());
            });
            parametersByStatement.put(servingPreparedStatement.getPreparedStatementIndex(), parameterIndices);
        }
        Map<Integer, Map<String, DatumReader<Object>>> datumReaders =
                this.getComplexFeatureSchemas(features, prefixMap);
        Map<Integer, Set<String>> featuresByGroup = features.stream()
                .collect(Collectors.groupingBy(feature -> feature.getFeaturegroup().getId(),
                        Collectors.mapping(TrainingDatasetFeature::getName, Collectors.toSet())));

        this.setupHikariPool(featureStoreBase, external);

        this.servingKeys = servingVectorKeys;
        this.preparedStatementParameters = parametersByStatement;
        this.orderedServingPreparedStatements = statementsByIndex;
        this.featureGroupDatumReaders = datumReaders;
        this.featureGroupFeatures = featuresByGroup;
        this.isBatch = batch;
    }

    /**
     * Creates the connection pool from the online storage connector of the feature store. A pool
     * left over from an earlier initialisation is closed once the new one has been created.
     *
     * @param featureStoreBase feature store whose online storage connector is used
     * @param external when {@code true}, the numeric host of the JDBC URL is replaced by the value
     *     of the {@code loadbalancer_external_domain_mysqld} variable; {@code null} is treated as
     *     {@code false}
     * @throws FeatureStoreException if {@code external} is set and the variable is missing or empty
     * @throws IOException if the storage connector or the variable cannot be fetched
     * @throws ClassNotFoundException if the MySQL JDBC driver is not on the classpath
     */
    @VisibleForTesting
    public void setupHikariPool(FeatureStoreBase featureStoreBase, Boolean external)
            throws FeatureStoreException, IOException, ClassNotFoundException {
        Class.forName("com.mysql.cj.jdbc.Driver");
        StorageConnector.JdbcConnector storageConnectorBase =
                this.storageConnectorApi.getOnlineStorageConnector(featureStoreBase,
                        StorageConnector.JdbcConnector.class);
        Map<String, String> jdbcOptions = storageConnectorBase.sparkOptions();
        String url = jdbcOptions.get("url");
        if (Boolean.TRUE.equals(external)) {
            Optional<Variable> loadbalancerVariable =
                    this.variablesApi.get("loadbalancer_external_domain_mysqld");
            if (!loadbalancerVariable.isPresent()
                    || Strings.isNullOrEmpty(loadbalancerVariable.get().getValue())) {
                throw new FeatureStoreException("No external domain for MySQL was found in the Hopsworks Cluster Configuration variables. Contact your administrator.");
            }
            String host = loadbalancerVariable.get().getValue();
            url = url.replaceAll("/[0-9.]+:", "/" + host + ":");
        }

        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(url);
        config.setUsername(jdbcOptions.get("user"));
        config.setPassword(jdbcOptions.get("password"));

        // Swap only after the new pool exists, then release the old one so that repeated
        // (re-)initialisation does not accumulate open pools.
        HikariDataSource previousDataSource = this.hikariDataSource;
        this.hikariDataSource = new HikariDataSource(config);
        if (previousDataSource != null) {
            previousDataSource.close();
        }
    }

    /**
     * Zips parallel value lists into a SQL tuple list, for example {@code [[1, 2], ["a", "b"]]}
     * becomes {@code ((1,'a'),(2,'b'))}.
     *
     * <p>String values are single-quoted with embedded backslashes and single quotes escaped; all
     * other values are rendered with {@code toString()}.
     *
     * @param lists one list per key, all holding one value per requested vector; the number of
     *     tuples is taken from the first list
     * @return the tuple list, wrapped in parentheses
     */
    @VisibleForTesting
    public String zipArraysToTupleString(List<List<Object>> lists) {
        List<String> zippedTuples = new ArrayList<>();
        for (int i = 0; i < lists.get(0).size(); i++) {
            List<String> zippedArray = new ArrayList<>();
            for (List<Object> in : lists) {
                zippedArray.add(toSqlLiteral(in.get(i)));
            }
            zippedTuples.add("(" + String.join(",", zippedArray) + ")");
        }
        return "(" + String.join(",", zippedTuples) + ")";
    }

    /** Renders a key value as a SQL literal; strings are quoted and escaped for MySQL. */
    private static String toSqlLiteral(Object value) {
        if (value instanceof String) {
            // Backslashes first, otherwise the escapes added for quotes would be escaped again.
            String escaped = ((String) value).replace("\\", "\\\\").replace("'", "''");
            return "'" + escaped + "'";
        }
        return value.toString();
    }

    /**
     * Replaces the first {@code ?} of the query with the replacement taken literally; the query is
     * returned unchanged when it contains no {@code ?}.
     */
    private static String substituteFirstPlaceholder(String query, String replacement) {
        int placeholder = query.indexOf('?');
        if (placeholder < 0) {
            return query;
        }
        return query.substring(0, placeholder) + replacement + query.substring(placeholder + 1);
    }

    /** Waits for a lookup task and converts its failure into a {@link FeatureStoreException}. */
    private static <R> R awaitQuery(Future<R> queryFuture) throws FeatureStoreException {
        try {
            return queryFuture.get();
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller's thread.
            Thread.currentThread().interrupt();
            throw new FeatureStoreException("Error retrieving query statement result", e);
        } catch (ExecutionException e) {
            throw new FeatureStoreException("Error retrieving query statement result", e);
        }
    }

    /** Binds every value of the entry that is a parameter of the given serving statement. */
    private void bindServingKeys(PreparedStatement preparedStatement, int preparedStatementIndex,
            Map<String, Object> entry) throws SQLException {
        Map<String, Integer> parameterIndexInStatement =
                this.preparedStatementParameters.get(preparedStatementIndex);
        for (Map.Entry<String, Object> key : entry.entrySet()) {
            Integer position = parameterIndexInStatement.get(key.getKey());
            if (position != null) {
                preparedStatement.setObject(position, key.getValue());
            }
        }
    }

    /** Returns the column labels of the result set; element {@code i} belongs to column {@code i + 1}. */
    private static String[] columnLabels(ResultSet results) throws SQLException {
        int columnCount = results.getMetaData().getColumnCount();
        String[] labels = new String[columnCount];
        for (int i = 0; i < columnCount; i++) {
            labels[i] = results.getMetaData().getColumnLabel(i + 1);
        }
        return labels;
    }

    /**
     * Reads one column of the current row, decoding it with the Avro reader registered for its
     * label when there is one and returning the plain JDBC object otherwise.
     */
    private Object readColumnValue(Map<String, DatumReader<Object>> featuresDatumReaders, ResultSet results,
            int index, String columnLabel) throws SQLException, IOException {
        if (featuresDatumReaders != null && featuresDatumReaders.containsKey(columnLabel)) {
            return this.deserializeComplexFeature(featuresDatumReaders.get(columnLabel), results, index);
        }
        return results.getObject(index);
    }

    /** Decodes the Avro-encoded bytes of a column; returns {@code null} when the column is NULL. */
    private Object deserializeComplexFeature(DatumReader<Object> featureDatumReader, ResultSet results, int index)
            throws SQLException, IOException {
        byte[] serialized = results.getBytes(index);
        if (serialized == null) {
            return null;
        }
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(serialized, null);
        return featureDatumReader.read(null, decoder);
    }

    /**
     * Returns the positions of the batch whose key values all equal the key columns of the current
     * row.
     */
    private List<Integer> getOrderInBatch(ResultSet results, Map<String, List<Object>> entry, int statementOrder,
            int batchSize) throws SQLException {
        List<Integer> orderInBatch = new ArrayList<>();
        ServingPreparedStatement preparedStatement = this.orderedServingPreparedStatements.get(statementOrder);
        for (int i = 0; i < batchSize; i++) {
            if (this.rowMatchesBatchPosition(results, entry, preparedStatement, i)) {
                orderInBatch.add(i);
            }
        }
        return orderInBatch;
    }

    /**
     * Tells whether every parameter of the statement has, at the given batch position, the value
     * found in the current row. A statement without parameters never matches.
     */
    private boolean rowMatchesBatchPosition(ResultSet results, Map<String, List<Object>> entry,
            ServingPreparedStatement preparedStatement, int position) throws SQLException {
        boolean matched = false;
        for (PreparedStatementParameter parameter : preparedStatement.getPreparedStatementParameters()) {
            Object expected = entry.get(parameter.getName()).get(position);
            String columnName = Strings.isNullOrEmpty(preparedStatement.getPrefix())
                    ? parameter.getName()
                    : preparedStatement.getPrefix() + parameter.getName();
            // The requested value is the receiver so that a NULL column compares as unequal
            // instead of raising a NullPointerException.
            if (!expected.equals(results.getObject(columnName, expected.getClass()))) {
                return false;
            }
            matched = true;
        }
        return matched;
    }

    /**
     * Builds an Avro reader for every complex feature, grouped by feature group id and keyed by the
     * feature name. The schema is looked up under the feature name with the feature group's prefix
     * removed when the name starts with it.
     *
     * @param features features of the feature view
     * @param prefixMap feature group id to the prefix applied to its feature names
     * @return feature group id to (feature name to reader); groups without complex features are absent
     * @throws FeatureStoreException if a schema cannot be retrieved
     * @throws IOException if a schema cannot be retrieved
     */
    @VisibleForTesting
    public Map<Integer, Map<String, DatumReader<Object>>> getComplexFeatureSchemas(
            List<TrainingDatasetFeature> features, Map<Integer, String> prefixMap)
            throws FeatureStoreException, IOException {
        Map<Integer, Map<String, DatumReader<Object>>> featureSchemaMap = new HashMap<>();
        for (TrainingDatasetFeature f : features) {
            if (!f.isComplex()) {
                continue;
            }
            Integer featureGroupId = f.getFeaturegroup().getId();
            String featureName = f.getName();
            String prefix = prefixMap.get(featureGroupId);
            if (!Strings.isNullOrEmpty(prefix) && featureName.startsWith(prefix)) {
                featureName = featureName.substring(prefix.length());
            }
            Schema schema = new Schema.Parser().parse(f.getFeaturegroup().getFeatureAvroSchema(featureName));
            DatumReader<Object> datumReader = new GenericDatumReader<>(schema);
            featureSchemaMap.computeIfAbsent(featureGroupId, id -> new HashMap<>()).put(f.getName(), datumReader);
        }
        return featureSchemaMap;
    }

    /**
     * Verifies that the supplied key names are exactly the serving keys.
     *
     * @throws IllegalArgumentException if they differ
     */
    private void checkPrimaryKeys(Set<String> primaryKeys) {
        if (!this.servingKeys.equals(primaryKeys)) {
            throw new IllegalArgumentException("Provided primary key map doesn't correspond to serving_keys");
        }
    }

    /** Closes the connection pool, if one was created, and shuts down the worker threads. */
    public void close() {
        if (this.hikariDataSource != null) {
            this.hikariDataSource.close();
        }
        this.executorService.shutdown();
    }

    @Generated
    public VectorServer() {
    }

    @Generated
    public void setHikariDataSource(HikariDataSource hikariDataSource) {
        this.hikariDataSource = hikariDataSource;
    }

    @Generated
    public Map<Integer, TreeMap<String, Integer>> getPreparedStatementParameters() {
        return this.preparedStatementParameters;
    }

    @Generated
    public TreeMap<Integer, ServingPreparedStatement> getOrderedServingPreparedStatements() {
        return this.orderedServingPreparedStatements;
    }

    @Generated
    public HashSet<String> getServingKeys() {
        return this.servingKeys;
    }

    @Generated
    public void setServingKeys(HashSet<String> servingKeys) {
        this.servingKeys = servingKeys;
    }
}

