/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.spark;

import io.hops.hopsworks.common.dao.jobs.description.Jobs;
import io.hops.hopsworks.common.dao.project.Project;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.configuration.JobType;
import io.hops.hopsworks.common.jobs.spark.SparkJobConfiguration;
import io.hops.hopsworks.common.jobs.yarn.LocalResourceDTO;
import io.hops.hopsworks.common.jobs.yarn.ServiceProperties;
import io.hops.hopsworks.common.jobs.yarn.YarnRunner;
import io.hops.hopsworks.common.util.HopsUtils;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.common.util.SparkConfigurationUtil;
import io.hops.hopsworks.common.util.templates.ConfigProperty;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.parquet.Strings;

public class SparkYarnRunnerBuilder {
    private final Jobs job;
    private final SparkJobConfiguration sparkJobConfiguration;
    private final List<String> jobArgs = new ArrayList<String>();
    private String jobName = "Untitled Spark Job";
    private final List<LocalResourceDTO> extraFiles = new ArrayList<LocalResourceDTO>();
    private final Map<String, String> sysProps = new HashMap<String, String>();
    private ServiceProperties serviceProps;
    private SparkConfigurationUtil sparkConfigurationUtil = new SparkConfigurationUtil();

    public SparkYarnRunnerBuilder(Jobs job) {
        this.job = job;
        this.sparkJobConfiguration = (SparkJobConfiguration)job.getJobConfig();
        if (this.sparkJobConfiguration.getAppPath() == null || this.sparkJobConfiguration.getAppPath().isEmpty()) {
            throw new IllegalArgumentException("Path to application executable cannot be empty!");
        }
        if (this.sparkJobConfiguration.getMainClass() == null || this.sparkJobConfiguration.getMainClass().isEmpty()) {
            throw new IllegalArgumentException("Name of the main class cannot be empty!");
        }
    }

