package io.hops.hopsworks.common.commands.featurestore.search;

import com.google.common.collect.Sets;
import com.lambdista.util.Try;
import io.hops.hopsworks.common.commands.CommandException;
import io.hops.hopsworks.common.commands.CommandFilterBy;
import io.hops.hopsworks.common.dao.QueryParam;
import io.hops.hopsworks.common.dao.commands.CommandFacade;
import io.hops.hopsworks.common.dao.commands.search.SearchFSCommandFacade;
import io.hops.hopsworks.common.dao.commands.search.SearchFSCommandHistoryFacade;
import io.hops.hopsworks.common.util.PayaraClusterManager;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.OpenSearchException;
import io.hops.hopsworks.persistence.entity.commands.CommandStatus;
import io.hops.hopsworks.persistence.entity.commands.search.SearchFSCommand;
import io.hops.hopsworks.persistence.entity.commands.search.SearchFSCommandHistory;
import io.hops.hopsworks.persistence.entity.commands.search.SearchFSCommandOp;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.restutils.RESTCodes;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.DependsOn;
import javax.ejb.EJB;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.ejb.Timeout;
import javax.ejb.Timer;
import javax.ejb.TimerConfig;
import javax.ejb.TimerService;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.enterprise.concurrent.ManagedExecutorService;
import javax.naming.InitialContext;

@DependsOn({"Settings"})
@Singleton
@TransactionAttribute(TransactionAttributeType.NEVER)
@Startup
/* loaded from: input_file:io/hops/hopsworks/common/commands/featurestore/search/SearchFSCommandExecutor.class */
public class SearchFSCommandExecutor {
    private static final Logger LOGGER = Logger.getLogger(SearchFSCommandExecutor.class.getName());
    private static final String EXECUTOR_SERVICE_NAME = "concurrent/condaExecutorService";

    @EJB
    private PayaraClusterManager payaraClusterManager;

    @Resource
    private TimerService timerService;

    @EJB
    private Settings settings;

    @EJB
    private SearchFSCommandFacade commandFacade;

    @EJB
    private SearchFSCommandHistoryFacade commandHistoryFacade;

    @EJB
    private SearchFSOpenSearchController searchController;
    private boolean init = false;
    private ManagedExecutorService executorService;
    private Timer timer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.hops.hopsworks.common.commands.featurestore.search.SearchFSCommandExecutor$1, reason: invalid class name */
    /* loaded from: input_file:io/hops/hopsworks/common/commands/featurestore/search/SearchFSCommandExecutor$1.class */
    /*
     * Decompiler-recovered switch map for SearchFSCommandOp.
     * The array is indexed by enum ordinal and holds a dense case index:
     * CREATE -> 1, UPDATE_TAGS -> 2, UPDATE_FEATURESTORE -> 3.
     * Ops that are not listed keep the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$hops$hopsworks$persistence$entity$commands$search$SearchFSCommandOp = new int[SearchFSCommandOp.values().length];

        static {
            // Each assignment is wrapped separately so that a constant missing at runtime
            // (NoSuchFieldError) only leaves its own slot unset instead of aborting the initializer.
            try {
                $SwitchMap$io$hops$hopsworks$persistence$entity$commands$search$SearchFSCommandOp[SearchFSCommandOp.CREATE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$hops$hopsworks$persistence$entity$commands$search$SearchFSCommandOp[SearchFSCommandOp.UPDATE_TAGS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$hops$hopsworks$persistence$entity$commands$search$SearchFSCommandOp[SearchFSCommandOp.UPDATE_FEATURESTORE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /**
     * Start-up hook: attempts the one-time initialisation and arms the first timer.
     * The timer is armed even when initialisation fails; {@link #process()} retries
     * the initialisation on every tick until it succeeds.
     */
    @PostConstruct
    public void init() {
        final boolean initialised = initInt();
        this.init = initialised;
        schedule();
    }

    /**
     * Arms a single-action, non-persistent timer that fires after the period
     * configured in {@code settings.commandProcessTimerPeriod()}.
     */
    private void schedule() {
        final long delay = this.settings.commandProcessTimerPeriod().longValue();
        final TimerConfig timerConfig = new TimerConfig("feature store command executor", false);
        this.timer = this.timerService.createSingleActionTimer(delay, timerConfig);
    }

