/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.agent;

import io.hops.hopsworks.common.dao.host.Hosts;
import io.hops.hopsworks.common.dao.host.HostsFacade;
import io.hops.hopsworks.common.util.RemoteCommand;
import io.hops.hopsworks.common.util.RemoteCommandExecutor;
import io.hops.hopsworks.common.util.RemoteCommandResult;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.Serializable;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.LocalTime;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.ejb.DependsOn;
import javax.ejb.EJB;
import javax.ejb.Singleton;
import javax.ejb.Timeout;
import javax.ejb.TimerConfig;
import javax.ejb.TimerService;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.enterprise.concurrent.ManagedExecutorService;

@Singleton
@TransactionAttribute(value=TransactionAttributeType.NEVER)
@DependsOn(value={"Settings"})
public class AgentLivenessMonitor {
    private static final Logger LOGGER = Logger.getLogger(AgentLivenessMonitor.class.getName());
    private final Map<String, LocalTime> agentsHeartbeat = new ConcurrentHashMap<String, LocalTime>();
    private final ArrayBlockingQueue<String> agentsToRestart = new ArrayBlockingQueue(200);
    private static final String KAGENT_COMMAND_TEMPLATE = "sudo systemctl %s kagent";
    @EJB
    private Settings settings;
    @EJB
    private RemoteCommandExecutor remoteCommandExecutor;
    @EJB
    private HostsFacade hostsFacade;
    @Resource
    private TimerService timerService;
    @Resource(lookup="concurrent/hopsExecutorService")
    private ManagedExecutorService executorService;
    private long thresholdInSeconds;
    private String kagentUser;
    private Path identityFile;
    private String restartCommand;
    private String startCommand;
    private String stopCommand;

    @PostConstruct
    public void init() {
        this.kagentUser = this.settings.getKagentUser();
        this.identityFile = Paths.get(System.getProperty("user.home"), ".ssh", "id_rsa");
        this.restartCommand = String.format(KAGENT_COMMAND_TEMPLATE, "restart");
        this.stopCommand = String.format(KAGENT_COMMAND_TEMPLATE, "stop");
        this.startCommand = String.format(KAGENT_COMMAND_TEMPLATE, "start");
        if (this.settings.isKagentLivenessMonitorEnabled()) {
            Long time = this.settings.getConfTimeValue(this.settings.getKagentLivenessThreshold());
            TimeUnit unit = this.settings.getConfTimeTimeUnit(this.settings.getKagentLivenessThreshold());
            this.thresholdInSeconds = Math.max(TimeUnit.SECONDS.convert(time, unit), 1L);
            long livenessInterval = Math.max(this.thresholdInSeconds * 1000L / 2L, 1000L);
            this.timerService.createIntervalTimer(10000L, livenessInterval, new TimerConfig((Serializable)((Object)"kagent liveness monitor"), false));
            this.executorService.submit((Runnable)new AgentRestartConsumer());
        }
    }

    public void alive(Hosts host) {
        if (!this.settings.isKagentLivenessMonitorEnabled()) {
            return;
        }
        LOGGER.log(Level.FINEST, "Agent@" + host + " is alive");
        this.agentsHeartbeat.put(host.getHostname(), this.getNow());
    }

    @Timeout
    @TransactionAttribute(value=TransactionAttributeType.NOT_SUPPORTED)
    public void isAlive() {
        try {
            LocalTime now = this.getNow();
            for (Map.Entry<String, LocalTime> entry : this.agentsHeartbeat.entrySet()) {
                String host = entry.getKey();
                LocalTime lastHeartbeat = entry.getValue();
                Duration duration = Duration.between(lastHeartbeat, now);
                if (duration.minusSeconds(this.thresholdInSeconds).isNegative()) continue;
                LOGGER.log(Level.WARNING, "kagent in " + host + " is not alive, restarting it");
                this.agentsToRestart.offer(host, 5L, TimeUnit.SECONDS);
            }
        }
        catch (Exception ex) {
            LOGGER.log(Level.SEVERE, "Error checking liveness of kagent", ex);
        }
    }

    public RemoteCommandResult start(Hosts host) throws ServiceException {
        if (host.isRegistered()) {
            LOGGER.log(Level.FINE, "Starting kagent@" + host);
            RemoteCommand command = this.constructRemoteCommand(host, this.startCommand);
            return this.remoteCommandExecutor.execute(command);
        }
        this.throwHostNotRegisteredException(host);
        return null;
    }