    public YarnRunner getYarnRunner(Project project, String jobUser, String usersFullName, AsynchronousJobExecutor services, DistributedFileSystemOps dfsClient, YarnClient yarnClient, Settings settings) throws IOException {
        String archives;
        HashMap<String, ConfigProperty> jobHopsworksProps = new HashMap<String, ConfigProperty>();
        JobType jobType = this.job.getJobConfig().getJobType();
        String appPath = ((SparkJobConfiguration)this.job.getJobConfig()).getAppPath();
        YarnRunner.Builder builder = new YarnRunner.Builder("org.apache.spark.deploy.yarn.ApplicationMaster");
        builder.setJobType(jobType);
        builder.setYarnClient(yarnClient);
        builder.setDfsClient(dfsClient);
        String stagingPath = "/Projects/" + project.getName() + "/" + "Resources" + "/.sparkjobstaging-" + "**APPID";
        builder.localResourcesBasePath(stagingPath);
        builder.addFileToRemove("hdfs://" + stagingPath);
        String appExecName = null;
        if (jobType == JobType.SPARK) {
            appExecName = "__app__.jar";
        } else if (jobType == JobType.PYSPARK) {
            appExecName = appPath.substring(appPath.lastIndexOf(File.separator) + 1);
        }
        builder.addLocalResource(new LocalResourceDTO(appExecName, appPath, LocalResourceVisibility.APPLICATION.toString(), LocalResourceType.FILE.toString(), null), !appPath.startsWith("hdfs:"));
        builder.addToAppMasterEnvironment("CLASSPATH", "__app__.jar");
        HashMap<String, String> extraJavaOptions = new HashMap<String, String>();
        jobHopsworksProps.put("spark.yarn.stagingDir", new ConfigProperty("spark.yarn.stagingDir", HopsUtils.IGNORE, stagingPath));
        jobHopsworksProps.put("hopsworks.job.appid", new ConfigProperty("hopsworks.job.appid", HopsUtils.IGNORE, "**APPID"));
        extraJavaOptions.put("hopsworks.job.appid", "**APPID");
        extraJavaOptions.put("hopsworks.logstash.job.info", project.getName().toLowerCase() + "," + this.jobName + "," + this.job.getId() + "," + "**APPID");
        StringBuilder amargs = new StringBuilder("--class ");
        amargs.append(((SparkJobConfiguration)this.job.getJobConfig()).getMainClass());
        if (jobType == JobType.PYSPARK) {
            amargs.append(" --primary-py-file ").append(appExecName);
            if (!this.serviceProps.isAnacondaEnabled()) {
                throw new IOException("PySpark job needs to have Python Anaconda environment enabled");
            }
        }
        String tfLibraryPath = services.getTfLibMappingUtil().getTfLdLibraryPath(project);
        HashMap<String, String> finalJobProps = new HashMap<String, String>(this.sparkConfigurationUtil.setFrameworkProperties(project, this.job.getJobConfig(), settings, jobUser, usersFullName, tfLibraryPath, extraJavaOptions));
        finalJobProps.put("spark.yarn.appMasterEnv.SPARK_USER", jobUser);
        finalJobProps.put("spark.executorEnv.SPARK_USER", jobUser);
        finalJobProps.put("spark.yarn.appMasterEnv.SPARK_YARN_MODE", "true");
        finalJobProps.put("spark.yarn.appMasterEnv.SPARK_YARN_STAGING_DIR", stagingPath);
        Properties sparkProperties = new Properties();
        try (String[] is = new FileInputStream(settings.getSparkDir() + "/" + "conf/spark-defaults.conf");){
            sparkProperties.load((InputStream)is);
            for (String property : sparkProperties.stringPropertyNames()) {
                if (finalJobProps.containsKey(property)) continue;
                finalJobProps.put(property, sparkProperties.getProperty(property).trim());
            }
        }
        for (String jvmOption : ((String)finalJobProps.get("spark.driver.extraJavaOptions")).split(" +")) {
            builder.addJavaOption(jvmOption);
        }
        for (String key : finalJobProps.keySet()) {
            if (key.startsWith("spark.yarn.appMasterEnv.")) {
                builder.addToAppMasterEnvironment(key.replace("spark.yarn.appMasterEnv.", ""), (String)finalJobProps.get(key));
            }
            this.addSystemProperty(key, (String)finalJobProps.get(key));
        }
        builder.addToAppMasterEnvironment("CLASSPATH", (String)finalJobProps.get("spark.driver.extraClassPath"));
        for (String s : this.sysProps.keySet()) {
            String option = YarnRunner.escapeForShell("-D" + s + "=" + this.sysProps.get(s));
            builder.addJavaOption(option);
        }
        for (String s : this.jobArgs) {
            amargs.append(" --arg '").append(s).append("'");
        }
        builder.amArgs(amargs.toString());
        builder.amMemory(this.sparkJobConfiguration.getAmMemory());
        builder.amVCores(this.sparkJobConfiguration.getAmVCores());
        builder.amQueue(this.sparkJobConfiguration.getAmQueue());
        String hopsFiles = (String)finalJobProps.get("spark.yarn.dist.files");
        if (!Strings.isNullOrEmpty((String)hopsFiles)) {
            for (String filePath : hopsFiles.split(",")) {
                String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
                if (filePath.contains("#")) {
                    fileName = filePath.split("#")[1];
                    filePath = filePath.substring(0, filePath.indexOf("#"));
                }
                builder.addLocalResource(new LocalResourceDTO(fileName, filePath, LocalResourceVisibility.APPLICATION.toString(), LocalResourceType.FILE.toString(), null), false);
            }
        }
        if (!Strings.isNullOrEmpty((String)(archives = (String)finalJobProps.get("spark.yarn.dist.archives")))) {
            for (String archivePath : archives.split(",")) {
                String fileName = archivePath.substring(archivePath.lastIndexOf("/") + 1);
                if (archivePath.contains("#")) {
                    fileName = archivePath.split("#")[1];
                    archivePath = archivePath.substring(0, archivePath.indexOf("#"));
                }
                builder.addLocalResource(new LocalResourceDTO(fileName, archivePath, LocalResourceVisibility.APPLICATION.toString(), LocalResourceType.ARCHIVE.toString(), null), false);
            }
        }
        builder.appName(this.jobName);
        return builder.build(settings.getSparkDir(), JobType.SPARK, services);
    }

    public SparkYarnRunnerBuilder setJobName(String jobName) {
        this.jobName = jobName;
        return this;
    }

    public SparkYarnRunnerBuilder addAllJobArgs(String[] jobArgs) {
        this.jobArgs.addAll(Arrays.asList(jobArgs));
        return this;
    }

    public SparkYarnRunnerBuilder addExtraFiles(List<LocalResourceDTO> projectLocalResources) {
        if (projectLocalResources != null && !projectLocalResources.isEmpty()) {
            this.extraFiles.addAll(projectLocalResources);
        }
        return this;
    }

    public void setServiceProps(ServiceProperties serviceProps) {
        this.serviceProps = serviceProps;
    }

    public SparkYarnRunnerBuilder addSystemProperty(String name, String value) {
        this.sysProps.put(name, value);
        return this;
    }
}

