package io.hops.hopsworks.common.jobs.flink;

import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.dao.kafka.KafkaConst;
import io.hops.hopsworks.common.dao.project.ProjectFacade;
import io.hops.hopsworks.common.dao.user.activity.ActivityFacade;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.UserGroupInformationService;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.hdfs.inode.InodeController;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.kafka.KafkaBrokers;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.common.yarn.YarnClientService;
import io.hops.hopsworks.common.yarn.YarnClientWrapper;
import io.hops.hopsworks.exceptions.GenericException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.hdfs.inode.Inode;
import io.hops.hopsworks.persistence.entity.hdfs.user.HdfsUsers;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.jobs.history.Execution;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.persistence.entity.user.activity.ActivityFlag;
import io.hops.hopsworks.restutils.RESTCodes;
import io.hops.security.UserNotFoundException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.parquet.Strings;
import org.yaml.snakeyaml.Yaml;

/**
 * Stateless bean that starts Flink jobs and looks up information about running
 * or archived Flink applications (job manager address, owning project, archive directory).
 */
@Stateless
public class FlinkController {
    private static final Logger LOGGER = Logger.getLogger(FlinkController.class.getName());

    /** Key read from the Flink configuration file by {@link #getArchiveDir()}. */
    private static final String ARCHIVE_DIR_PROPERTY = "historyserver.archive.fs.dir";

    @EJB
    private AsynchronousJobExecutor submitter;

    @EJB
    private ActivityFacade activityFacade;

    @EJB
    private UserGroupInformationService ugiService;

    @EJB
    private HdfsUsersController hdfsUsersBean;

    @EJB
    private Settings settings;

    @EJB
    private YarnClientService ycs;

    @EJB
    private ProjectFacade projectFacade;

    @EJB
    private InodeController inodeController;

    @EJB
    private KafkaBrokers kafkaBrokers;

    @EJB
    private ServiceDiscoveryController serviceDiscoveryController;

    /**
     * Instantiates a {@link FlinkJob} as the proxy HDFS user of {@code users}, requests an
     * execution for it, hands it to the asynchronous submitter and records a "ran job" activity.
     *
     * @param jobs  the job to run; must be of type {@link JobType#FLINK}
     * @param users the user on whose behalf the job is started
     * @return the execution obtained from {@code FlinkJob#requestExecutionId()}
     * @throws NullPointerException     if {@code jobs} or {@code users} is {@code null}
     * @throws IllegalArgumentException if the job is not a Flink job
     * @throws GenericException         if the {@link FlinkJob} could not be instantiated
     * @throws JobException             if an {@link IOException} occurs while acting as the proxy user
     * @throws ServiceException         if the Hopsworks service address cannot be resolved
     */
    public Execution startJob(Jobs jobs, Users users) throws GenericException, JobException, ServiceException {
        if (jobs == null) {
            throw new NullPointerException("Cannot run a null job.");
        }
        if (users == null) {
            throw new NullPointerException("Cannot run a job as a null user.");
        }
        if (jobs.getJobType() != JobType.FLINK) {
            throw new IllegalArgumentException("Job configuration is not a Flink job configuration.");
        }

        String hdfsUserName = this.hdfsUsersBean.getHdfsUserName(jobs.getProject(), users);
        FlinkJob flinkJob = null;
        try {
            String hopsworksEndpoint = "https://" + this.serviceDiscoveryController
                .constructServiceFQDNWithPort(ServiceDiscoveryController.HopsworksService.HOPSWORKS_APP);
            try {
                // The explicit target type is required: a bare lambda matches both the
                // PrivilegedAction and PrivilegedExceptionAction overloads of doAs.
                flinkJob = this.ugiService.getProxyUser(hdfsUserName).doAs(
                    (java.security.PrivilegedExceptionAction<FlinkJob>) () -> new FlinkJob(
                        jobs,
                        this.submitter,
                        users,
                        this.hdfsUsersBean.getHdfsUserName(jobs.getProject(), jobs.getCreator()),
                        this.settings,
                        this.kafkaBrokers.getKafkaBrokersString(),
                        hopsworksEndpoint,
                        this.serviceDiscoveryController));
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can still observe it;
                // flinkJob stays null and is reported through the GenericException below.
                Thread.currentThread().interrupt();
                LOGGER.log(Level.SEVERE, "Interrupted while instantiating Flink job with id: " + jobs.getId(), e);
            }
            if (flinkJob == null) {
                throw new GenericException(RESTCodes.GenericErrorCode.UNKNOWN_ERROR, Level.WARNING,
                    "Could not instantiate job with name: " + jobs.getName() + " and id: " + jobs.getId(),
                    "flinkjob object was null");
            }
            Execution execution = flinkJob.requestExecutionId();
            this.submitter.startExecution(flinkJob);
            this.activityFacade.persistActivity(ActivityFacade.RAN_JOB, jobs.getProject(), users.asUser(),
                ActivityFlag.JOB);
            return execution;
        } catch (ServiceDiscoveryException e) {
            throw new ServiceException(RESTCodes.ServiceErrorCode.SERVICE_NOT_FOUND, Level.SEVERE,
                "job: " + jobs.getId() + ", user:" + users.getUsername(), e.getMessage(), e);
        } catch (IOException e) {
            throw new JobException(RESTCodes.JobErrorCode.PROXY_ERROR, Level.SEVERE,
                "job: " + jobs.getId() + ", user:" + users.getUsername(), e.getMessage(), e);
        }
    }

