/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.dao.jobs.description.JobFacade;
import io.hops.hopsworks.common.dao.user.activity.ActivityFacade;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.jobs.JobScheduler;
import io.hops.hopsworks.common.jobs.execution.ExecutionController;
import io.hops.hopsworks.common.jobs.spark.SparkController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.common.util.SparkConfigurationUtil;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.ScheduleDTO;
import io.hops.hopsworks.persistence.entity.jobs.configuration.spark.SparkJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.persistence.entity.user.activity.ActivityFlag;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.inject.Inject;
import javax.xml.bind.JAXBException;
import org.apache.hadoop.fs.Path;
import org.eclipse.persistence.exceptions.DatabaseException;

/**
 * Stateless session bean that coordinates job lifecycle operations: persisting a job,
 * scheduling and unscheduling it, deleting it, and inspecting or versioning the program it runs.
 *
 * <p>The class-level {@code TransactionAttributeType.NEVER} applies to every business method;
 * several methods repeat the annotation explicitly.
 */
@Stateless
@TransactionAttribute(TransactionAttributeType.NEVER)
public class JobController {
    private static final Logger LOGGER = Logger.getLogger(JobController.class.getName());

    /** Longest job name written verbatim into an activity message; longer names are truncated. */
    private static final int MAX_ACTIVITY_JOB_NAME_LENGTH = 60;

    @EJB
    private JobFacade jobFacade;
    @EJB
    private JobScheduler scheduler;
    @EJB
    private ActivityFacade activityFacade;
    @EJB
    private DistributedFsService dfs;
    @EJB
    private SparkController sparkController;
    @Inject
    private ExecutionController executionController;
    @EJB
    private HdfsUsersController hdfsUsersController;
    @EJB
    private Settings settings;

    /**
     * Persists a job and, when its configuration carries a schedule, registers it with the scheduler.
     * For SPARK and PYSPARK configurations the executor memory is validated before persisting.
     * An activity entry is recorded once the job has been stored.
     *
     * @param user    user on whose behalf the job is stored
     * @param project project the job belongs to
     * @param job     job entity passed to the facade; replaced by the facade's return value
     * @param config  job configuration; must be a {@link SparkJobConfiguration} when its type is
     *                SPARK or PYSPARK
     * @return the job as returned by {@code JobFacade.put}
     * @throws JobException if the facade fails with an {@link IllegalStateException} caused by a
     *                      {@link JAXBException}, or if a callee throws it
     */
    public Jobs putJob(Users user, Project project, Jobs job, JobConfiguration config) throws JobException {
        try {
            if (config.getJobType() == JobType.SPARK || config.getJobType() == JobType.PYSPARK) {
                SparkConfigurationUtil sparkConfigurationUtil = new SparkConfigurationUtil();
                SparkJobConfiguration sparkJobConfiguration = (SparkJobConfiguration) config;
                sparkConfigurationUtil.validateExecutorMemory(sparkJobConfiguration.getExecutorMemory(), settings);
            }
            job = jobFacade.put(user, project, config, job);
        } catch (IllegalStateException ise) {
            // Only a JAXB-caused failure is translated; any other IllegalStateException propagates as-is.
            if (ise.getCause() instanceof JAXBException) {
                throw new JobException(RESTCodes.JobErrorCode.JOB_CONFIGURATION_CONVERT_TO_JSON_ERROR, Level.FINE,
                    "Unable to create json from JobConfiguration", ise.getMessage(), ise);
            }
            throw ise;
        }
        if (config.getSchedule() != null) {
            scheduler.scheduleJobPeriodic(job);
        }
        activityFacade.persistActivity(" created a new job named " + getJobNameForActivity(job.getName()),
            project, user, ActivityFlag.JOB);
        return job;
    }

