/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.execution;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.dao.hdfs.inode.Inode;
import io.hops.hopsworks.common.dao.jobhistory.Execution;
import io.hops.hopsworks.common.dao.jobhistory.ExecutionFacade;
import io.hops.hopsworks.common.dao.jobhistory.YarnApplicationAttemptStateFacade;
import io.hops.hopsworks.common.dao.jobhistory.YarnApplicationstate;
import io.hops.hopsworks.common.dao.jobhistory.YarnApplicationstateFacade;
import io.hops.hopsworks.common.dao.jobs.description.Jobs;
import io.hops.hopsworks.common.dao.jobs.description.YarnAppUrlsDTO;
import io.hops.hopsworks.common.dao.jobs.quota.YarnProjectsQuota;
import io.hops.hopsworks.common.dao.jobs.quota.YarnProjectsQuotaFacade;
import io.hops.hopsworks.common.dao.project.PaymentType;
import io.hops.hopsworks.common.dao.project.Project;
import io.hops.hopsworks.common.dao.user.Users;
import io.hops.hopsworks.common.dao.user.activity.ActivityFacade;
import io.hops.hopsworks.common.dao.user.activity.ActivityFlag;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.hdfs.inode.InodeController;
import io.hops.hopsworks.common.jobs.AppInfoDTO;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.JobLogDTO;
import io.hops.hopsworks.common.jobs.flink.FlinkController;
import io.hops.hopsworks.common.jobs.jobhistory.JobFinalStatus;
import io.hops.hopsworks.common.jobs.spark.SparkController;
import io.hops.hopsworks.common.jobs.spark.SparkJobConfiguration;
import io.hops.hopsworks.common.jobs.yarn.YarnLogUtil;
import io.hops.hopsworks.common.jobs.yarn.YarnMonitor;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.common.yarn.YarnClientService;
import io.hops.hopsworks.common.yarn.YarnClientWrapper;
import io.hops.hopsworks.exceptions.GenericException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

@Stateless
public class ExecutionController {
    @EJB
    private SparkController sparkController;
    @EJB
    private FlinkController flinkController;
    @EJB
    private InodeController inodeController;
    @EJB
    private ActivityFacade activityFacade;
    @EJB
    private HdfsUsersController hdfsUsersController;
    @EJB
    private DistributedFsService dfs;
    @EJB
    private Settings settings;
    @EJB
    private ExecutionFacade execFacade;
    @EJB
    private YarnClientService ycs;
    @EJB
    private YarnApplicationAttemptStateFacade appAttemptStateFacade;
    @EJB
    private YarnApplicationstateFacade yarnApplicationstateFacade;
    @EJB
    private YarnProjectsQuotaFacade yarnProjectsQuotaFacade;
    @EJB
    private AsynchronousJobExecutor async;
    private static final Logger LOGGER = Logger.getLogger(ExecutionController.class.getName());
    private static final String REMOTE_PROTOCOL = "hdfs://";

