package io.hops.hopsworks.common.agent;

import io.hops.hopsworks.common.dao.host.HostsFacade;
import io.hops.hopsworks.common.featurestore.FeaturestoreConstants;
import io.hops.hopsworks.common.util.RemoteCommand;
import io.hops.hopsworks.common.util.RemoteCommandExecutor;
import io.hops.hopsworks.common.util.RemoteCommandResult;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.host.Hosts;
import io.hops.hopsworks.restutils.RESTCodes;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalTime;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.ejb.DependsOn;
import javax.ejb.EJB;
import javax.ejb.Singleton;
import javax.ejb.Timeout;
import javax.ejb.TimerConfig;
import javax.ejb.TimerService;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.enterprise.concurrent.ManagedExecutorService;

/**
 * Tracks heartbeats reported by kagent instances and restarts agents that have been silent
 * for longer than the configured liveness threshold.
 *
 * <p>Heartbeats are recorded through {@link #alive(Hosts)}. When the monitor is enabled in
 * {@link Settings}, an interval timer invokes {@link #isAlive()}, which enqueues silent hosts;
 * a background {@code AgentRestartConsumer} drains that queue and issues the restart command.
 *
 * <p>The class also exposes explicit start/stop/restart operations, each of which runs
 * {@code sudo systemctl <action> kagent} on the target host through
 * {@link RemoteCommandExecutor}.
 */
@Singleton
@DependsOn({"Settings"})
@TransactionAttribute(TransactionAttributeType.NEVER)
/* loaded from: input_file:io/hops/hopsworks/common/agent/AgentLivenessMonitor.class */
public class AgentLivenessMonitor {
    private static final Logger LOGGER = Logger.getLogger(AgentLivenessMonitor.class.getName());
    private static final String KAGENT_COMMAND_TEMPLATE = "sudo systemctl %s kagent";

    /** Connect timeout, in milliseconds, for every remote kagent command. */
    private static final int CONNECT_TIMEOUT_MS = 10000;
    /** Execution timeout, in seconds, for every remote kagent command. */
    private static final int EXECUTION_TIMEOUT_S = 20;
    /** Delay, in milliseconds, before the first liveness check fires. */
    private static final long TIMER_INITIAL_DELAY_MS = 10000L;
    /** Lower bound, in milliseconds, for the interval between liveness checks. */
    private static final long TIMER_MIN_INTERVAL_MS = 1000L;
    /** How long, in seconds, a liveness check waits for room in the restart queue. */
    private static final long ENQUEUE_TIMEOUT_S = 5L;

    /**
     * Last heartbeat per hostname. Stored as {@link Instant} rather than a time-of-day so the
     * elapsed-time computation stays correct across midnight.
     */
    private final Map<String, Instant> agentsHeartbeat = new ConcurrentHashMap<>();

    // NOTE(review): the capacity constant belongs to an unrelated feature-store class; it looks
    // like a decompiler constant-inlining artifact. Kept as-is to preserve the capacity value -
    // confirm against the original source and replace with a dedicated constant.
    private final ArrayBlockingQueue<String> agentsToRestart =
        new ArrayBlockingQueue<>(FeaturestoreConstants.MAX_CHARACTERS_IN_GE_CLOUD_ID);

    @EJB
    private Settings settings;

    @EJB
    private RemoteCommandExecutor remoteCommandExecutor;

    @EJB
    private HostsFacade hostsFacade;

    @Resource
    private TimerService timerService;

    @Resource(lookup = "concurrent/hopsExecutorService")
    private ManagedExecutorService executorService;
    private long thresholdInSeconds;
    private String kagentUser;
    private Path identityFile;
    private String restartCommand;
    private String startCommand;
    private String stopCommand;

    /**
     * Background worker that takes hostnames off the restart queue and restarts their agent.
     * Hosts that are unknown or not registered are dropped from the heartbeat map instead.
     */
    /* loaded from: input_file:io/hops/hopsworks/common/agent/AgentLivenessMonitor$AgentRestartConsumer.class */
    private class AgentRestartConsumer implements Runnable {

