/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.flink;

import io.hops.hopsworks.common.dao.project.ProjectFacade;
import io.hops.hopsworks.common.dao.user.activity.ActivityFacade;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.UserGroupInformationService;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.hdfs.inode.InodeController;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.flink.FlinkJob;
import io.hops.hopsworks.common.jobs.yarn.YarnJobsMonitor;
import io.hops.hopsworks.common.kafka.KafkaBrokers;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.common.yarn.YarnClientService;
import io.hops.hopsworks.common.yarn.YarnClientWrapper;
import io.hops.hopsworks.exceptions.GenericException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.persistence.entity.hdfs.inode.Inode;
import io.hops.hopsworks.persistence.entity.hdfs.user.HdfsUsers;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.jobs.history.Execution;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.persistence.entity.user.activity.ActivityFlag;
import io.hops.hopsworks.restutils.RESTCodes;
import io.hops.security.UserNotFoundException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.parquet.Strings;
import org.yaml.snakeyaml.Yaml;

@Stateless
public class FlinkController {
    private static final Logger LOGGER = Logger.getLogger(FlinkController.class.getName());
    @EJB
    YarnJobsMonitor jobsMonitor;
    @EJB
    private AsynchronousJobExecutor submitter;
    @EJB
    private ActivityFacade activityFacade;
    @EJB
    private UserGroupInformationService ugiService;
    @EJB
    private HdfsUsersController hdfsUsersBean;
    @EJB
    private Settings settings;
    @EJB
    private YarnClientService ycs;
    @EJB
    private ProjectFacade projectFacade;
    @EJB
    private InodeController inodeController;
    @EJB
    private KafkaBrokers kafkaBrokers;