    /**
     * Looks up the managed executor and fails every command that is still marked
     * ONGOING or CLEANING, recording a history step for each of them.
     *
     * @return {@code true} when the lookup and both resets succeeded, {@code false} otherwise
     */
    private boolean initInt() {
        try {
            this.executorService = (ManagedExecutorService) InitialContext.doLookup(EXECUTOR_SERVICE_NAME);
            failCommandsInStatus(CommandStatus.ONGOING,
                "Could not run search command due to internal server error. Please try again.");
            failCommandsInStatus(CommandStatus.CLEANING,
                "Could not clean search command due to internal server error. Please try again.");
            return true;
        } catch (CommandException resetFailure) {
            LOGGER.log(Level.SEVERE, "Error resetting commands", (Throwable) resetFailure);
            return false;
        } catch (Exception otherFailure) {
            LOGGER.log(Level.SEVERE, "Error looking up for the condaExecutorService", (Throwable) otherFailure);
            return false;
        }
    }

    /**
     * Marks every command currently in {@code status} as failed with {@code reason}
     * and persists one history step per updated command.
     */
    private void failCommandsInStatus(CommandStatus status, String reason) throws CommandException {
        this.commandFacade.updateByQuery(queryByStatus(status), command -> {
            command.failWith(reason);
        }).forEach(failedCommand -> {
            this.commandHistoryFacade.persistAndFlush(getHistoryStep(failedCommand));
        });
    }

    /** Shutdown hook: cancels the pending timer, if one was ever armed. */
    @PreDestroy
    public void destroy() {
        if (this.timer == null) {
            return;
        }
        this.timer.cancel();
    }

    /**
     * Timer callback. Retries the initialisation if it has not succeeded yet, runs one
     * processing pass when initialised, and always re-arms the timer afterwards.
     * Exceptions from the pass are logged and do not prevent rescheduling.
     */
    @Timeout
    @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
    public void process() {
        if (!this.init) {
            this.init = initInt();
        }
        if (!this.init) {
            // Still not initialised: skip this pass and try again on the next tick.
            schedule();
            return;
        }
        try {
            processInt();
        } catch (Exception failure) {
            LOGGER.log(Level.INFO, "Command processing failed with error:", (Throwable) failure);
        }
        schedule();
    }

    /**
     * Runs one scheduling pass. Only the primary cluster node does any work.
     *
     * <p>The pass counts ONGOING and CLEANING commands against
     * {@code settings.getMaxOngoingOpensearchDocIndexOps()} and returns early once that
     * cap is reached. Otherwise it, in order:
     * <ol>
     *   <li>starts cleaning for NEW delete-project commands whose project has no
     *       ONGOING/CLEANING command;</li>
     *   <li>starts delete-artifact cleaning, then delete-cascaded cleaning, then regular
     *       command processing, each with the remaining budget and each excluding the
     *       projects being deleted and the docs already in flight;</li>
     *   <li>resets FAILED commands to NEW while their retry count is below
     *       {@code settings.commandRetryPerCleanInterval()}.</li>
     * </ol>
     *
     * @throws CommandException propagated from the command facade queries
     */
    private void processInt() throws CommandException {
        if (!this.payaraClusterManager.amIThePrimary()) {
            LOGGER.log(Level.INFO, "not primary");
            return;
        }
        final int maxOngoing = this.settings.getMaxOngoingOpensearchDocIndexOps();

        final List<SearchFSCommand> ongoing = this.commandFacade.findByQuery(queryByStatus(CommandStatus.ONGOING));
        int inFlight = ongoing.size();
        if (inFlight >= maxOngoing) {
            return;
        }
        final Set<Long> ongoingDocs = ongoing.stream().map(this::getDocId).collect(Collectors.toSet());
        final Map<Integer, Project> activeProjects = projectsById(ongoing);

        final List<SearchFSCommand> cleaning = this.commandFacade.findByQuery(queryByStatus(CommandStatus.CLEANING));
        inFlight += cleaning.size();
        if (inFlight >= maxOngoing) {
            return;
        }
        final Set<Long> cleaningDocs = cleaning.stream().map(this::getDocId).collect(Collectors.toSet());
        activeProjects.putAll(projectsById(cleaning));

        final Map<Integer, SearchFSCommand> newProjectDeletes =
            firstCommandPerProject(this.commandFacade.findByQuery(queryDeletedProjects(CommandStatus.NEW)));
        final Map<Integer, SearchFSCommand> cleaningProjectDeletes =
            firstCommandPerProject(this.commandFacade.findByQuery(queryDeletedProjects(CommandStatus.CLEANING)));

        final List<SearchFSCommand> failed = this.commandFacade.findByQuery(queryByStatus(CommandStatus.FAILED));
        final Set<Long> failedDocs = failed.stream().map(this::getDocId).collect(Collectors.toSet());

        // A project delete only starts once nothing is ONGOING/CLEANING for that project.
        for (Integer projectId : Sets.difference(newProjectDeletes.keySet(), activeProjects.keySet())) {
            cleanDeletedProject(newProjectDeletes.get(projectId));
        }

        final Set<Project> excludedProjects = unionProjects(newProjectDeletes, cleaningProjectDeletes);
        final Set<Long> excludedDocs = new HashSet<>();
        excludedDocs.addAll(ongoingDocs);
        excludedDocs.addAll(cleaningDocs);

        final Set<Long> deletedDocs = cleanDeletedArtifacts(excludedProjects, excludedDocs, maxOngoing - inFlight);
        inFlight += deletedDocs.size();
        excludedDocs.addAll(deletedDocs);

        final Set<Long> cascadedDocs =
            cleanDeleteCascadedArtifacts(excludedProjects, excludedDocs, maxOngoing - inFlight);
        inFlight += cascadedDocs.size();
        excludedDocs.addAll(cascadedDocs);

        // Docs with a FAILED command are skipped in this pass; they are only reset below.
        excludedDocs.addAll(failedDocs);
        excludedDocs.addAll(processArtifacts(excludedProjects, excludedDocs, maxOngoing - inFlight));

        for (SearchFSCommand failedCommand : failed) {
            if (this.commandHistoryFacade.countRetries(failedCommand.getId())
                < this.settings.commandRetryPerCleanInterval().intValue()) {
                updateCommand(failedCommand, CommandStatus.NEW);
            }
        }
    }