    /**
     * Looks up the connection address of the Flink cluster running as the given YARN application.
     *
     * @param str the YARN application id, in the textual form accepted by
     *            {@link ApplicationId#fromString(String)}
     * @return {@code hostname + separator + port} of the cluster connection, or {@code null}
     *         if the lookup failed (the failure is logged at {@code FINE})
     */
    @TransactionAttribute(TransactionAttributeType.NEVER)
    public String getFlinkMasterAddr(String str) {
        LOGGER.log(Level.INFO, "Getting Flink Master Addr for:" + str);
        Configuration hadoopConf = this.settings.getConfiguration();
        org.apache.flink.configuration.Configuration flinkConf =
            GlobalConfiguration.loadConfiguration(this.settings.getFlinkConfDir());
        YarnConfiguration yarnConf = new YarnConfiguration(hadoopConf);

        YarnClientWrapper yarnClientWrapper = null;
        YarnClusterDescriptor yarnClusterDescriptor = null;
        String flinkMasterAddr = null;
        try {
            yarnConf.addResource(new File(this.settings.getHadoopConfDir() + "/yarn-site.xml").toURI().toURL());
            yarnClientWrapper = this.ycs.getYarnClientSuper();
            yarnClusterDescriptor = new YarnClusterDescriptor(flinkConf, yarnConf, this.settings.getFlinkConfDir(),
                yarnClientWrapper.getYarnClient(), true);
            ClusterClient<?> clusterClient = yarnClusterDescriptor.retrieve(ApplicationId.fromString(str));
            flinkMasterAddr = clusterClient.getClusterConnectionInfo().getHostname()
                + KafkaConst.COLON_SEPARATOR
                + clusterClient.getClusterConnectionInfo().getPort();
        } catch (Exception e) {
            // Best effort: callers receive null when the address cannot be resolved.
            LOGGER.log(Level.FINE, "Could not retrieve Flink Master URL for applicationID: " + str, (Throwable) e);
        } finally {
            // Nested finally so the YARN client is released even if closing the descriptor fails.
            try {
                if (yarnClusterDescriptor != null) {
                    yarnClusterDescriptor.close();
                }
            } finally {
                if (yarnClientWrapper != null) {
                    this.ycs.closeYarnClient(yarnClientWrapper);
                }
            }
        }
        return flinkMasterAddr;
    }

    /**
     * Maps every child inode of the Flink history archive directory to a project.
     * Children without an HDFS user are skipped.
     *
     * @param str path of the Flink history server archive directory
     * @return map from child inode name to the result of looking up the project named by the
     *         inode's HDFS user; empty if the children could not be listed
     * @throws IllegalArgumentException if {@code str} is {@code null} or empty
     */
    @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
    public Map<String, Project> getProjectsOfFlinkJobs(String str) {
        if (Strings.isNullOrEmpty(str)) {
            throw new IllegalArgumentException("Flink historyserver.archive.fs.dir property was not provided");
        }
        Map<String, Project> projectsByJob = new HashMap<>();
        try {
            for (Inode inode : this.inodeController.getChildren(str)) {
                HdfsUsers hdfsUser = inode.getHdfsUser();
                if (hdfsUser != null) {
                    projectsByJob.put(inode.getInodePK().getName(),
                        this.projectFacade.findByName(hdfsUser.getProject()));
                }
            }
        } catch (IOException e) {
            LOGGER.log(Level.WARNING, "Could not find Flink jobs in history server archive " + str, (Throwable) e);
        }
        return projectsByJob;
    }

    /**
     * Resolves the project of a single archived Flink job.
     *
     * @param str  path of the Flink history server archive directory
     * @param str2 name of the job's entry inside the archive directory
     * @return the result of looking up the project named by the HDFS user of the job's inode
     * @throws IllegalArgumentException if {@code str} is {@code null} or empty
     * @throws UserNotFoundException    if no inode exists at the job's path or it has no HDFS user
     */
    @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
    public Project getProjectOfFlinkJob(String str, String str2) throws UserNotFoundException {
        if (Strings.isNullOrEmpty(str)) {
            throw new IllegalArgumentException("Flink historyserver.archive.fs.dir property was not provided");
        }
        Inode jobInode = this.inodeController.getInodeAtPath(str + File.separator + str2);
        HdfsUsers hdfsUser = jobInode == null ? null : jobInode.getHdfsUser();
        if (hdfsUser == null) {
            throw new UserNotFoundException("Flink job belongs to a deleted project or a removed project member.");
        }
        return this.projectFacade.findByName(hdfsUser.getProject());
    }

    /**
     * Reads the {@code historyserver.archive.fs.dir} entry from the Flink configuration file
     * and passes it through {@link Utils#prepPath(String)}.
     *
     * @return the prepared archive directory path
     * @throws IOException if the configuration file cannot be opened or closed
     */
    public String getArchiveDir() throws IOException {
        Yaml yaml = new Yaml();
        try (FileInputStream confStream = new FileInputStream(new File(this.settings.getFlinkConfFile()))) {
            // NOTE(review): if the file parses to no document the map is presumably null and this
            // throws a NullPointerException, as the previous implementation did - confirm desired handling.
            Map<?, ?> flinkConf = (Map<?, ?>) yaml.load(confStream);
            return Utils.prepPath((String) flinkConf.get(ARCHIVE_DIR_PROPERTY));
        }
    }
}