    public Execution startJob(Jobs job, Users user) throws GenericException, JobException {
        if (job == null) {
            throw new NullPointerException("Cannot run a null job.");
        }
        if (user == null) {
            throw new NullPointerException("Cannot run a job as a null user.");
        }
        if (job.getJobType() != JobType.FLINK) {
            throw new IllegalArgumentException("Job configuration is not a Flink job configuration.");
        }
        String username = this.hdfsUsersBean.getHdfsUserName(job.getProject(), user);
        FlinkJob flinkjob = null;
        try {
            UserGroupInformation proxyUser = this.ugiService.getProxyUser(username);
            try {
                flinkjob = (FlinkJob)proxyUser.doAs(() -> new FlinkJob(job, this.submitter, user, this.hdfsUsersBean.getHdfsUserName(job.getProject(), job.getCreator()), this.jobsMonitor, this.settings, this.kafkaBrokers.getKafkaBrokersString()));
            }
            catch (InterruptedException ex) {
                LOGGER.log(Level.SEVERE, null, ex);
            }
        }
        catch (IOException ex) {
            throw new JobException(RESTCodes.JobErrorCode.PROXY_ERROR, Level.SEVERE, "job: " + job.getId() + ", user:" + user.getUsername(), ex.getMessage(), (Throwable)ex);
        }
        if (flinkjob == null) {
            throw new GenericException(RESTCodes.GenericErrorCode.UNKNOWN_ERROR, Level.WARNING, "Could not instantiate job with name: " + job.getName() + " and id: " + job.getId(), "sparkjob object was null");
        }
        Execution execution = flinkjob.requestExecutionId();
        this.submitter.startExecution(flinkjob);
        this.activityFacade.persistActivity(" ran a job named ", job.getProject(), user.asUser(), ActivityFlag.JOB);
        return execution;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TransactionAttribute(value=TransactionAttributeType.NEVER)
    public String getFlinkMasterAddr(String appId) {
        String flinkMasterURL;
        YarnClientWrapper yarnClientWrapper;
        block8: {
            LOGGER.log(Level.INFO, "Getting Flink Master Addr for:" + appId);
            org.apache.hadoop.conf.Configuration conf = this.settings.getConfiguration();
            Configuration flinkConf = GlobalConfiguration.loadConfiguration((String)this.settings.getFlinkConfDir());
            YarnConfiguration yarnConf = new YarnConfiguration(conf);
            yarnClientWrapper = null;
            YarnClusterDescriptor cluster = null;
            flinkMasterURL = null;
            try {
                yarnConf.addResource(new File(this.settings.getHadoopConfDir() + "/yarn-site.xml").toURI().toURL());
                yarnClientWrapper = this.ycs.getYarnClientSuper();
                YarnClient yarnClient = yarnClientWrapper.getYarnClient();
                cluster = new YarnClusterDescriptor(flinkConf, yarnConf, this.settings.getFlinkConfDir(), yarnClient, true);
                ClusterClient clusterClient = cluster.retrieve(ApplicationId.fromString((String)appId));
                flinkMasterURL = clusterClient.getClusterConnectionInfo().getHostname() + ":" + clusterClient.getClusterConnectionInfo().getPort();
                if (cluster == null) break block8;
            }
            catch (Exception ex) {
                block9: {
                    try {
                        LOGGER.log(Level.FINE, "Could not retrieve Flink Master URL for applicationID: " + appId, ex);
                        if (cluster == null) break block9;
                    }
                    catch (Throwable throwable) {
                        if (cluster != null) {
                            cluster.close();
                        }
                        if (yarnClientWrapper != null) {
                            this.ycs.closeYarnClient(yarnClientWrapper);
                        }
                        throw throwable;
                    }
                    cluster.close();
                }
                if (yarnClientWrapper != null) {
                    this.ycs.closeYarnClient(yarnClientWrapper);
                }
            }
            cluster.close();
        }
        if (yarnClientWrapper != null) {
            this.ycs.closeYarnClient(yarnClientWrapper);
        }
        return flinkMasterURL;
    }

    @TransactionAttribute(value=TransactionAttributeType.NOT_SUPPORTED)
    public Map<String, Project> getProjectsOfFlinkJobs(String archiveDir) {
        if (Strings.isNullOrEmpty((String)archiveDir)) {
            throw new IllegalArgumentException("Flink historyserver.archive.fs.dir property was not provided");
        }
        HashMap<String, Project> projectsJobs = new HashMap<String, Project>();
        try {
            List<Inode> jobs = this.inodeController.getChildren(archiveDir);
            for (Inode job : jobs) {
                if (job.getHdfsUser() == null) continue;
                projectsJobs.put(job.getInodePK().getName(), this.projectFacade.findByName(job.getHdfsUser().getProject()));
            }
        }
        catch (IOException e) {
            LOGGER.log(Level.WARNING, "Could not find Flink jobs in history server archive " + archiveDir, e);
        }
        return projectsJobs;
    }

    @TransactionAttribute(value=TransactionAttributeType.NOT_SUPPORTED)
    public Project getProjectOfFlinkJob(String archivePath, String job) throws UserNotFoundException {
        HdfsUsers hdfsUser;
        if (Strings.isNullOrEmpty((String)archivePath)) {
            throw new IllegalArgumentException("Flink historyserver.archive.fs.dir property was not provided");
        }
        Inode inode = this.inodeController.getInodeAtPath(archivePath + File.separator + job);
        if (inode != null && (hdfsUser = inode.getHdfsUser()) != null) {
            return this.projectFacade.findByName(hdfsUser.getProject());
        }
        throw new UserNotFoundException("Flink job belongs to a deleted project or a removed project member.");
    }

    public String getArchiveDir() throws IOException {
        Yaml yaml = new Yaml();
        try (FileInputStream in = new FileInputStream(new File(this.settings.getFlinkConfFile()));){
            Map obj = (Map)yaml.load((InputStream)in);
            String string = Utils.prepPath((String)obj.get("historyserver.archive.fs.dir"));
            return string;
        }
    }
}