    /** Maps project id to project for the given commands; the first project seen per id wins. */
    private Map<Integer, Project> projectsById(List<SearchFSCommand> commands) {
        return commands.stream().collect(Collectors.toMap(
            command -> command.getProject().getId(),
            SearchFSCommand::getProject,
            (kept, ignored) -> kept));
    }

    /** Maps project id to command for the given commands; the first command seen per id wins. */
    private Map<Integer, SearchFSCommand> firstCommandPerProject(List<SearchFSCommand> commands) {
        return commands.stream().collect(Collectors.toMap(
            command -> command.getProject().getId(),
            Function.identity(),
            (kept, ignored) -> kept));
    }

    /**
     * Collects the projects referenced by the commands of both maps into one set.
     *
     * @param map  first map of commands; only its values are read
     * @param map2 second map of commands; only its values are read
     * @return a new mutable set holding {@code getProject()} of every command in either map
     */
    private Set<Project> unionProjects(Map<Integer, SearchFSCommand> map, Map<Integer, SearchFSCommand> map2) {
        final Set<Project> projects = new HashSet<>();
        for (SearchFSCommand command : map.values()) {
            projects.add(command.getProject());
        }
        for (SearchFSCommand command : map2.values()) {
            projects.add(command.getProject());
        }
        return projects;
    }

    /**
     * Marks a delete-project command as CLEANING and runs it asynchronously on the
     * managed executor.
     *
     * <p>On success every other command of the same project (op other than
     * DELETE_PROJECT) is removed as SKIPPED and the delete command itself is removed as
     * SUCCESS. On any failure the command is marked failed with the error message.
     *
     * @param searchFSCommand the delete-project command to execute
     */
    private void cleanDeletedProject(SearchFSCommand searchFSCommand) {
        updateCommand(searchFSCommand, CommandStatus.CLEANING);
        this.executorService.submit(() -> {
            try {
                if (processFunction().apply(searchFSCommand).checkedGet()) {
                    this.commandFacade
                        .findByQuery(queryByProject(searchFSCommand.getProject(), SearchFSCommandOp.DELETE_PROJECT))
                        .forEach(otherCommand -> removeCommand(otherCommand, CommandStatus.SKIPPED));
                    removeCommand(searchFSCommand, CommandStatus.SUCCESS);
                }
            } catch (Throwable th) {
                // Hand the throwable to the logger so the real stack trace is printed; passing
                // th.getStackTrace() as a message parameter only rendered the array's identity string.
                LOGGER.log(Level.INFO,
                    "Project:" + searchFSCommand.getProject().getId() + " clean failed with error:" + th.getMessage(),
                    th);
                failCommand(searchFSCommand, th.getMessage());
            }
        });
    }

