/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.featurestore.query;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.hops.hopsworks.common.featurestore.FeaturestoreController;
import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupFacade;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeatureGroupCommitController;
import io.hops.hopsworks.common.featurestore.featuregroup.ondemand.OnDemandFeaturegroupDTO;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController;
import io.hops.hopsworks.common.featurestore.query.Feature;
import io.hops.hopsworks.common.featurestore.query.Query;
import io.hops.hopsworks.common.featurestore.query.QueryDTO;
import io.hops.hopsworks.common.featurestore.query.filter.Filter;
import io.hops.hopsworks.common.featurestore.query.filter.FilterController;
import io.hops.hopsworks.common.featurestore.query.filter.FilterLogic;
import io.hops.hopsworks.common.featurestore.query.join.Join;
import io.hops.hopsworks.common.featurestore.query.join.JoinDTO;
import io.hops.hopsworks.common.featurestore.trainingdatasets.TrainingDatasetController;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.FeatureGroupCommit;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.TimeTravelFormat;
import io.hops.hopsworks.persistence.entity.featurestore.featureview.FeatureView;
import io.hops.hopsworks.persistence.entity.featurestore.trainingdataset.SqlCondition;
import io.hops.hopsworks.persistence.entity.featurestore.trainingdataset.SqlFilterLogic;
import io.hops.hopsworks.persistence.entity.featurestore.trainingdataset.TrainingDatasetFeature;
import io.hops.hopsworks.persistence.entity.featurestore.trainingdataset.TrainingDatasetFilter;
import io.hops.hopsworks.persistence.entity.featurestore.trainingdataset.TrainingDatasetJoin;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Level;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.calcite.sql.JoinType;

