/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.flink;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.yarn.YarnRunner;
import io.hops.hopsworks.common.util.FlinkConfigurationUtil;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.flink.FlinkJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.project.Project;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.yaml.snakeyaml.Yaml;

/**
 * Assembles a {@link YarnRunner} for a Flink job from the job's
 * {@link FlinkJobConfiguration}, the installation {@link Settings} and the
 * Flink configuration file.
 *
 * <p>Extra properties can be registered with {@link #addDynamicProperty(String, String)}
 * before calling {@code getYarnRunner}; they are forwarded to the cluster
 * descriptor together with the framework properties, encoded as
 * {@code key=value} pairs joined by {@code @@}.
 */
public class FlinkYarnRunnerBuilder {
    /** Separator placed between the encoded {@code key=value} dynamic properties. */
    private static final String PROPERTY_SEPARATOR = "@@";

    private final Jobs job;
    private final FlinkJobConfiguration flinkJobConfiguration;
    private FlinkConfigurationUtil flinkConfigurationUtil = new FlinkConfigurationUtil();
    private final Map<String, String> dynamicProperties = new HashMap<>();

    /**
     * @param job the job to build a runner for; its job configuration is cast to
     *            {@link FlinkJobConfiguration}, so a job of another type fails
     *            here with a {@link ClassCastException}
     */
    FlinkYarnRunnerBuilder(Jobs job) {
        this.job = job;
        this.flinkJobConfiguration = (FlinkJobConfiguration) job.getJobConfig();
    }

    /**
     * Registers (or overwrites) a dynamic property that is passed to the cluster
     * descriptor by {@code getYarnRunner}.
     *
     * @param name  property key
     * @param value property value
     */
    void addDynamicProperty(String name, String value) {
        this.dynamicProperties.put(name, value);
    }

    /**
     * Builds the {@link YarnRunner} for this builder's job.
     *
     * <p>Side effects: adds the {@code CONDA} and
     * {@code hopsworks.logstash.job.info} entries to this builder's dynamic
     * properties, and stores a default application name in the job's Flink
     * configuration when none was configured.
     *
     * @param project            project owning the job
     * @param jobUser            user passed on to the framework property setup
     * @param dfsClient          file system client handed to the runner builder
     * @param yarnClient         YARN client handed to the runner builder and cluster descriptor
     * @param services           executor services; also the source of the Hadoop configuration
     * @param settings           installation settings (Flink/Hadoop directories and files)
     * @param kafkaBrokersString Kafka brokers passed on to the framework property setup
     * @return the configured runner
     * @throws IOException      if the Flink configuration file cannot be read
     * @throws RuntimeException if the yarn-site.xml location cannot be turned into a URL
     */
    YarnRunner getYarnRunner(Project project, String jobUser, DistributedFileSystemOps dfsClient, YarnClient yarnClient, AsynchronousJobExecutor services, Settings settings, String kafkaBrokersString) throws IOException {
        String stagingPath = File.separator + "Projects" + File.separator + project.getName() + File.separator + "Resources";
        org.apache.hadoop.conf.Configuration conf = services.getSettings().getConfiguration();
        YarnRunner.Builder builder = new YarnRunner.Builder("org.apache.flink.yarn.ApplicationMaster");
        Configuration flinkConf = GlobalConfiguration.loadConfiguration(settings.getFlinkConfDir());
        YarnConfiguration yarnConf = new YarnConfiguration(conf);
        try {
            yarnConf.addResource(new File(settings.getHadoopConfDir() + "/" + "yarn-site.xml").toURI().toURL());
        } catch (MalformedURLException t) {
            throw new RuntimeException("Error", t);
        }

        // Built once; used both as an extra Java option and as a dynamic property.
        String logstashJobInfo = project.getName().toLowerCase() + "," + this.job.getName() + "," + this.job.getId() + "," + "**APPID";

        Map<String, String> extraJavaOptions = new HashMap<>();
        extraJavaOptions.put("hopsworks.logstash.job.info", logstashJobInfo);
        Map<String, String> finalJobProps = this.flinkConfigurationUtil.setFrameworkProperties(project, this.job.getJobConfig(), settings, jobUser, null, extraJavaOptions, kafkaBrokersString);
        mergeFlinkConfFileDefaults(settings, finalJobProps);

        YarnClusterDescriptor cluster = new YarnClusterDescriptor(flinkConf, yarnConf, settings.getFlinkConfDir(), yarnClient, true);
        ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(this.flinkJobConfiguration.getJobManagerMemory())
                .setTaskManagerMemoryMB(this.flinkJobConfiguration.getTaskManagerMemory())
                .setSlotsPerTaskManager(this.flinkJobConfiguration.getNumberOfTaskSlots())
                .setNumberTaskManagers(this.flinkJobConfiguration.getNumberOfTaskManagers())
                .createClusterSpecification();
        cluster.setLocalJarPath(new Path(settings.getLocalFlinkJarPath()));
        cluster.addHopsLocalResources("cacerts.jks", settings.getGlassfishTrustStoreHdfs());

        builder.setYarnClient(yarnClient);
        builder.setDfsClient(dfsClient);
        builder.setFlinkCluster(cluster);
        builder.setFlinkClusterSpecification(clusterSpecification);
        builder.localResourcesBasePath(stagingPath);

        this.addDynamicProperty("CONDA", settings.getCurrentCondaEnvironment(project));
        this.addDynamicProperty("hopsworks.logstash.job.info", logstashJobInfo);
        String encodedProperties = encodeProperties(finalJobProps);
        if (!encodedProperties.isEmpty()) {
            cluster.setDynamicPropertiesEncoded(encodedProperties);
        }

        builder.setJobType(JobType.FLINK);
        // Fall back to a generated name only when the user did not configure one.
        // (The previous check was inverted: it replaced configured names and let
        // a null/empty name through to the cluster descriptor.)
        if (Strings.isNullOrEmpty(this.flinkJobConfiguration.getAppName())) {
            this.flinkJobConfiguration.setAppName("Flink session with " + this.flinkJobConfiguration.getNumberOfTaskManagers() + " TaskManagers");
        }
        cluster.setName(this.flinkJobConfiguration.getAppName());
        return builder.build(settings.getFlinkDir(), JobType.FLINK, services);
    }

