/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.commands.featurestore.search;

import com.google.common.collect.Sets;
import com.lambdista.util.Try;
import io.hops.hopsworks.common.commands.CommandException;
import io.hops.hopsworks.common.commands.CommandFilterBy;
import io.hops.hopsworks.common.commands.featurestore.search.SearchFSOpenSearchController;
import io.hops.hopsworks.common.dao.AbstractFacade;
import io.hops.hopsworks.common.dao.QueryParam;
import io.hops.hopsworks.common.dao.commands.CommandFacade;
import io.hops.hopsworks.common.dao.commands.search.SearchFSCommandFacade;
import io.hops.hopsworks.common.dao.commands.search.SearchFSCommandHistoryFacade;
import io.hops.hopsworks.common.util.PayaraClusterManager;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.OpenSearchException;
import io.hops.hopsworks.persistence.entity.commands.Command;
import io.hops.hopsworks.persistence.entity.commands.CommandStatus;
import io.hops.hopsworks.persistence.entity.commands.search.SearchFSCommand;
import io.hops.hopsworks.persistence.entity.commands.search.SearchFSCommandHistory;
import io.hops.hopsworks.persistence.entity.commands.search.SearchFSCommandOp;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.DependsOn;
import javax.ejb.EJB;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.ejb.Timeout;
import javax.ejb.Timer;
import javax.ejb.TimerConfig;
import javax.ejb.TimerService;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.enterprise.concurrent.ManagedExecutorService;
import javax.naming.InitialContext;

/**
 * Singleton EJB that periodically polls the feature store search command queue and
 * dispatches the queued commands to the search controller.
 *
 * <p>A single-action timer is re-armed after every run. Each run (on the primary node only)
 * looks at the commands currently {@code ONGOING}/{@code CLEANING}, and if the configured
 * maximum of ongoing operations has not been reached it schedules, in this order:
 * project cleanups, artifact deletions, cascaded artifact deletions and finally the
 * remaining artifact commands. The actual work runs asynchronously on the managed
 * executor service looked up under {@link #EXECUTOR_SERVICE_NAME}.
 */