    /**
     * Starts cleaning for pending delete-artifact commands, up to a budget of distinct docs.
     *
     * @param set  projects to exclude from the query
     * @param set2 doc ids to exclude from the query
     * @param i    maximum number of distinct docs that may be started in this call
     * @return the doc ids for which cleaning was started (at most {@code i}; empty when
     *         {@code i <= 0})
     * @throws CommandException propagated from the command facade query
     */
    private Set<Long> cleanDeletedArtifacts(Set<Project> set, Set<Long> set2, int i) throws CommandException {
        final Set<Long> startedDocs = new HashSet<>();
        if (i <= 0) {
            // No budget left: do not query and do not start anything.
            return startedDocs;
        }
        for (SearchFSCommand command
            : this.commandFacade.findByQuery(queryByOp(SearchFSCommandOp.DELETE_ARTIFACT), set, set2)) {
            cleanDeletedArtifact(command);
            startedDocs.add(getDocId(command));
            // ">=" keeps the count within the budget; the former ">" let i + 1 docs start,
            // which could push the caller's remaining budget below zero.
            if (startedDocs.size() >= i) {
                break;
            }
        }
        return startedDocs;
    }

    /**
     * Marks a delete-artifact command as CLEANING and runs it asynchronously on the
     * managed executor.
     *
     * <p>On success every other command for the same doc (op other than
     * DELETE_ARTIFACT) is removed as SKIPPED and the delete command itself is removed as
     * SUCCESS. On any failure the command is marked failed with the error message.
     *
     * @param searchFSCommand the delete-artifact command to execute
     */
    private void cleanDeletedArtifact(SearchFSCommand searchFSCommand) {
        updateCommand(searchFSCommand, CommandStatus.CLEANING);
        this.executorService.submit(() -> {
            try {
                if (processFunction().apply(searchFSCommand).checkedGet()) {
                    this.commandFacade
                        .findByQuery(queryCommandsExcept(getDocId(searchFSCommand), SearchFSCommandOp.DELETE_ARTIFACT))
                        .forEach(otherCommand -> removeCommand(otherCommand, CommandStatus.SKIPPED));
                    removeCommand(searchFSCommand, CommandStatus.SUCCESS);
                }
            } catch (Throwable th) {
                // Identify the command by its doc id: getFeatureGroup() may be null (the command can
                // target a feature view or training dataset instead), and an NPE here would skip
                // failCommand and leave the command in CLEANING. The throwable is handed to the
                // logger so the real stack trace is printed.
                LOGGER.log(Level.INFO,
                    "Doc:" + getDocId(searchFSCommand) + " clean failed with error:" + th.getMessage(),
                    th);
                failCommand(searchFSCommand, th.getMessage());
            }
        });
    }

    /**
     * Starts cleaning for commands reported by {@code commandFacade.findDeleteCascaded},
     * up to a budget of distinct docs.
     *
     * @param set  projects to exclude from the query
     * @param set2 doc ids to exclude from the query
     * @param i    maximum number of distinct docs that may be started in this call
     * @return the doc ids for which cleaning was started (at most {@code i}; empty when
     *         {@code i <= 0})
     */
    private Set<Long> cleanDeleteCascadedArtifacts(Set<Project> set, Set<Long> set2, int i) {
        final Set<Long> startedDocs = new HashSet<>();
        if (i <= 0) {
            // No budget left. This also keeps a zero or negative limit away from the facade.
            // NOTE(review): presumably that limit becomes a query max-results value - confirm.
            return startedDocs;
        }
        for (SearchFSCommand command : this.commandFacade.findDeleteCascaded(set, set2, i)) {
            cleanDeleteCascadedArtifact(command);
            startedDocs.add(getDocId(command));
            // ">=" keeps the count within the budget; the former ">" let i + 1 docs start.
            if (startedDocs.size() >= i) {
                break;
            }
        }
        return startedDocs;
    }