    /**
     * Stores a new schedule for the job, registers the job with the scheduler and records an activity.
     *
     * @param project  project used for the activity entry
     * @param job      job whose schedule is updated; its in-memory configuration is updated as well
     * @param schedule schedule to store
     * @param user     user used for the activity entry
     * @throws JobException if {@code JobFacade.updateJobSchedule} reports that nothing was updated
     */
    @TransactionAttribute(TransactionAttributeType.NEVER)
    public void updateSchedule(Project project, Jobs job, ScheduleDTO schedule, Users user) throws JobException {
        boolean isScheduleUpdated = jobFacade.updateJobSchedule(job.getId(), schedule);
        if (!isScheduleUpdated) {
            throw new JobException(RESTCodes.JobErrorCode.JOB_SCHEDULE_UPDATE, Level.WARNING,
                "Schedule is not updated in the database for jobid: " + job.getId());
        }
        job.getJobConfig().setSchedule(schedule);
        scheduler.scheduleJobPeriodic(job);
        activityFacade.persistActivity(" scheduled a job named " + getJobNameForActivity(job.getName()),
            project, user, ActivityFlag.JOB);
    }

    /**
     * Removes the job from the scheduler and, if the job configuration holds a schedule, clears that
     * schedule both in memory and through {@code JobFacade.updateJobSchedule}.
     *
     * <p>The scheduler is asked exactly once, and that result is what is returned.
     *
     * @param job job to unschedule
     * @return the value returned by {@code JobScheduler.unscheduleJob} for this job
     */
    @TransactionAttribute(TransactionAttributeType.NEVER)
    public boolean unscheduleJob(Jobs job) {
        if (job.getJobConfig().getSchedule() == null) {
            // Nothing stored on the configuration; still ask the scheduler in case it holds an entry.
            return scheduler.unscheduleJob(job);
        }
        boolean status = scheduler.unscheduleJob(job);
        job.getJobConfig().setSchedule(null);
        jobFacade.updateJobSchedule(job.getId(), null);
        if (!status) {
            LOGGER.log(Level.WARNING, "Schedule does not exist in the scheduler for jobid {0}", job.getId());
        }
        return status;
    }

    /**
     * Stops the job through the execution controller, removes it through the facade and records an
     * activity for the deleting user.
     *
     * @param job  job to delete
     * @param user user performing the deletion; the activity is recorded against the user's email
     * @throws JobException with {@code JOB_DELETION_ERROR} if removal fails with a
     *                      {@link DatabaseException}, or if a callee throws it
     */
    @TransactionAttribute(TransactionAttributeType.NEVER)
    public void deleteJob(Jobs job, Users user) throws JobException {
        executionController.stop(job);
        try {
            LOGGER.log(Level.FINE, "Request to delete job name ={0} job id ={1}",
                new Object[]{job.getName(), job.getId()});
            jobFacade.removeJob(job);
            LOGGER.log(Level.FINE, "Deleted job name ={0} job id ={1}", new Object[]{job.getName(), job.getId()});
            // Truncate long names the same way the create/schedule activities do.
            activityFacade.persistActivity(" deleted a job named " + getJobNameForActivity(job.getName()),
                job.getProject(), user.getEmail(), ActivityFlag.JOB);
        } catch (DatabaseException ex) {
            LOGGER.log(Level.SEVERE, "Job cannot be deleted job name ={0} job id ={1}",
                new Object[]{job.getName(), job.getId()});
            throw new JobException(RESTCodes.JobErrorCode.JOB_DELETION_ERROR, Level.SEVERE, ex.getMessage(), null, ex);
        }
    }

    /**
     * Looks up a job by project and name.
     *
     * @param project project to search in
     * @param name    job name; must be neither {@code null} nor empty
     * @return the matching job
     * @throws IllegalArgumentException if {@code name} is {@code null} or empty
     * @throws JobException             with {@code JOB_NOT_FOUND} if the facade returns {@code null}
     */
    public Jobs getJob(Project project, String name) throws JobException {
        if (Strings.isNullOrEmpty(name)) {
            throw new IllegalArgumentException("job name was not provided or it was not set.");
        }
        Jobs job = jobFacade.findByProjectAndName(project, name);
        if (job == null) {
            throw new JobException(RESTCodes.JobErrorCode.JOB_NOT_FOUND, Level.FINEST, "jobId:" + name);
        }
        return job;
    }