@Startup
@Singleton
@DependsOn(value={"Settings"})
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class SearchFSCommandExecutor {
    private static final Logger LOGGER = Logger.getLogger(SearchFSCommandExecutor.class.getName());
    private static final String EXECUTOR_SERVICE_NAME = "concurrent/condaExecutorService";
    @EJB
    private PayaraClusterManager payaraClusterManager;
    @Resource
    private TimerService timerService;
    @EJB
    private Settings settings;
    @EJB
    private SearchFSCommandFacade commandFacade;
    @EJB
    private SearchFSCommandHistoryFacade commandHistoryFacade;
    @EJB
    private SearchFSOpenSearchController searchController;
    /** True once {@link #initInt()} has completed successfully. */
    private boolean init = false;
    private ManagedExecutorService executorService;
    /** The currently armed single-action timer, or null if none was created yet. */
    private Timer timer;

    /**
     * Attempts the one-time initialisation and arms the first timer. The timer is armed even
     * when initialisation fails, so that {@link #process()} can retry it later.
     */
    @PostConstruct
    public void init() {
        this.init = this.initInt();
        this.schedule();
    }

    /** Arms a non-persistent single-action timer using the period configured in the settings. */
    private void schedule() {
        long period = this.settings.commandSearchFSProcessTimerPeriod().longValue();
        this.timer = this.timerService.createSingleActionTimer(
            period, new TimerConfig("feature store command executor", false));
    }

    /**
     * Looks up the executor service and marks every command found in {@code ONGOING} or
     * {@code CLEANING} state as failed.
     *
     * @return true when the lookup and both resets succeeded, false otherwise (the error is logged)
     */
    private boolean initInt() {
        try {
            this.executorService = InitialContext.doLookup(EXECUTOR_SERVICE_NAME);
            this.failStaleCommands(CommandStatus.ONGOING,
                "Could not run search command due to internal server error. Please try again.");
            this.failStaleCommands(CommandStatus.CLEANING,
                "Could not clean search command due to internal server error. Please try again.");
            return true;
        }
        catch (CommandException e) {
            LOGGER.log(Level.SEVERE, "Error resetting commands", e);
        }
        catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Error looking up for the condaExecutorService", e);
        }
        return false;
    }

    /** Fails every command currently in {@code status} with {@code reason} and records the history step. */
    private void failStaleCommands(CommandStatus status, String reason) throws CommandException {
        List<SearchFSCommand> reset =
            this.commandFacade.updateByQuery(this.queryByStatus(status), c -> c.failWith(reason));
        reset.forEach(this::saveHistory);
    }

    /** Cancels the currently armed timer, if any. */
    @PreDestroy
    public void destroy() {
        if (this.timer != null) {
            this.timer.cancel();
        }
    }

    /**
     * Timer callback: retries initialisation if needed, runs one processing round and re-arms
     * the timer. Exceptions from the processing round are logged and do not propagate.
     */
    @Timeout
    @TransactionAttribute(value=TransactionAttributeType.NOT_SUPPORTED)
    public void process() {
        try {
            if (!this.init) {
                this.init = this.initInt();
            }
            if (this.init) {
                try {
                    this.processInt();
                }
                catch (Exception t) {
                    LOGGER.log(Level.INFO, "Command processing failed with error:", t);
                }
            }
        }
        finally {
            // Re-arm in finally so that an unexpected Error does not silently stop the polling loop.
            this.schedule();
        }
    }

    /**
     * Runs one processing round. Does nothing when this node is not the primary or when the
     * number of ONGOING + CLEANING commands already reaches the configured maximum.
     *
     * @throws CommandException if one of the command queries fails
     */
    private void processInt() throws CommandException {
        if (!this.payaraClusterManager.amIThePrimary()) {
            LOGGER.log(Level.INFO, "not primary");
            return;
        }
        int maxOngoing = this.settings.getMaxOngoingOpensearchDocIndexOps();

        List<SearchFSCommand> updatingCommands = this.commandFacade.findByQuery(this.queryByStatus(CommandStatus.ONGOING));
        int active = updatingCommands.size();
        if (active >= maxOngoing) {
            return;
        }
        List<SearchFSCommand> cleaningCommands = this.commandFacade.findByQuery(this.queryByStatus(CommandStatus.CLEANING));
        active += cleaningCommands.size();
        if (active >= maxOngoing) {
            return;
        }

        // Docs that already have a running operation must not be picked up again.
        Set<Long> excludeDocs = new HashSet<>();
        updatingCommands.forEach(c -> excludeDocs.add(this.getDocId(c)));
        cleaningCommands.forEach(c -> excludeDocs.add(this.getDocId(c)));
        Set<Integer> activeProjectIds = this.activeProjectIds(updatingCommands, cleaningCommands);

        Map<Integer, SearchFSCommand> toDeleteProjects = this.deleteProjectCommands(CommandStatus.NEW);
        Map<Integer, SearchFSCommand> deletingProjects = this.deleteProjectCommands(CommandStatus.CLEANING);
        List<SearchFSCommand> failedCommands = this.commandFacade.findByQuery(this.queryByStatus(CommandStatus.FAILED));

        // A project is only cleaned once none of its commands is still running.
        for (Integer projectId : Sets.difference(toDeleteProjects.keySet(), activeProjectIds)) {
            this.cleanDeletedProject(toDeleteProjects.get(projectId));
        }

        Set<Project> excludeProjects = this.unionProjects(toDeleteProjects, deletingProjects);

        Set<Long> deletingDocs = this.cleanDeletedArtifacts(excludeProjects, excludeDocs, maxOngoing - active);
        active += deletingDocs.size();
        excludeDocs.addAll(deletingDocs);

        Set<Long> cascadedDocs = this.cleanDeleteCascadedArtifacts(excludeProjects, excludeDocs, maxOngoing - active);
        active += cascadedDocs.size();
        excludeDocs.addAll(cascadedDocs);

        // Docs with a failed command are excluded from regular processing (but not from deletion above).
        failedCommands.forEach(c -> excludeDocs.add(this.getDocId(c)));
        this.processArtifacts(excludeProjects, excludeDocs, maxOngoing - active);

        for (SearchFSCommand failed : failedCommands) {
            if (this.shouldRetry(failed)) {
                this.updateCommand(failed, CommandStatus.NEW);
            }
        }
    }

    /**
     * Collects the ids of the projects referenced by the given running commands. Commands
     * without a project are skipped: {@link #processCommand(SearchFSCommand)} explicitly
     * anticipates a null project, so it must not break the whole round here.
     */
    private Set<Integer> activeProjectIds(List<SearchFSCommand> updating, List<SearchFSCommand> cleaning) {
        Set<Integer> ids = new HashSet<>();
        for (SearchFSCommand c : updating) {
            if (c.getProject() != null) {
                ids.add(c.getProject().getId());
            }
        }
        for (SearchFSCommand c : cleaning) {
            if (c.getProject() != null) {
                ids.add(c.getProject().getId());
            }
        }
        return ids;
    }

    /** Returns one DELETE_PROJECT command per project id (first one wins) for the given status. */
    private Map<Integer, SearchFSCommand> deleteProjectCommands(CommandStatus status) throws CommandException {
        return this.commandFacade.findByQuery(this.queryDeletedProjects(status)).stream()
            .collect(Collectors.toMap(c -> c.getProject().getId(), c -> c, (existingC, newC) -> existingC));
    }

    /** Returns the set of projects referenced by the commands of both maps. */
    private Set<Project> unionProjects(Map<Integer, SearchFSCommand> p1, Map<Integer, SearchFSCommand> p2) {
        Set<Project> result = new HashSet<>();
        p1.values().forEach(c -> result.add(c.getProject()));
        p2.values().forEach(c -> result.add(c.getProject()));
        return result;
    }

    /**
     * Marks the DELETE_PROJECT command as CLEANING and submits the cleanup. On success every
     * other (non DELETE_PROJECT) command of the project is removed as SKIPPED and the command
     * itself as SUCCESS; on failure the command is failed with the error message.
     */
    private void cleanDeletedProject(SearchFSCommand command) {
        this.updateCommand(command, CommandStatus.CLEANING);
        this.executorService.submit(() -> this.runProjectClean(command));
    }

    /** Asynchronous body of {@link #cleanDeletedProject(SearchFSCommand)}. */
    private void runProjectClean(SearchFSCommand command) {
        Try<Boolean> result = this.processFunction().apply(command);
        try {
            // NOTE(review): a false result leaves the command in CLEANING untouched - confirm this is intended.
            if (result.checkedGet()) {
                List<SearchFSCommand> toSkip = this.commandFacade.findByQuery(
                    this.queryByProject(command.getProject(), SearchFSCommandOp.DELETE_PROJECT));
                toSkip.forEach(c -> this.removeCommand(c, CommandStatus.SKIPPED));
                this.removeCommand(command, CommandStatus.SUCCESS);
            }
        }
        catch (Throwable t) {
            // Pass the throwable itself; formatting getStackTrace() would only print an array reference.
            LOGGER.log(Level.INFO, "Project:" + command.getProject().getId() + " clean failed with error:", t);
            this.failCommand(command, t.getMessage());
        }
    }

    /**
     * Schedules DELETE_ARTIFACT commands until {@code maxOngoing} distinct docs are being deleted.
     *
     * @param excludeProjects projects whose commands must not be touched
     * @param excludeDocs docs that already have a running operation
     * @param maxOngoing remaining budget of docs; nothing is scheduled when it is not positive
     * @return ids of the docs for which a deletion was scheduled
     * @throws CommandException if the command query fails
     */
    private Set<Long> cleanDeletedArtifacts(Set<Project> excludeProjects, Set<Long> excludeDocs, int maxOngoing) throws CommandException {
        Set<Long> deleting = new HashSet<>();
        if (maxOngoing <= 0) {
            return deleting;
        }
        List<SearchFSCommand> toDelete = this.commandFacade.findByQuery(this.queryByOp(SearchFSCommandOp.DELETE_ARTIFACT), excludeProjects, excludeDocs);
        for (SearchFSCommand command : toDelete) {
            this.cleanDeletedArtifact(command);
            deleting.add(this.getDocId(command));
            if (deleting.size() >= maxOngoing) {
                break;
            }
        }
        return deleting;
    }

    /**
     * Marks the DELETE_ARTIFACT command as CLEANING and submits the deletion. On success every
     * other (non DELETE_ARTIFACT) command of the same doc is removed as SKIPPED and the command
     * itself as SUCCESS; on failure the command is failed with the error message.
     */
    private void cleanDeletedArtifact(SearchFSCommand command) {
        this.updateCommand(command, CommandStatus.CLEANING);
        this.executorService.submit(() -> this.runArtifactClean(command));
    }

    /** Asynchronous body of {@link #cleanDeletedArtifact(SearchFSCommand)}. */
    private void runArtifactClean(SearchFSCommand command) {
        Try<Boolean> result = this.processFunction().apply(command);
        try {
            // NOTE(review): a false result leaves the command in CLEANING untouched - confirm this is intended.
            if (result.checkedGet()) {
                QueryParam query = this.queryCommandsExcept(this.getDocId(command), SearchFSCommandOp.DELETE_ARTIFACT);
                this.commandFacade.findByQuery(query).forEach(c -> this.removeCommand(c, CommandStatus.SKIPPED));
                this.removeCommand(command, CommandStatus.SUCCESS);
            }
        }
        catch (Throwable t) {
            // Log the doc id rather than dereferencing the feature group: a null feature group
            // would throw here and skip failCommand, leaving the command in CLEANING.
            LOGGER.log(Level.INFO, "Doc:" + this.getDocId(command) + " clean failed with error:", t);
            this.failCommand(command, t.getMessage());
        }
    }

    /**
     * Schedules deletions for the commands returned by the facade's delete-cascaded query until
     * {@code maxOngoing} distinct docs are being deleted.
     *
     * @param excludeProjects projects whose commands must not be touched
     * @param excludeDocs docs that already have a running operation
     * @param maxOngoing remaining budget of docs; nothing is scheduled when it is not positive
     * @return ids of the docs for which a deletion was scheduled
     */
    private Set<Long> cleanDeleteCascadedArtifacts(Set<Project> excludeProjects, Set<Long> excludeDocs, int maxOngoing) {
        Set<Long> deleting = new HashSet<>();
        if (maxOngoing <= 0) {
            return deleting;
        }
        List<SearchFSCommand> toDelete = this.commandFacade.findDeleteCascaded(excludeProjects, excludeDocs, maxOngoing);
        for (SearchFSCommand command : toDelete) {
            this.cleanDeleteCascadedArtifact(command);
            deleting.add(this.getDocId(command));
            if (deleting.size() >= maxOngoing) {
                break;
            }
        }
        return deleting;
    }

    /**
     * Persists a DELETE_ARTIFACT clone of the given command and schedules it for cleaning.
     */
    private void cleanDeleteCascadedArtifact(SearchFSCommand command) {
        SearchFSCommand deleteArtifact = command.shallowClone();
        deleteArtifact.setOp(SearchFSCommandOp.DELETE_ARTIFACT);
        deleteArtifact.setStatus(CommandStatus.CLEANING);
        this.commandFacade.persistAndFlush(deleteArtifact);
        // cleanDeletedArtifact updates the status and writes the CLEANING history step itself,
        // so no separate updateCommand call is made here (it would record the step twice).
        this.cleanDeletedArtifact(deleteArtifact);
    }

    /**
     * Schedules the commands returned by the facade's to-process query until {@code maxOngoing}
     * distinct docs are being processed.
     *
     * @param excludeProjects projects whose commands must not be touched
     * @param excludeDocs docs that must not be processed in this round
     * @param maxOngoing remaining budget of docs; nothing is scheduled when it is not positive
     * @return ids of the docs for which processing was scheduled
     */
    private Set<Long> processArtifacts(Set<Project> excludeProjects, Set<Long> excludeDocs, int maxOngoing) {
        Set<Long> processing = new HashSet<>();
        if (maxOngoing <= 0) {
            return processing;
        }
        List<SearchFSCommand> toProcess = this.commandFacade.findToProcess(excludeProjects, excludeDocs, maxOngoing);
        for (SearchFSCommand command : toProcess) {
            this.processArtifact(command, this.processFunction());
            processing.add(this.getDocId(command));
            if (processing.size() >= maxOngoing) {
                break;
            }
        }
        return processing;
    }

    /**
     * Marks the command as ONGOING and submits {@code execute} for it. The command is removed
     * as SUCCESS when the result is true and failed with the error message when it throws.
     */
    private void processArtifact(SearchFSCommand command, Function<SearchFSCommand, Try<Boolean>> execute) {
        this.updateCommand(command, CommandStatus.ONGOING);
        this.executorService.submit(() -> {
            Try<Boolean> result = execute.apply(command);
            try {
                // NOTE(review): a false result ("delaying command") leaves the command ONGOING - confirm this is intended.
                if (result.checkedGet()) {
                    this.removeCommand(command, CommandStatus.SUCCESS);
                }
            }
            catch (Throwable t) {
                LOGGER.log(Level.INFO, "Command:" + command + " failed with error:", t);
                this.failCommand(command, t.getMessage());
            }
        });
    }

    /** Query for all commands with the given status. */
    private QueryParam queryByStatus(CommandStatus status) {
        Set<AbstractFacade.FilterBy> filters = new HashSet<>();
        filters.add(new CommandFilterBy(CommandFacade.Filters.STATUS_EQ, status.toString()));
        return new QueryParam(null, null, filters, null);
    }

    /** Query for DELETE_PROJECT commands with the given status. */
    private QueryParam queryDeletedProjects(CommandStatus status) {
        Set<AbstractFacade.FilterBy> filters = new HashSet<>();
        filters.add(new CommandFilterBy(SearchFSCommandFacade.SearchFSFilters.OP_EQ, SearchFSCommandOp.DELETE_PROJECT.name()));
        filters.add(new CommandFilterBy(CommandFacade.Filters.STATUS_EQ, status.toString()));
        return new QueryParam(null, null, filters, null);
    }

    /** Query for the commands of {@code project} whose op is different from {@code notOp}. */
    private QueryParam queryByProject(Project project, SearchFSCommandOp notOp) {
        Set<AbstractFacade.FilterBy> filters = new HashSet<>();
        filters.add(new CommandFilterBy(CommandFacade.Filters.PROJECT_ID_EQ, project.getId().toString()));
        filters.add(new CommandFilterBy(SearchFSCommandFacade.SearchFSFilters.OP_NEQ, notOp.name()));
        return new QueryParam(null, null, filters, null);
    }

    /** Query for all commands with the given op. */
    private QueryParam queryByOp(SearchFSCommandOp op) {
        Set<AbstractFacade.FilterBy> filters = new HashSet<>();
        filters.add(new CommandFilterBy(SearchFSCommandFacade.SearchFSFilters.OP_EQ, op.name()));
        return new QueryParam(null, null, filters, null);
    }

    /** Query for the commands of doc {@code docId} whose op is different from {@code notOp}. */
    private QueryParam queryCommandsExcept(Long docId, SearchFSCommandOp notOp) {
        Set<AbstractFacade.FilterBy> filters = new HashSet<>();
        filters.add(new CommandFilterBy(SearchFSCommandFacade.SearchFSFilters.OP_NEQ, notOp.name()));
        filters.add(new CommandFilterBy(SearchFSCommandFacade.SearchFSFilters.DOC_EQ, docId.toString()));
        return new QueryParam(null, null, filters, null);
    }

    /** Sets the status on the command, stores it and records the history step. */
    private void updateCommand(SearchFSCommand command, CommandStatus status) {
        command.setStatus(status);
        this.commandFacade.update(command);
        this.saveHistory(command);
    }

    /** Removes the command from the queue and records a history step with the final status. */
    private void removeCommand(SearchFSCommand command, CommandStatus status) {
        this.commandFacade.removeById(command.getId());
        command.setStatus(status);
        this.saveHistory(command);
    }

    /** Fails the command with {@code msg}, stores it and records the history step. */
    private void failCommand(SearchFSCommand command, String msg) {
        command.failWith(msg);
        this.commandFacade.update(command);
        this.saveHistory(command);
    }

    /** Persists a history entry for the command when command history is enabled in the settings. */
    private void saveHistory(SearchFSCommand command) {
        if (this.settings.commandSearchFSHistoryEnabled()) {
            this.commandHistoryFacade.persistAndFlush(this.getHistoryStep(command));
        }
    }

    /**
     * Decides whether a failed command may be retried. Retries are only possible when command
     * history is enabled, because the retry count is read from the history.
     */
    private boolean shouldRetry(SearchFSCommand command) {
        if (this.settings.commandSearchFSHistoryEnabled()) {
            return this.commandHistoryFacade.countRetries(command.getId()) < (long)this.settings.commandRetryPerCleanInterval().intValue();
        }
        return false;
    }

    /** The doc id of a command is its inode id. */
    private Long getDocId(SearchFSCommand command) {
        return command.getInodeId();
    }

    private SearchFSCommandHistory getHistoryStep(SearchFSCommand command) {
        return new SearchFSCommandHistory(command);
    }

    /**
     * Wraps {@link #processCommand(SearchFSCommand)} so that any unexpected exception is
     * returned as a failed {@link Try} carrying an INTERNAL_SERVER_ERROR command exception.
     */
    private Function<SearchFSCommand, Try<Boolean>> processFunction() {
        return c -> {
            try {
                return this.processCommand(c);
            }
            catch (Exception t) {
                String errMsg = "command failed due to unhandled error";
                CommandException ex = new CommandException(RESTCodes.CommandErrorCode.INTERNAL_SERVER_ERROR, Level.WARNING, errMsg, errMsg, t);
                return new Try.Failure<>(ex);
            }
        };
    }

    /**
     * Executes a single command against the search controller.
     *
     * @return a successful Try holding true when the command was executed, a successful Try
     *   holding false when the command's project or artifact is missing, or a failed Try
     *   carrying a {@link CommandException} (unsupported op, OpenSearch error, command error)
     */
    private Try<Boolean> processCommand(SearchFSCommand c) {
        try {
            if (c.getOp().equals(SearchFSCommandOp.DELETE_PROJECT)) {
                this.searchController.deleteProject(c.getProject());
                return Try.apply(() -> true);
            }
            if (c.getOp().equals(SearchFSCommandOp.DELETE_ARTIFACT)) {
                this.searchController.delete(c.getInodeId());
                return Try.apply(() -> true);
            }
            if (c.getProject() == null) {
                LOGGER.log(Level.FINE, "project deleted - delaying command");
                return Try.apply(() -> false);
            }
            if (c.getFeatureGroup() == null && c.getFeatureView() == null && c.getTrainingDataset() == null) {
                LOGGER.log(Level.FINE, "artifact deleted - delaying command");
                return Try.apply(() -> false);
            }
            switch (c.getOp()) {
                case CREATE:
                    this.searchController.create(c.getInodeId(), c);
                    break;
                case UPDATE_TAGS:
                    this.searchController.updateTags(c.getInodeId(), c);
                    break;
                case UPDATE_KEYWORDS:
                    this.searchController.updateKeywords(c.getInodeId(), c);
                    break;
                case UPDATE_FEATURESTORE:
                    this.searchController.setFeaturestore(c.getInodeId(), c);
                    break;
                default:
                    CommandException ex = new CommandException(RESTCodes.CommandErrorCode.NOT_IMPLEMENTED, Level.WARNING, "unhandled command op:" + c.getOp());
                    return new Try.Failure<>(ex);
            }
            return Try.apply(() -> true);
        }
        catch (OpenSearchException e) {
            String errMsg = "command failed due to opensearch error";
            CommandException ex = new CommandException(RESTCodes.CommandErrorCode.OPENSEARCH_ACCESS_ERROR, Level.WARNING, errMsg, errMsg, e);
            return new Try.Failure<>(ex);
        }
        catch (CommandException e) {
            return new Try.Failure<>(e);
        }
    }
}