    /**
     * Derives a DELETE_ARTIFACT command from the given command (via {@code shallowClone()}),
     * persists it in CLEANING status and hands it to {@link #cleanDeletedArtifact}.
     *
     * @param searchFSCommand the command whose artifact was delete-cascaded
     */
    private void cleanDeleteCascadedArtifact(SearchFSCommand searchFSCommand) {
        final SearchFSCommand deleteCommand = searchFSCommand.shallowClone();
        deleteCommand.setOp(SearchFSCommandOp.DELETE_ARTIFACT);
        deleteCommand.setStatus(CommandStatus.CLEANING);
        this.commandFacade.persistAndFlush(deleteCommand);
        // NOTE(review): CLEANING is written again here and once more inside cleanDeletedArtifact,
        // each time adding a history step - confirm the extra history rows are intended.
        updateCommand(deleteCommand, CommandStatus.CLEANING);
        cleanDeletedArtifact(deleteCommand);
    }

    /**
     * Starts processing for commands reported by {@code commandFacade.findToProcess},
     * up to a budget of distinct docs.
     *
     * @param set  projects to exclude from the query
     * @param set2 doc ids to exclude from the query
     * @param i    maximum number of distinct docs that may be started in this call
     * @return the doc ids for which processing was started (at most {@code i}; empty when
     *         {@code i <= 0})
     */
    private Set<Long> processArtifacts(Set<Project> set, Set<Long> set2, int i) {
        final Set<Long> startedDocs = new HashSet<>();
        if (i <= 0) {
            // No budget left: previously one command was still started before the size check
            // fired, exceeding the configured cap.
            return startedDocs;
        }
        for (SearchFSCommand command : this.commandFacade.findToProcess(set, set2, i)) {
            processArtifact(command, processFunction());
            startedDocs.add(getDocId(command));
            if (startedDocs.size() >= i) {
                break;
            }
        }
        return startedDocs;
    }

    /**
     * Marks a command as ONGOING and applies {@code function} to it asynchronously on the
     * managed executor.
     *
     * <p>When the function yields {@code true} the command is removed as SUCCESS; when it
     * fails, the command is marked failed with the error message.
     * NOTE(review): when the function yields {@code false} nothing further happens here,
     * so the command stays ONGOING - confirm that this is the intended "delay".
     *
     * @param searchFSCommand the command to execute
     * @param function        the processing step to apply to the command
     */
    private void processArtifact(SearchFSCommand searchFSCommand, Function<SearchFSCommand, Try<Boolean>> function) {
        updateCommand(searchFSCommand, CommandStatus.ONGOING);
        this.executorService.submit(() -> {
            try {
                if (function.apply(searchFSCommand).checkedGet()) {
                    removeCommand(searchFSCommand, CommandStatus.SUCCESS);
                }
            } catch (Throwable th) {
                // Hand the throwable to the logger so the real stack trace is printed; passing
                // th.getStackTrace() as a message parameter only rendered the array's identity string.
                LOGGER.log(Level.INFO,
                    "Command:" + searchFSCommand + " failed with error:" + th.getMessage(),
                    th);
                failCommand(searchFSCommand, th.getMessage());
            }
        });
    }

    /**
     * Builds a query with a single {@code STATUS_EQ} filter on the given status.
     *
     * @param commandStatus the status to filter on
     * @return a query parameter carrying only that filter
     */
    private QueryParam queryByStatus(CommandStatus commandStatus) {
        final CommandFilterBy statusFilter =
            new CommandFilterBy(CommandFacade.Filters.STATUS_EQ, commandStatus.toString());
        HashSet filters = new HashSet();
        filters.add(statusFilter);
        return new QueryParam(null, null, filters, null);
    }

    /**
     * Builds a query for DELETE_PROJECT commands ({@code OP_EQ}) in the given status
     * ({@code STATUS_EQ}).
     *
     * @param commandStatus the status to filter on
     * @return a query parameter carrying both filters
     */
    private QueryParam queryDeletedProjects(CommandStatus commandStatus) {
        final CommandFilterBy opFilter = new CommandFilterBy(
            SearchFSCommandFacade.SearchFSFilters.OP_EQ, SearchFSCommandOp.DELETE_PROJECT.name());
        final CommandFilterBy statusFilter =
            new CommandFilterBy(CommandFacade.Filters.STATUS_EQ, commandStatus.toString());
        HashSet filters = new HashSet();
        filters.add(opFilter);
        filters.add(statusFilter);
        return new QueryParam(null, null, filters, null);
    }