        @Override // java.lang.Runnable
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String hostname = agentsToRestart.take();
                    Optional<Hosts> host = hostsFacade.findByHostname(hostname);
                    if (!host.isPresent() || !isRegistered(host.get())) {
                        // Host is gone or was unregistered: stop monitoring it.
                        agentsHeartbeat.remove(hostname);
                    } else if (agentsHeartbeat.containsKey(hostname)) {
                        // Still monitored (i.e. not explicitly stopped in the meantime).
                        restartAgentInternal(hostname);
                    }
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition terminates the worker.
                    Thread.currentThread().interrupt();
                } catch (ServiceException e) {
                    LOGGER.log(Level.WARNING, "Failed to restart kagent", e);
                } catch (Exception e) {
                    // Keep the worker alive, but never lose the failure silently.
                    LOGGER.log(Level.WARNING, "Unexpected error while restarting kagent", e);
                }
            }
        }
    }

    /**
     * Reads the kagent settings, prepares the systemctl commands and, when the liveness monitor
     * is enabled, schedules the periodic check and starts the restart worker.
     */
    @PostConstruct
    public void init() {
        this.kagentUser = this.settings.getKagentUser();
        this.identityFile = Paths.get(System.getProperty("user.home"), ".ssh", "id_rsa");
        this.restartCommand = String.format(KAGENT_COMMAND_TEMPLATE, "restart");
        this.stopCommand = String.format(KAGENT_COMMAND_TEMPLATE, "stop");
        this.startCommand = String.format(KAGENT_COMMAND_TEMPLATE, "start");
        if (!this.settings.isKagentLivenessMonitorEnabled()) {
            return;
        }
        String configuredThreshold = this.settings.getKagentLivenessThreshold();
        long thresholdValue = this.settings.getConfTimeValue(configuredThreshold).longValue();
        TimeUnit thresholdUnit = this.settings.getConfTimeTimeUnit(configuredThreshold);
        // Never allow a threshold below one second.
        this.thresholdInSeconds = Math.max(TimeUnit.SECONDS.convert(thresholdValue, thresholdUnit), 1L);
        // Check twice per threshold period, but at most once per second.
        long intervalMs = Math.max((this.thresholdInSeconds * 1000) / 2, TIMER_MIN_INTERVAL_MS);
        this.timerService.createIntervalTimer(TIMER_INITIAL_DELAY_MS, intervalMs,
            new TimerConfig("kagent liveness monitor", false));
        this.executorService.submit(new AgentRestartConsumer());
    }

    /**
     * Records a heartbeat for the given host. No-op when the liveness monitor is disabled.
     *
     * @param hosts host whose agent reported in
     */
    public void alive(Hosts hosts) {
        if (this.settings.isKagentLivenessMonitorEnabled()) {
            LOGGER.log(Level.FINEST, () -> "Agent@" + hosts + " is alive");
            this.agentsHeartbeat.put(hosts.getHostname(), getNow());
        }
    }

    /**
     * Timer callback: enqueues for restart every monitored host whose last heartbeat is at least
     * {@code thresholdInSeconds} old. Hosts already waiting in the restart queue are skipped so
     * repeated ticks do not pile up duplicate restarts.
     */
    @Timeout
    @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
    public void isAlive() {
        try {
            Instant now = getNow();
            Duration threshold = Duration.ofSeconds(this.thresholdInSeconds);
            for (Map.Entry<String, Instant> entry : this.agentsHeartbeat.entrySet()) {
                String hostname = entry.getKey();
                if (Duration.between(entry.getValue(), now).compareTo(threshold) < 0) {
                    continue;
                }
                if (this.agentsToRestart.contains(hostname)) {
                    // A restart is already pending for this host.
                    continue;
                }
                LOGGER.log(Level.WARNING, "kagent in " + hostname + " is not alive, restarting it");
                if (!this.agentsToRestart.offer(hostname, ENQUEUE_TIMEOUT_S, TimeUnit.SECONDS)) {
                    LOGGER.log(Level.WARNING,
                        "Restart queue is full, could not schedule restart of kagent in " + hostname);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.log(Level.WARNING, "Interrupted while checking liveness of kagent", e);
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Error checking liveness of kagent", e);
        }
    }

    /**
     * Starts kagent on the given host and waits for the command to finish.
     *
     * @param hosts target host
     * @return result of the remote command
     * @throws ServiceException if the host is not registered, or if the remote execution fails
     */
    public RemoteCommandResult start(Hosts hosts) throws ServiceException {
        ensureRegistered(hosts);
        LOGGER.log(Level.FINE, () -> "Starting kagent@" + hosts);
        return this.remoteCommandExecutor.execute(constructRemoteCommand(hosts, this.startCommand));
    }

    /**
     * Submits a start of kagent on the given host without waiting for completion.
     *
     * @param hosts target host
     * @return future holding the result of the remote command
     * @throws ServiceException if the host is not registered, or if submission fails
     */
    public Future<RemoteCommandResult> startAsync(Hosts hosts) throws ServiceException {
        ensureRegistered(hosts);
        LOGGER.log(Level.FINE, () -> "Submitting start kagent@" + hosts);
        return this.remoteCommandExecutor.submit(constructRemoteCommand(hosts, this.startCommand));
    }

    /**
     * Stops kagent on the given host and waits for the command to finish. The host is removed
     * from liveness tracking first so the monitor does not restart it.
     *
     * @param hosts target host
     * @return result of the remote command
     * @throws ServiceException if the host is not registered, or if the remote execution fails
     */
    public RemoteCommandResult stop(Hosts hosts) throws ServiceException {
        ensureRegistered(hosts);
        LOGGER.log(Level.FINE, () -> "Stopping kagent@" + hosts);
        stopTracking(hosts.getHostname());
        return this.remoteCommandExecutor.execute(constructRemoteCommand(hosts, this.stopCommand));
    }

    /**
     * Submits a stop of kagent on the given host without waiting for completion. The host is
     * removed from liveness tracking first so the monitor does not restart it.
     *
     * @param hosts target host
     * @return future holding the result of the remote command
     * @throws ServiceException if the host is not registered, or if submission fails
     */
    public Future<RemoteCommandResult> stopAsync(Hosts hosts) throws ServiceException {
        ensureRegistered(hosts);
        LOGGER.log(Level.FINE, () -> "Submitting stop kagent@" + hosts);
        stopTracking(hosts.getHostname());
        return this.remoteCommandExecutor.submit(constructRemoteCommand(hosts, this.stopCommand));
    }

    /**
     * Restarts kagent on the given host and waits for the command to finish. A non-zero exit
     * code is logged as a warning and still returned to the caller.
     *
     * @param hosts target host
     * @return result of the remote command
     * @throws ServiceException if the host is not registered, or if the remote execution fails
     */
    public RemoteCommandResult restart(Hosts hosts) throws ServiceException {
        ensureRegistered(hosts);
        LOGGER.log(Level.FINE, () -> "Restarting kagent@" + hosts);
        return restartAgentInternal(hosts.getHostname());
    }

    /**
     * Submits a restart of kagent on the given host without waiting for completion.
     *
     * @param hosts target host
     * @return future holding the result of the remote command
     * @throws ServiceException if the host is not registered, or if submission fails
     */
    public Future<RemoteCommandResult> restartAsync(Hosts hosts) throws ServiceException {
        ensureRegistered(hosts);
        LOGGER.log(Level.FINE, () -> "Submitting restart kagent@" + hosts);
        return this.remoteCommandExecutor.submit(constructRemoteCommand(hosts, this.restartCommand));
    }

    /** Returns whether the host is flagged as registered; a {@code null} flag counts as not registered. */
    private static boolean isRegistered(Hosts hosts) {
        return Boolean.TRUE.equals(hosts.getRegistered());
    }

    /** Throws {@code HOST_NOT_REGISTERED} unless the host is registered. */
    private void ensureRegistered(Hosts hosts) throws ServiceException {
        if (!isRegistered(hosts)) {
            throw new ServiceException(RESTCodes.ServiceErrorCode.HOST_NOT_REGISTERED, Level.FINE,
                "Host is not registered", "Host " + hosts + " is not registered with Hopsworks");
        }
    }

    /** Drops the host from the heartbeat map and removes every pending restart queued for it. */
    private void stopTracking(String hostname) {
        this.agentsHeartbeat.remove(hostname);
        this.agentsToRestart.removeIf(hostname::equals);
    }

    /** Builds the remote command for the given host entity. */
    private RemoteCommand constructRemoteCommand(Hosts hosts, String str) {
        return constructRemoteCommand(hosts.getHostname(), str);
    }

    /** Builds a remote command with the shared kagent user, identity file and timeouts. */
    private RemoteCommand constructRemoteCommand(String hostname, String command) {
        return new RemoteCommand.Builder()
            .setHost(hostname)
            .setUser(this.kagentUser)
            .setIdentity(this.identityFile)
            .setCommand(command)
            .setConnectTimeoutMS(CONNECT_TIMEOUT_MS)
            .setExecutionTimeoutS(EXECUTION_TIMEOUT_S)
            .build();
    }

    /** Current instant; single access point for the clock used by heartbeat bookkeeping. */
    private Instant getNow() {
        return Instant.now();
    }

    /**
     * Executes the restart command on the given hostname and logs a warning when the command
     * exits with a non-zero code.
     *
     * @param str hostname of the agent to restart
     * @return result of the remote command
     * @throws ServiceException if the remote execution fails
     */
    /* JADX INFO: Access modifiers changed from: private */
    public RemoteCommandResult restartAgentInternal(String str) throws ServiceException {
        RemoteCommandResult result =
            this.remoteCommandExecutor.execute(constructRemoteCommand(str, this.restartCommand));
        if (result.getExitCode() != 0) {
            LOGGER.log(Level.WARNING, "Failed to restart kagent reason: " + result.getStdout()
                + " Exit code: " + result.getExitCode());
        }
        return result;
    }
}