    /**
     * Copies entries from the Flink configuration file into {@code jobProps}
     * without overriding keys that are already present. An empty file (for which
     * the YAML parser yields {@code null}) contributes nothing.
     */
    private static void mergeFlinkConfFileDefaults(Settings settings, Map<String, String> jobProps) throws IOException {
        Yaml yaml = new Yaml();
        try (InputStream in = new FileInputStream(new File(settings.getFlinkConfFile()))) {
            // NOTE(review): Yaml.load can instantiate arbitrary types from tagged
            // input; this assumes the configuration file is trusted - confirm.
            Map<?, ?> flinkConfProps = (Map<?, ?>) yaml.load(in);
            if (flinkConfProps == null) {
                return;
            }
            for (Map.Entry<?, ?> entry : flinkConfProps.entrySet()) {
                jobProps.putIfAbsent(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
            }
        }
    }

    /**
     * Encodes this builder's dynamic properties followed by {@code jobProps} as
     * {@code key=value} pairs joined by {@value #PROPERTY_SEPARATOR}.
     *
     * @return the encoded string, or an empty string when there is nothing to encode
     */
    private String encodeProperties(Map<String, String> jobProps) {
        StringBuilder encoded = new StringBuilder();
        for (Map.Entry<String, String> entry : this.dynamicProperties.entrySet()) {
            encoded.append(entry.getKey()).append("=").append(entry.getValue()).append(PROPERTY_SEPARATOR);
        }
        for (Map.Entry<String, String> entry : jobProps.entrySet()) {
            encoded.append(entry.getKey()).append("=").append(entry.getValue()).append(PROPERTY_SEPARATOR);
        }
        if (encoded.length() > 0) {
            // Every pair was written with a trailing separator; drop the last one.
            encoded.setLength(encoded.length() - PROPERTY_SEPARATOR.length());
        }
        return encoded.toString();
    }
}