    /**
     * Builds a query with a {@code PROJECT_ID_EQ} filter on the project's id and an
     * {@code OP_NEQ} filter on the given op.
     *
     * @param project           the project whose id is filtered on
     * @param searchFSCommandOp the op passed to the {@code OP_NEQ} filter
     * @return a query parameter carrying both filters
     */
    private QueryParam queryByProject(Project project, SearchFSCommandOp searchFSCommandOp) {
        final CommandFilterBy projectFilter =
            new CommandFilterBy(CommandFacade.Filters.PROJECT_ID_EQ, project.getId().toString());
        final CommandFilterBy opFilter =
            new CommandFilterBy(SearchFSCommandFacade.SearchFSFilters.OP_NEQ, searchFSCommandOp.name());
        HashSet filters = new HashSet();
        filters.add(projectFilter);
        filters.add(opFilter);
        return new QueryParam(null, null, filters, null);
    }

    /**
     * Builds a query with a single {@code OP_EQ} filter on the given op.
     *
     * @param searchFSCommandOp the op to filter on
     * @return a query parameter carrying only that filter
     */
    private QueryParam queryByOp(SearchFSCommandOp searchFSCommandOp) {
        final CommandFilterBy opFilter =
            new CommandFilterBy(SearchFSCommandFacade.SearchFSFilters.OP_EQ, searchFSCommandOp.name());
        HashSet filters = new HashSet();
        filters.add(opFilter);
        return new QueryParam(null, null, filters, null);
    }

    /**
     * Builds a query with an {@code OP_NEQ} filter on the given op and a {@code DOC_EQ}
     * filter on the given doc id.
     *
     * @param l                 the doc id passed to the {@code DOC_EQ} filter
     * @param searchFSCommandOp the op passed to the {@code OP_NEQ} filter
     * @return a query parameter carrying both filters
     */
    private QueryParam queryCommandsExcept(Long l, SearchFSCommandOp searchFSCommandOp) {
        final CommandFilterBy opFilter =
            new CommandFilterBy(SearchFSCommandFacade.SearchFSFilters.OP_NEQ, searchFSCommandOp.name());
        final CommandFilterBy docFilter =
            new CommandFilterBy(SearchFSCommandFacade.SearchFSFilters.DOC_EQ, l.toString());
        HashSet filters = new HashSet();
        filters.add(opFilter);
        filters.add(docFilter);
        return new QueryParam(null, null, filters, null);
    }

    /**
     * Sets the command's status, stores the command and appends a history step for it.
     *
     * @param searchFSCommand the command to update
     * @param commandStatus   the status to set
     */
    private void updateCommand(SearchFSCommand searchFSCommand, CommandStatus commandStatus) {
        searchFSCommand.setStatus(commandStatus);
        this.commandFacade.update(searchFSCommand);
        final SearchFSCommandHistory historyStep = new SearchFSCommandHistory(searchFSCommand);
        this.commandHistoryFacade.persistAndFlush(historyStep);
    }

    /**
     * Removes the command by id, then records a history step carrying the given final
     * status.
     *
     * @param searchFSCommand the command to remove
     * @param commandStatus   the status recorded in the history step
     */
    private void removeCommand(SearchFSCommand searchFSCommand, CommandStatus commandStatus) {
        this.commandFacade.removeById(searchFSCommand.getId());
        // The status is set only on the in-memory object, after the removal, for the history step.
        searchFSCommand.setStatus(commandStatus);
        final SearchFSCommandHistory historyStep = new SearchFSCommandHistory(searchFSCommand);
        this.commandHistoryFacade.persistAndFlush(historyStep);
    }

    /**
     * Marks the command as failed with the given message, stores it and appends a
     * history step for it.
     *
     * @param searchFSCommand the command that failed
     * @param str             the failure message passed to {@code failWith}
     */
    private void failCommand(SearchFSCommand searchFSCommand, String str) {
        searchFSCommand.failWith(str);
        this.commandFacade.update(searchFSCommand);
        final SearchFSCommandHistory historyStep = new SearchFSCommandHistory(searchFSCommand);
        this.commandHistoryFacade.persistAndFlush(historyStep);
    }