@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class QueryController {
    @EJB
    private FeaturegroupController featuregroupController;
    @EJB
    private FeaturegroupFacade featuregroupFacade;
    @EJB
    private FeatureGroupCommitController featureGroupCommitController;
    @EJB
    private FilterController filterController;
    @EJB
    private FeaturestoreController featurestoreController;
    @EJB
    private OnlineFeaturestoreController onlineFeaturestoreController;
    @EJB
    private TrainingDatasetController trainingDatasetController;
    private static final String ALL_FEATURES = "*";

    public QueryController() {
    }

    public QueryController(FeaturegroupController featuregroupController, FeaturegroupFacade featuregroupFacade, FilterController filterController, FeaturestoreController featurestoreController, OnlineFeaturestoreController onlineFeaturestoreController, FeatureGroupCommitController featureGroupCommitController) {
        this.featuregroupController = featuregroupController;
        this.featuregroupFacade = featuregroupFacade;
        this.filterController = filterController;
        this.featurestoreController = featurestoreController;
        this.onlineFeaturestoreController = onlineFeaturestoreController;
        this.featureGroupCommitController = featureGroupCommitController;
    }

    public Query convertQueryDTO(Project project, Users user, QueryDTO queryDTO, boolean pitEnabled) throws FeaturestoreException {
        HashMap<Integer, String> fgAliasLookup = new HashMap<Integer, String>();
        HashMap<Integer, Featuregroup> fgLookup = new HashMap<Integer, Featuregroup>();
        HashMap<Integer, List<Feature>> availableFeatureLookup = new HashMap<Integer, List<Feature>>();
        this.populateFgLookupTables(queryDTO, 0, fgAliasLookup, fgLookup, availableFeatureLookup, project, user, null);
        return this.convertQueryDTO(queryDTO, fgAliasLookup, fgLookup, availableFeatureLookup, pitEnabled);
    }

    public Set<Featuregroup> getFeatureGroups(Query query) {
        Set<Featuregroup> allFgs = query.getJoins().stream().map(join -> join.getRightQuery().getFeaturegroup()).collect(Collectors.toSet());
        allFgs.add(query.getFeaturegroup());
        return allFgs;
    }

    public Query convertQueryDTO(QueryDTO queryDTO, Map<Integer, String> fgAliasLookup, Map<Integer, Featuregroup> fgLookup, Map<Integer, List<Feature>> availableFeatureLookup, boolean pitEnabled) throws FeaturestoreException {
        this.checkNestedJoin(queryDTO);
        this.checkSpineLeftSide(queryDTO);
        Integer fgId = queryDTO.getLeftFeatureGroup().getId();
        Featuregroup fg = fgLookup.get(fgId);
        String featureStore = this.featurestoreController.getOfflineFeaturestoreDbName(fg.getFeaturestore());
        String projectName = this.onlineFeaturestoreController.getOnlineFeaturestoreDbName(fg.getFeaturestore().getProject());
        List<Feature> requestedFeatures = this.validateFeatures(fg, queryDTO.getLeftFeatures(), availableFeatureLookup.get(fgId));
        Query query = new Query(featureStore, projectName, fg, fgAliasLookup.get(fgId), requestedFeatures, availableFeatureLookup.get(fgId), queryDTO.getHiveEngine());
        if (fg.getStreamFeatureGroup() != null || fg.getCachedFeaturegroup() != null && fg.getCachedFeaturegroup().getTimeTravelFormat() == TimeTravelFormat.HUDI) {
            if (queryDTO.getLeftFeatureGroupEndTime() != null) {
                FeatureGroupCommit endCommit = this.featureGroupCommitController.findCommitByDate(fg, queryDTO.getLeftFeatureGroupEndTime()).orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.NO_DATA_AVAILABLE_FEATUREGROUP_COMMITDATE, Level.FINE, "featureGroup: " + fg.getName() + " version " + fg.getVersion()));
                query.setLeftFeatureGroupEndTimestamp(endCommit.getCommittedOn());
                query.setLeftFeatureGroupEndCommitId(endCommit.getFeatureGroupCommitPK().getCommitId());
            }
            if (queryDTO.getLeftFeatureGroupStartTime() != null) {
                Integer commitCount = this.featureGroupCommitController.countCommitsInRange(fg, queryDTO.getLeftFeatureGroupStartTime(), queryDTO.getLeftFeatureGroupEndTime());
                if (commitCount == 0) {
                    throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.NO_DATA_AVAILABLE_FEATUREGROUP_COMMITDATE, Level.FINE, "featureGroup: " + fg.getName() + " version " + fg.getVersion());
                }
                query.setLeftFeatureGroupStartTimestamp(queryDTO.getLeftFeatureGroupStartTime());
            }
        }
        if (queryDTO.getJoins() != null && !queryDTO.getJoins().isEmpty()) {
            query.setJoins(this.convertJoins(query, queryDTO.getJoins(), fgAliasLookup, fgLookup, availableFeatureLookup, pitEnabled));
            this.removeDuplicateColumns(query, pitEnabled);
        }
        if (queryDTO.getFilter() != null) {
            query.setFilter(this.filterController.convertFilterLogic(queryDTO.getFilter(), availableFeatureLookup));
        }
        return query;
    }

    void checkNestedJoin(QueryDTO queryDTO) throws FeaturestoreException {
        if (queryDTO.getJoins() != null) {
            for (JoinDTO join : queryDTO.getJoins()) {
                if (join.getQuery().getJoins() == null || join.getQuery().getJoins().size() <= 0) continue;
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.NESTED_JOIN_NOT_ALLOWED, Level.SEVERE, "Nested join is not supported.");
            }
        }
    }

    void checkSpineLeftSide(QueryDTO queryDTO) throws FeaturestoreException {
        if (queryDTO.getJoins() != null) {
            for (JoinDTO join : queryDTO.getJoins()) {
                if (!(join.getQuery().getLeftFeatureGroup() instanceof OnDemandFeaturegroupDTO) || !((OnDemandFeaturegroupDTO)join.getQuery().getLeftFeatureGroup()).getSpine().booleanValue()) continue;
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.SPINE_GROUP_ON_RIGHT_SIDE_OF_JOIN_NOT_ALLOWED, Level.SEVERE, "Using a spine group on the right side of the join is not supported. Spine groups should be used only for labels on the left side.");
            }
        }
    }

    public Query appendFilter(Query query, SqlFilterLogic sqlLogic, FilterLogic filterLogic) {
        FilterLogic currentFilter = query.getFilter();
        if (currentFilter != null) {
            FilterLogic filter = new FilterLogic(sqlLogic, currentFilter, filterLogic);
            query.setFilter(filter);
        } else {
            query.setFilter(filterLogic);
        }
        return query;
    }

    public Query appendEventTimeFilter(Query query, Date startTime, Date endTime) throws FeaturestoreException {
        query = this.appendEventTimeFilter(query, startTime, SqlCondition.GREATER_THAN_OR_EQUAL);
        return this.appendEventTimeFilter(query, endTime, SqlCondition.LESS_THAN);
    }

    private Query appendEventTimeFilter(Query query, Date eventTime, SqlCondition sqlCondition) throws FeaturestoreException {
        if (eventTime != null) {
            Filter eventTimeFilter = this.createEventTimeFilter(this.getEventTimeFeature(query), sqlCondition, eventTime);
            return this.appendFilter(query, SqlFilterLogic.AND, new FilterLogic(eventTimeFilter));
        }
        return query;
    }

    Feature getEventTimeFeature(Query query) throws FeaturestoreException {
        String eventTimeFieldName = query.getFeaturegroup().getEventTime();
        if (eventTimeFieldName == null) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.EVENT_TIME_FEATURE_NOT_FOUND, Level.FINE, "Cannot find event feature in feature group " + query.getFeaturegroup().getName());
        }
        return query.getAvailableFeatures().stream().filter(feature -> feature.getName().equals(eventTimeFieldName)).findFirst().orElseThrow(IllegalStateException::new);
    }

    Filter createEventTimeFilter(Feature feature, SqlCondition condition, Date time) throws FeaturestoreException {
        String value = this.filterController.convertToEventTimeFeatureValue(feature, time);
        return new Filter(feature, condition, value);
    }

    public int populateFgLookupTables(QueryDTO queryDTO, int fgId, Map<Integer, String> fgAliasLookup, Map<Integer, Featuregroup> fgLookup, Map<Integer, List<Feature>> availableFeatureLookup, Project project, Users user, String prefix) throws FeaturestoreException {
        if (queryDTO.getJoins() != null && !queryDTO.getJoins().isEmpty()) {
            for (JoinDTO join : queryDTO.getJoins()) {
                fgId = this.populateFgLookupTables(join.getQuery(), fgId, fgAliasLookup, fgLookup, availableFeatureLookup, project, user, join.getPrefix());
                ++fgId;
            }
        }
        Featuregroup fg = this.validateFeaturegroupDTO(queryDTO.getLeftFeatureGroup());
        fgLookup.put(fg.getId(), fg);
        fgAliasLookup.put(fg.getId(), this.generateAs(fgId));
        List availableFeatures = this.featuregroupController.getFeatures(fg, project, user).stream().map(f -> new Feature(f.getName(), (String)fgAliasLookup.get(fg.getId()), f.getType(), f.getDefaultValue(), f.getPrimary(), fg, prefix)).collect(Collectors.toList());
        availableFeatureLookup.put(fg.getId(), availableFeatures);
        return fgId;
    }

    private String generateAs(int id) {
        return "fg" + id;
    }

    private Featuregroup validateFeaturegroupDTO(FeaturegroupDTO featuregroupDTO) throws FeaturestoreException {
        if (featuregroupDTO == null) {
            throw new IllegalArgumentException("Feature group not specified");
        }
        return this.featuregroupFacade.findById(featuregroupDTO.getId()).orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATUREGROUP_NOT_FOUND, Level.FINE, "Could not find feature group with ID " + featuregroupDTO.getId()));
    }

    protected List<Feature> validateFeatures(Featuregroup fg, List<FeatureGroupFeatureDTO> requestedFeatures, List<Feature> availableFeatures) throws FeaturestoreException {
        ArrayList<Feature> featureList = new ArrayList<Feature>();
        if (requestedFeatures.size() == 1 && requestedFeatures.get(0).getName().equals(ALL_FEATURES)) {
            featureList.addAll(availableFeatures);
        } else {
            for (FeatureGroupFeatureDTO requestedFeature : requestedFeatures) {
                featureList.add(availableFeatures.stream().filter(af -> af.getName().equals(requestedFeature.getName())).findFirst().orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURE_DOES_NOT_EXIST, Level.FINE, "Feature: " + requestedFeature.getName() + " not found in feature group: " + fg.getName())));
            }
        }
        return featureList;
    }

    private List<Join> convertJoins(Query leftQuery, List<JoinDTO> joinDTOS, Map<Integer, String> fgAliasLookup, Map<Integer, Featuregroup> fgLookup, Map<Integer, List<Feature>> availableFeatureLookup, boolean pitEnabled) throws FeaturestoreException {
        ArrayList<Join> joins = new ArrayList<Join>();
        for (JoinDTO joinDTO : joinDTOS) {
            List<Feature> rightOn;
            List<Feature> leftOn;
            if (joinDTO.getQuery() == null) {
                throw new IllegalArgumentException("Subquery not specified");
            }
            Query rightQuery = this.convertQueryDTO(joinDTO.getQuery(), fgAliasLookup, fgLookup, availableFeatureLookup, pitEnabled);
            if (joinDTO.getOn() != null && !joinDTO.getOn().isEmpty()) {
                leftOn = joinDTO.getOn().stream().map(f -> new Feature(f.getName())).collect(Collectors.toList());
                rightOn = joinDTO.getOn().stream().map(f -> new Feature(f.getName())).collect(Collectors.toList());
                joins.add(this.extractLeftRightOn(leftQuery, rightQuery, leftOn, rightOn, joinDTO.getType(), joinDTO.getPrefix()));
                continue;
            }
            if (joinDTO.getLeftOn() != null && !joinDTO.getLeftOn().isEmpty()) {
                leftOn = joinDTO.getLeftOn().stream().map(f -> new Feature(f.getName())).collect(Collectors.toList());
                rightOn = joinDTO.getRightOn().stream().map(f -> new Feature(f.getName())).collect(Collectors.toList());
                joins.add(this.extractLeftRightOn(leftQuery, rightQuery, leftOn, rightOn, joinDTO.getType(), joinDTO.getPrefix()));
                continue;
            }
            joins.add(this.extractPrimaryKeysJoin(leftQuery, rightQuery, joinDTO.getType(), joinDTO.getPrefix()));
        }
        return joins;
    }

    protected Join extractPrimaryKeysJoin(Query leftQuery, Query rightQuery, JoinType joinType, String prefix) throws FeaturestoreException {
        ArrayList<Feature> joinFeatures = new ArrayList<Feature>();
        leftQuery.getAvailableFeatures().stream().filter(Feature::isPrimary).forEach(lf -> joinFeatures.addAll(rightQuery.getAvailableFeatures().stream().filter(rf -> rf.getName().equals(lf.getName()) && rf.isPrimary()).collect(Collectors.toList())));
        if (joinFeatures.isEmpty()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.NO_PK_JOINING_KEYS, Level.FINE, leftQuery.getFeaturegroup().getName() + " and: " + rightQuery.getFeaturegroup().getName());
        }
        List<SqlCondition> joinOperator = joinFeatures.stream().map(f -> SqlCondition.EQUALS).collect(Collectors.toList());
        return new Join(leftQuery, rightQuery, joinFeatures, joinFeatures, joinType, prefix, joinOperator);
    }

    public Join extractLeftRightOn(Query leftQuery, Query rightQuery, List<Feature> leftOn, List<Feature> rightOn, JoinType joinType, String prefix) throws FeaturestoreException {
        if (leftOn.size() != rightOn.size()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.LEFT_RIGHT_ON_DIFF_SIZES, Level.FINE);
        }
        List<SqlCondition> joinOperator = leftOn.stream().map(f -> SqlCondition.EQUALS).collect(Collectors.toList());
        if (joinOperator.size() != leftOn.size()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.JOIN_OPERATOR_MISMATCH, Level.FINE);
        }
        for (Feature feature : leftOn) {
            this.checkFeatureExistsAndSetAttributes(leftQuery, feature);
        }
        for (Feature feature : rightOn) {
            this.checkFeatureExistsAndSetAttributes(rightQuery, feature);
        }
        return new Join(leftQuery, rightQuery, leftOn, rightOn, joinType, prefix, joinOperator);
    }

    private void checkFeatureExistsAndSetAttributes(Query query, Feature feature) throws FeaturestoreException {
        Optional<Feature> availableFeature = query.getAvailableFeatures().stream().filter(f -> f.getName().equals(feature.getName())).findAny();
        if (!availableFeature.isPresent()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURE_DOES_NOT_EXIST, Level.FINE, "Could not find Join feature " + feature.getName() + " in feature group: " + query.getFeaturegroup().getName());
        }
        feature.setDefaultValue(availableFeature.get().getDefaultValue());
        feature.setType(availableFeature.get().getType());
        feature.setFgAlias(availableFeature.get().getFgAlias());
    }

    void removeDuplicateColumns(Query query, boolean pitEnabled) {
        for (Join join : query.getJoins()) {
            List leftJoinFeatureNames = join.getLeftOn().stream().map(Feature::getName).collect(Collectors.toList());
            ArrayList<Feature> filteredRightFeatures = new ArrayList<Feature>();
            for (Feature rightFeature : join.getRightQuery().getFeatures()) {
                if (leftJoinFeatureNames.contains(rightFeature.getName()) && join.getLeftQuery().getFeatures().stream().anyMatch(lf -> lf.getName().equals(Strings.isNullOrEmpty((String)rightFeature.getPrefix()) ? rightFeature.getName() : rightFeature.getPrefix() + rightFeature.getName()))) continue;
                filteredRightFeatures.add(rightFeature);
            }
            join.getRightQuery().setFeatures(filteredRightFeatures);
        }
    }

    public Query makeQuery(FeatureView featureView, Project project, Users user, boolean withLabel, Boolean isHiveEngine) throws FeaturestoreException {
        return this.makeQuery(featureView, project, user, withLabel, isHiveEngine, Lists.newArrayList());
    }

    public Query makeQuery(FeatureView featureView, Project project, Users user, boolean withLabel, Boolean isHiveEngine, Collection<TrainingDatasetFilter> extraFilters) throws FeaturestoreException {
        List<TrainingDatasetJoin> joins = featureView.getJoins().stream().sorted(Comparator.comparing(TrainingDatasetJoin::getIndex)).collect(Collectors.toList());
        List<TrainingDatasetFeature> tdFeatures = featureView.getFeatures().stream().sorted((t1, t2) -> {
            if (t1.getIndex() != null) {
                return t1.getIndex().compareTo(t2.getIndex());
            }
            return t1.getName().compareTo(t2.getName());
        }).filter(f -> !f.isLabel() || withLabel).collect(Collectors.toList());
        return this.trainingDatasetController.getQuery(joins, tdFeatures, featureView.getFilters(), project, user, isHiveEngine, extraFilters);
    }

    public Query constructBatchQuery(FeatureView featureView, Project project, Users user, Long startTimestamp, Long endTimestamp, Boolean withLabel, Boolean isHiveEngine, Integer trainingDataVersion) throws FeaturestoreException {
        Date startTime = startTimestamp == null ? null : new Date(startTimestamp);
        Date endTime = endTimestamp == null ? null : new Date(endTimestamp);
        return this.constructBatchQuery(featureView, project, user, startTime, endTime, withLabel, isHiveEngine, trainingDataVersion);
    }

    public Query constructBatchQuery(FeatureView featureView, Project project, Users user, Date startTime, Date endTime, Boolean withLabel, Boolean isHiveEngine, Integer trainingDataVersion) throws FeaturestoreException {
        Query baseQuery = trainingDataVersion != null ? this.makeQuery(featureView, project, user, withLabel, isHiveEngine, this.trainingDatasetController.getTrainingDatasetByFeatureViewAndVersion(featureView, trainingDataVersion).getFilters()) : this.makeQuery(featureView, project, user, withLabel, isHiveEngine);
        return this.appendEventTimeFilter(baseQuery, startTime, endTime);
    }
}