    @TransactionAttribute(value=TransactionAttributeType.REQUIRES_NEW)
    public Execution start(Jobs job, String args, Users user) throws JobException, GenericException, ServiceException, ProjectException {
        Execution exec;
        YarnProjectsQuota projectQuota;
        if (job.getProject().getPaymentType().equals((Object)PaymentType.PREPAID) && ((projectQuota = this.yarnProjectsQuotaFacade.findByProjectName(job.getProject().getName())) == null || projectQuota.getQuotaRemaining() <= 0.0f)) {
            throw new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_QUOTA_ERROR, Level.FINE);
        }
        switch (job.getJobType()) {
            case BEAM_FLINK: 
            case FLINK: {
                return this.flinkController.startJob(job, user);
            }
            case SPARK: {
                String pathOfInode;
                exec = this.sparkController.startJob(job, args, user);
                if (exec == null) {
                    throw new IllegalArgumentException("Problem getting execution object for: " + (Object)((Object)job.getJobType()));
                }
                SparkJobConfiguration config = (SparkJobConfiguration)job.getJobConfig();
                String path = config.getAppPath();
                try {
                    pathOfInode = Utils.prepPath(path);
                }
                catch (UnsupportedEncodingException ex) {
                    throw new JobException(RESTCodes.JobErrorCode.JOB_START_FAILED, Level.FINE, "Job name: " + job.getName(), ex.getMessage(), (Throwable)ex);
                }
                Inode inode = this.inodeController.getInodeAtPath(pathOfInode);
                String inodeName = inode.getInodePK().getName();
                this.activityFacade.persistActivity(" ran a job used as input file " + inodeName, job.getProject(), user, ActivityFlag.JOB);
                break;
            }
            case PYSPARK: {
                if (!job.getProject().getConda().booleanValue()) {
                    throw new ProjectException(RESTCodes.ProjectErrorCode.ANACONDA_NOT_ENABLED, Level.FINEST);
                }
                exec = this.sparkController.startJob(job, args, user);
                if (exec != null) break;
                throw new IllegalArgumentException("Error while getting execution object for: " + (Object)((Object)job.getJobType()));
            }
            default: {
                throw new GenericException(RESTCodes.GenericErrorCode.UNKNOWN_ACTION, Level.FINE, "Unsupported job type: " + (Object)((Object)job.getJobType()));
            }
        }
        return exec;
    }

    public Execution stop(Jobs job) throws JobException {
        switch (job.getJobType()) {
            case BEAM_FLINK: 
            case FLINK: 
            case SPARK: 
            case PYSPARK: {
                List<Execution> executions = this.execFacade.findByJobAndNotFinished(job);
                if (executions != null && !executions.isEmpty()) {
                    for (Execution execution : executions) {
                        if (execution.getAppId() == null) continue;
                        this.stopExecution(execution);
                    }
                    return this.execFacade.findById(executions.get(0).getId());
                }
                return null;
            }
        }
        throw new IllegalArgumentException("Unsupported job type: " + (Object)((Object)job.getJobType()));
    }

    public Execution stopExecution(Integer id) throws JobException {
        Execution execution = this.execFacade.findById(id);
        return this.stopExecution(execution);
    }

    public Execution stopExecution(Execution execution) throws JobException {
        if (execution.getAppId() == null) {
            throw new JobException(RESTCodes.JobErrorCode.APPLICATIONID_NOT_FOUND, Level.FINE, "execution.id: " + execution.getId());
        }
        YarnClientWrapper yarnClientWrapper = null;
        try {
            yarnClientWrapper = this.ycs.getYarnClientSuper(this.settings.getConfiguration());
            yarnClientWrapper.getYarnClient().killApplication(ApplicationId.fromString((String)execution.getAppId()));
            this.async.getYarnExecutionFinalizer().removeAllNecessary(execution);
            Execution execution2 = this.execFacade.findById(execution.getId());
            this.ycs.closeYarnClient(yarnClientWrapper);
            return execution2;
        }
        catch (IOException | YarnException ex) {
            try {
                LOGGER.log(Level.SEVERE, "Could not kill job for job:" + execution.getJob().getName() + "with appId:" + execution.getAppId(), ex);
                throw new JobException(RESTCodes.JobErrorCode.JOB_STOP_FAILED, Level.WARNING, ex.getMessage(), null, ex);
            }
            catch (Throwable throwable) {
                this.ycs.closeYarnClient(yarnClientWrapper);
                throw throwable;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public JobLogDTO getLog(Execution execution, JobLogDTO.LogType type) throws JobException {
        JobLogDTO dto;
        block23: {
            if (!execution.getState().isFinalState()) {
                throw new JobException(RESTCodes.JobErrorCode.JOB_EXECUTION_INVALID_STATE, Level.FINE, "Job still running.");
            }
            dto = new JobLogDTO(type);
            try (DistributedFileSystemOps dfso = null;){
                dfso = this.dfs.getDfsOps();
                String path = dto.getType() == JobLogDTO.LogType.OUT ? execution.getStdoutPath() : execution.getStderrPath();
                JobLogDTO.Retriable retriable = dto.getType() == JobLogDTO.LogType.OUT ? JobLogDTO.Retriable.RETRIEABLE_OUT : JobLogDTO.Retriable.RETRIABLE_ERR;
                boolean status = dto.getType() != JobLogDTO.LogType.OUT || execution.getFinalStatus().equals((Object)JobFinalStatus.SUCCEEDED);
                String hdfsPath = REMOTE_PROTOCOL + path;
                if (!Strings.isNullOrEmpty((String)path) && dfso.exists(hdfsPath)) {
                    String message;
                    Project project = execution.getJob().getProject();
                    String stdPath = path.split(project.getName())[1];
                    int fileIndex = stdPath.lastIndexOf(47);
                    String stdDirPath = stdPath.substring(0, fileIndex);
                    dto.setPath("Projects" + File.separator + project.getName() + stdDirPath + File.separator + "std" + dto.getType().getName().toLowerCase() + ".log");
                    if (dfso.listStatus(new Path(hdfsPath))[0].getLen() > this.settings.getJobLogsDisplaySize()) {
                        dto.setLog("Log is too big to display in browser. Click on the download button to get the log file.");
                        break block23;
                    }
                    try (FSDataInputStream input = dfso.open(hdfsPath);){
                        message = org.apache.commons.io.IOUtils.toString((InputStream)input, (String)"UTF-8");
                    }
                    dto.setLog(message.isEmpty() ? "No information." : message);
                    if (message.isEmpty() && execution.getState().isFinalState() && execution.getAppId() != null && status) {
                        dto.setRetriable(retriable);
                    }
                    break block23;
                }
                dto.setLog("No log available");
                if (execution.getState().isFinalState() && execution.getAppId() != null && status) {
                    dto.setRetriable(retriable);
                }
            }
        }
        return dto;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public JobLogDTO retryLogAggregation(Execution execution, JobLogDTO.LogType type) throws JobException {
        block19: {
            if (!execution.getState().isFinalState()) {
                throw new JobException(RESTCodes.JobErrorCode.JOB_EXECUTION_INVALID_STATE, Level.FINE, "Job still running.");
            }
            DistributedFileSystemOps dfso = null;
            DistributedFileSystemOps udfso = null;
            Users user = execution.getUser();
            String hdfsUser = this.hdfsUsersController.getHdfsUserName(execution.getJob().getProject(), user);
            String aggregatedLogPath = this.settings.getAggregatedLogPath(hdfsUser, execution.getAppId());
            if (aggregatedLogPath == null) {
                throw new JobException(RESTCodes.JobErrorCode.JOB_LOG, Level.INFO, "Log aggregation is not enabled");
            }
            try {
                dfso = this.dfs.getDfsOps();
                udfso = this.dfs.getDfsOps(hdfsUser);
                if (!dfso.exists(aggregatedLogPath)) {
                    throw new JobException(RESTCodes.JobErrorCode.JOB_LOG, Level.WARNING, "Logs not available. This could be caused by the retention policy.");
                }
                String hdfsLogPath = null;
                String[] desiredLogTypes = null;
                switch (type) {
                    case OUT: {
                        hdfsLogPath = REMOTE_PROTOCOL + execution.getStdoutPath();
                        desiredLogTypes = new String[]{type.name()};
                        break;
                    }
                    case ERR: {
                        hdfsLogPath = REMOTE_PROTOCOL + execution.getStderrPath();
                        desiredLogTypes = new String[]{type.name(), ".log"};
                        break;
                    }
                }
                if (Strings.isNullOrEmpty(hdfsLogPath)) break block19;
                YarnClientWrapper yarnClientWrapper = this.ycs.getYarnClientSuper(this.settings.getConfiguration());
                ApplicationId applicationId = ConverterUtils.toApplicationId((String)execution.getAppId());
                try (YarnMonitor monitor = new YarnMonitor(applicationId, yarnClientWrapper, this.ycs);){
                    YarnLogUtil.copyAggregatedYarnLogs(udfso, aggregatedLogPath, hdfsLogPath, desiredLogTypes, monitor);
                }
            }
            catch (IOException ex) {
                LOGGER.log(Level.SEVERE, null, ex);
            }
            finally {
                if (dfso != null) {
                    dfso.close();
                }
                if (udfso != null) {
                    this.dfs.closeDfsClient(udfso);
                }
            }
        }
        return this.getLog(execution, type);
    }

    public String getExecutionUI(Execution execution) throws JobException {
        String trackingUrl = this.appAttemptStateFacade.findTrackingUrlByAppId(execution.getAppId());
        if (trackingUrl != null && !trackingUrl.isEmpty()) {
            return "/project/" + execution.getJob().getProject().getId() + "/jobs/" + execution.getAppId() + "/prox/" + trackingUrl;
        }
        throw new JobException(RESTCodes.JobErrorCode.JOB_EXECUTION_TRACKING_URL_NOT_FOUND, Level.FINE, "ExecutionId:" + execution.getId());
    }

    public String getExecutionYarnUI(int execId) {
        Execution execution = this.execFacade.findById(execId);
        return "/project/" + execution.getJob().getProject().getId() + "/jobs/" + execution.getAppId() + "/prox/" + this.settings.getYarnWebUIAddress() + "/cluster/app/" + execution.getAppId();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public AppInfoDTO getExecutionAppInfo(Execution execution) {
        HashMap<Integer, List<String>> executorInfo;
        long startTime = System.currentTimeMillis() - 60000L;
        long endTime = System.currentTimeMillis();
        boolean running = true;
        if (execution != null) {
            startTime = execution.getSubmissionTime().getTime();
            endTime = startTime + execution.getExecutionDuration();
            running = !execution.getState().isFinalState();
        }
        int nbExecutors = 0;
        try (InfluxDB influxDB = null;){
            List values;
            influxDB = InfluxDBFactory.connect((String)this.settings.getInfluxDBAddress(), (String)this.settings.getInfluxDBUser(), (String)this.settings.getInfluxDBPW());
            String timestamp_attempt = execution.getAppId().substring(12);
            Query query = new Query("show tag values from nodemanager with key=\"source\" where source =~ /^.*" + timestamp_attempt + ".*$/", "graphite");
            QueryResult queryResult = influxDB.query(query, TimeUnit.MILLISECONDS);
            executorInfo = new HashMap<Integer, List<String>>();
            int index = 0;
            if (queryResult != null && queryResult.getResults() != null) {
                for (QueryResult.Result res : queryResult.getResults()) {
                    if (res.getSeries() == null) continue;
                    for (QueryResult.Series series : res.getSeries()) {
                        values = series.getValues();
                        if (values == null) continue;
                        nbExecutors += values.size();
                        for (List l : values) {
                            executorInfo.put(index, Stream.of(Objects.toString(l.get(1))).collect(Collectors.toList()));
                            ++index;
                        }
                    }
                }
            }
            HashMap<String, String> hostnameVCoreCache = new HashMap<String, String>();
            for (Map.Entry entry : executorInfo.entrySet()) {
                query = new Query("select MilliVcoreUsageAvgMilliVcores, hostname from nodemanager where source = '" + (String)((List)entry.getValue()).get(0) + "' limit 1", "graphite");
                queryResult = influxDB.query(query, TimeUnit.MILLISECONDS);
                if (queryResult == null || queryResult.getResults() == null || queryResult.getResults().get(0) == null || ((QueryResult.Result)queryResult.getResults().get(0)).getSeries() == null) continue;
                values = ((QueryResult.Series)((QueryResult.Result)queryResult.getResults().get(0)).getSeries().get(0)).getValues();
                String hostname = Objects.toString(((List)values.get(0)).get(2)).split("=")[1];
                ((List)entry.getValue()).add(hostname);
                if (!hostnameVCoreCache.containsKey(hostname)) {
                    query = new Query("select AllocatedVCores+AvailableVCores from nodemanager where hostname =~ /.*" + hostname + ".*/ limit 1", "graphite");
                    queryResult = influxDB.query(query, TimeUnit.MILLISECONDS);
                    if (queryResult == null || queryResult.getResults() == null || queryResult.getResults().get(0) == null || ((QueryResult.Result)queryResult.getResults().get(0)).getSeries() == null) continue;
                    values = ((QueryResult.Series)((QueryResult.Result)queryResult.getResults().get(0)).getSeries().get(0)).getValues();
                    String vCoreTemp = Objects.toString(((List)values.get(0)).get(1));
                    ((List)entry.getValue()).add(vCoreTemp);
                    hostnameVCoreCache.put(hostname, vCoreTemp);
                    continue;
                }
                ((List)entry.getValue()).add(hostnameVCoreCache.get(hostname));
            }
        }
        AppInfoDTO appInfo = new AppInfoDTO(execution.getAppId(), startTime, running, endTime, nbExecutors, executorInfo);
        return appInfo;
    }

    public void checkAccessRight(String appId, Project project) throws JobException {
        YarnApplicationstate appState = this.yarnApplicationstateFacade.findByAppId(appId);
        if (appState == null) {
            throw new JobException(RESTCodes.JobErrorCode.APPID_NOT_FOUND, Level.FINE);
        }
        if (!this.hdfsUsersController.getProjectName(appState.getAppuser()).equals(project.getName())) {
            throw new JobException(RESTCodes.JobErrorCode.JOB_ACCESS_ERROR, Level.FINE);
        }
    }

    public List<YarnAppUrlsDTO> getTensorBoardUrls(Users user, Execution execution, Jobs job) throws JobException {
        return this.getTensorBoardUrls(user, execution.getAppId(), job.getProject());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<YarnAppUrlsDTO> getTensorBoardUrls(Users user, String appId, Project project) throws JobException {
        ArrayList<YarnAppUrlsDTO> urls = new ArrayList<YarnAppUrlsDTO>();
        DistributedFileSystemOps udfso = null;
        try {
            FileStatus[] statuses;
            String hdfsUser = this.hdfsUsersController.getHdfsUserName(project, user);
            udfso = this.dfs.getDfsOps(hdfsUser);
            for (FileStatus status : statuses = udfso.getFilesystem().globStatus(new Path("/Projects/" + project.getName() + "/Experiments/" + appId + "/TensorBoard.*"))) {
                LOGGER.log(Level.FINE, "Reading tensorboard for: {0}", status.getPath());
                FSDataInputStream in = null;
                try {
                    in = udfso.open(new Path(status.getPath().toString()));
                    String url = org.apache.commons.io.IOUtils.toString((InputStream)in, (String)"UTF-8");
                    int prefix = url.indexOf("http://");
                    if (prefix != -1) {
                        url = url.substring("http://".length());
                    }
                    String name = status.getPath().getName();
                    urls.add(new YarnAppUrlsDTO(name, url));
                }
                catch (Exception e) {
                    try {
                        LOGGER.log(Level.WARNING, "Problem reading file with tensorboard address from HDFS: " + e.getMessage());
                    }
                    catch (Throwable throwable) {
                        IOUtils.closeStream(in);
                        throw throwable;
                    }
                    IOUtils.closeStream((Closeable)in);
                    continue;
                }
                IOUtils.closeStream((Closeable)in);
            }
            if (udfso != null) {
                this.dfs.closeDfsClient(udfso);
            }
        }
        catch (Exception e) {
            try {
                throw new JobException(RESTCodes.JobErrorCode.TENSORBOARD_ERROR, Level.SEVERE, null, e.getMessage(), (Throwable)e);
            }
            catch (Throwable throwable) {
                if (udfso != null) {
                    this.dfs.closeDfsClient(udfso);
                }
                throw throwable;
            }
        }
        return urls;
    }
}