    /**
     * Returns the id used to identify the command's document, which is the command's inode id.
     *
     * @param searchFSCommand the command to read
     * @return {@code searchFSCommand.getInodeId()}
     */
    private Long getDocId(SearchFSCommand searchFSCommand) {
        final Long inodeId = searchFSCommand.getInodeId();
        return inodeId;
    }

    /**
     * Creates a history entry from the command.
     *
     * @param searchFSCommand the command to record
     * @return a new {@link SearchFSCommandHistory} built from the command
     */
    private SearchFSCommandHistory getHistoryStep(SearchFSCommand searchFSCommand) {
        final SearchFSCommandHistory historyStep = new SearchFSCommandHistory(searchFSCommand);
        return historyStep;
    }

    /**
     * Returns the processing step applied to each command: {@link #processCommand} with
     * any exception it throws converted into a failed {@link Try} that wraps an
     * INTERNAL_SERVER_ERROR {@link CommandException}.
     *
     * @return a function from command to the outcome of processing it
     */
    private Function<SearchFSCommand, Try<Boolean>> processFunction() {
        return command -> {
            Try<Boolean> outcome;
            try {
                outcome = processCommand(command);
            } catch (Exception unhandled) {
                outcome = new Try.Failure<>(new CommandException(
                    RESTCodes.CommandErrorCode.INTERNAL_SERVER_ERROR,
                    Level.WARNING,
                    "command failed due to unhandled error",
                    "command failed due to unhandled error",
                    unhandled));
            }
            return outcome;
        };
    }

    /**
     * Executes one command against the search controller.
     *
     * <ul>
     *   <li>DELETE_PROJECT and DELETE_ARTIFACT are executed unconditionally.</li>
     *   <li>Any other op yields {@code false} without doing anything ("delayed") when the
     *       command has no project, or has no feature group, feature view or training
     *       dataset.</li>
     *   <li>CREATE, UPDATE_TAGS and UPDATE_FEATURESTORE call the matching controller
     *       method; any other op yields a NOT_IMPLEMENTED failure.</li>
     * </ul>
     *
     * @param searchFSCommand the command to execute
     * @return success with {@code true} when executed, success with {@code false} when
     *         delayed, or a failure wrapping a {@link CommandException}
     */
    private Try<Boolean> processCommand(SearchFSCommand searchFSCommand) {
        try {
            final SearchFSCommandOp op = searchFSCommand.getOp();
            if (op.equals(SearchFSCommandOp.DELETE_PROJECT)) {
                this.searchController.deleteProject(searchFSCommand.getProject());
                return Try.apply(() -> true);
            }
            if (op.equals(SearchFSCommandOp.DELETE_ARTIFACT)) {
                this.searchController.delete(searchFSCommand.getInodeId());
                return Try.apply(() -> true);
            }
            if (searchFSCommand.getProject() == null) {
                LOGGER.log(Level.FINE, "project deleted - delaying command");
                return Try.apply(() -> false);
            }
            if (searchFSCommand.getFeatureGroup() == null
                && searchFSCommand.getFeatureView() == null
                && searchFSCommand.getTrainingDataset() == null) {
                LOGGER.log(Level.FINE, "artifact deleted - delaying command");
                return Try.apply(() -> false);
            }
            // Switch on the enum itself. The decompiled form went through an ordinal lookup table
            // with case labels bound to unrelated Settings constants that merely equalled 1 and 3.
            switch (op) {
                case CREATE:
                    this.searchController.create(searchFSCommand.getInodeId(), searchFSCommand);
                    return Try.apply(() -> true);
                case UPDATE_TAGS:
                    this.searchController.updateTags(searchFSCommand.getInodeId(), searchFSCommand);
                    return Try.apply(() -> true);
                case UPDATE_FEATURESTORE:
                    this.searchController.setFeaturestore(searchFSCommand.getInodeId(), searchFSCommand);
                    return Try.apply(() -> true);
                default:
                    return new Try.Failure<>(new CommandException(
                        RESTCodes.CommandErrorCode.NOT_IMPLEMENTED, Level.WARNING, "unhandled command op:" + op));
            }
        } catch (OpenSearchException e) {
            return new Try.Failure<>(new CommandException(
                RESTCodes.CommandErrorCode.OPENSEARCH_ACCESS_ERROR,
                Level.WARNING,
                "command failed due to opensearch error",
                "command failed due to opensearch error",
                e));
        } catch (CommandException e2) {
            return new Try.Failure<>(e2);
        }
    }
}