    /**
     * Inspects the program at {@code path} and returns a job configuration for it. Only SPARK and
     * PYSPARK job types are handled; for those the path must end in {@code .jar}, {@code .py} or
     * {@code .ipynb}.
     *
     * <p>The file system client obtained for the user is closed on every exit path, including the
     * unsupported-type and validation failures.
     *
     * @param path    path of the program to inspect
     * @param project project used to resolve the HDFS user name
     * @param user    user used to resolve the HDFS user name
     * @param jobType type of job the program is meant for
     * @return the configuration produced by {@code SparkController.inspectProgram}
     * @throws IllegalArgumentException if the path is null/empty or has an unsupported extension,
     *                                  or if the job type is not supported
     * @throws JobException             if a callee throws it
     */
    @TransactionAttribute(TransactionAttributeType.NEVER)
    public JobConfiguration inspectProgram(String path, Project project, Users user, JobType jobType)
        throws JobException {
        DistributedFileSystemOps udfso = null;
        try {
            String username = hdfsUsersController.getHdfsUserName(project, user);
            udfso = dfs.getDfsOps(username);
            LOGGER.log(Level.FINE, "Inspecting executable job program by {0} at path: {1}",
                new Object[]{username, path});
            switch (jobType) {
                case SPARK:
                case PYSPARK:
                    if (Strings.isNullOrEmpty(path)
                        || (!path.endsWith(".jar") && !path.endsWith(".py") && !path.endsWith(".ipynb"))) {
                        throw new IllegalArgumentException("Path does not point to a .jar, .py or .ipynb file.");
                    }
                    return sparkController.inspectProgram(path, udfso);
                default:
                    throw new IllegalArgumentException("Job type not supported: " + jobType);
            }
        } finally {
            if (udfso != null) {
                dfs.closeDfsClient(udfso);
            }
        }
    }

    /**
     * Copies the job's application file to {@code path} using a file system client obtained for the
     * given project user. The client is closed exactly once when the copy finishes or fails.
     *
     * @param job     Spark job configuration whose {@code appPath} is the copy source
     * @param project project used to resolve the HDFS user name
     * @param user    user used to resolve the HDFS user name
     * @param path    destination path of the copy
     * @throws JobException with {@code JOB_PROGRAM_VERSIONING_FAILED} if the copy fails with an
     *                      {@link IOException}
     */
    public void versionProgram(SparkJobConfiguration job, Project project, Users user, Path path)
        throws JobException {
        DistributedFileSystemOps udfso = null;
        try {
            String username = hdfsUsersController.getHdfsUserName(project, user);
            udfso = dfs.getDfsOps(username);
            versionProgram(job, udfso, path);
        } finally {
            if (udfso != null) {
                dfs.closeDfsClient(udfso);
            }
        }
    }

    /**
     * Copies the job's application file to {@code path} using the supplied file system client.
     * The client is not closed by this method.
     *
     * @param job   Spark job configuration whose {@code appPath} is the copy source
     * @param udfso file system client used for the copy
     * @param path  destination path of the copy
     * @throws JobException with {@code JOB_PROGRAM_VERSIONING_FAILED} if the copy fails with an
     *                      {@link IOException}
     */
    public void versionProgram(SparkJobConfiguration job, DistributedFileSystemOps udfso, Path path)
        throws JobException {
        try {
            udfso.copyInHdfs(new Path(job.getAppPath()), path);
        } catch (IOException ioe) {
            throw new JobException(RESTCodes.JobErrorCode.JOB_PROGRAM_VERSIONING_FAILED, Level.FINEST,
                "path: " + job.getAppPath(), "versioning failed", ioe);
        }
    }

    /**
     * Returns the job name as it should appear in an activity message: unchanged when it is at most
     * {@value #MAX_ACTIVITY_JOB_NAME_LENGTH} characters, otherwise its first
     * {@value #MAX_ACTIVITY_JOB_NAME_LENGTH} characters followed by {@code "..."}.
     */
    private static String getJobNameForActivity(String jobName) {
        if (jobName.length() > MAX_ACTIVITY_JOB_NAME_LENGTH) {
            return jobName.substring(0, MAX_ACTIVITY_JOB_NAME_LENGTH) + "...";
        }
        return jobName;
    }
}