    public Future<RemoteCommandResult> startAsync(Hosts host) throws ServiceException {
        if (host.isRegistered()) {
            LOGGER.log(Level.FINE, "Submitting start kagent@" + host);
            RemoteCommand command = this.constructRemoteCommand(host, this.startCommand);
            return this.remoteCommandExecutor.submit(command);
        }
        this.throwHostNotRegisteredException(host);
        return null;
    }

    public RemoteCommandResult stop(Hosts host) throws ServiceException {
        if (host.isRegistered()) {
            LOGGER.log(Level.FINE, "Stopping kagent@" + host);
            this.agentsHeartbeat.remove(host.getHostname());
            this.agentsToRestart.remove(host.getHostname());
            RemoteCommand command = this.constructRemoteCommand(host, this.stopCommand);
            return this.remoteCommandExecutor.execute(command);
        }
        this.throwHostNotRegisteredException(host);
        return null;
    }

    public Future<RemoteCommandResult> stopAsync(Hosts host) throws ServiceException {
        if (host.isRegistered()) {
            LOGGER.log(Level.FINE, "Submitting stop kagent@" + host);
            this.agentsHeartbeat.remove(host.getHostname());
            this.agentsToRestart.remove(host.getHostname());
            RemoteCommand command = this.constructRemoteCommand(host, this.stopCommand);
            return this.remoteCommandExecutor.submit(command);
        }
        this.throwHostNotRegisteredException(host);
        return null;
    }

    public RemoteCommandResult restart(Hosts host) throws ServiceException {
        if (host.isRegistered()) {
            LOGGER.log(Level.FINE, "Restarting kagent@" + host);
            return this.restartAgentInternal(host.getHostname());
        }
        this.throwHostNotRegisteredException(host);
        return null;
    }

    public Future<RemoteCommandResult> restartAsync(Hosts host) throws ServiceException {
        if (host.isRegistered()) {
            LOGGER.log(Level.FINE, "Submitting restart kagent@" + host);
            RemoteCommand command = this.constructRemoteCommand(host, this.restartCommand);
            return this.remoteCommandExecutor.submit(command);
        }
        this.throwHostNotRegisteredException(host);
        return null;
    }

    private void throwHostNotRegisteredException(Hosts host) throws ServiceException {
        throw new ServiceException(RESTCodes.ServiceErrorCode.HOST_NOT_REGISTERED, Level.FINE, "Host is not registered", "Host " + host + " is not registered with Hopsworks");
    }

    private RemoteCommand constructRemoteCommand(Hosts host, String command) {
        return new RemoteCommand.Builder().setHost(host.getHostname()).setUser(this.kagentUser).setIdentity(this.identityFile).setCommand(command).setConnectTimeoutMS(10000).setExecutionTimeoutS(20).build();
    }

    private LocalTime getNow() {
        return LocalTime.now();
    }

    private RemoteCommandResult restartAgentInternal(String host) throws ServiceException {
        RemoteCommand command = new RemoteCommand.Builder().setHost(host).setUser(this.kagentUser).setIdentity(this.identityFile).setCommand(this.restartCommand).setConnectTimeoutMS(10000).setExecutionTimeoutS(20).build();
        RemoteCommandResult result = this.remoteCommandExecutor.execute(command);
        if (result.getExitCode() != 0) {
            LOGGER.log(Level.WARNING, "Failed to restart kagent reason: " + result.getStdout() + " Exit code: " + result.getExitCode());
        }
        return result;
    }

    private class AgentRestartConsumer
    implements Runnable {
        private AgentRestartConsumer() {
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String agentToRestart = (String)AgentLivenessMonitor.this.agentsToRestart.take();
                    Hosts host = AgentLivenessMonitor.this.hostsFacade.findByHostname(agentToRestart);
                    if (host == null || !host.isRegistered()) {
                        AgentLivenessMonitor.this.agentsHeartbeat.remove(agentToRestart);
                        continue;
                    }
                    if (!AgentLivenessMonitor.this.agentsHeartbeat.containsKey(agentToRestart)) continue;
                    AgentLivenessMonitor.this.restartAgentInternal(agentToRestart);
                }
                catch (ServiceException ex) {
                    LOGGER.log(Level.WARNING, "Failed to restart kagent", ex);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                catch (Exception exception) {}
            }
        }
    }
}

