/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.flink;

import com.google.common.base.Strings;
import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.yarn.YarnRunner;
import io.hops.hopsworks.common.serving.ServingConfig;
import io.hops.hopsworks.common.util.FlinkConfigurationUtil;
import io.hops.hopsworks.common.util.ProjectUtils;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.flink.FlinkJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class FlinkYarnRunnerBuilder {
    private final Jobs job;
    private final FlinkJobConfiguration flinkJobConfiguration;
    private FlinkConfigurationUtil flinkConfigurationUtil = new FlinkConfigurationUtil();
    private final Map<String, String> dynamicProperties = new HashMap<String, String>();

    FlinkYarnRunnerBuilder(Jobs job) {
        this.job = job;
        this.flinkJobConfiguration = (FlinkJobConfiguration)job.getJobConfig();
    }

    void addDynamicProperty(String name, String value) {
        this.dynamicProperties.put(name, value);
    }

    YarnRunner getYarnRunner(Project project, String jobUser, Users hopsworksUser, DistributedFileSystemOps dfsClient, YarnClient yarnClient, AsynchronousJobExecutor services, Settings settings, String kafkaBrokersString, String hopsworksRestEndpoint, ServingConfig servingConfig, ServiceDiscoveryController serviceDiscoveryController) throws IOException, ServiceDiscoveryException {
        String stagingPath = File.separator + "Projects" + File.separator + project.getName() + File.separator + "Resources" + "/.flinkStaging/" + hopsworksUser.getUsername();
        org.apache.hadoop.conf.Configuration conf = services.getSettings().getConfiguration();
        YarnRunner.Builder builder = new YarnRunner.Builder("org.apache.flink.yarn.ApplicationMaster");
        Configuration flinkConf = GlobalConfiguration.loadConfiguration((String)settings.getFlinkConfDir());
        YarnConfiguration yarnConf = new YarnConfiguration(conf);
        try {
            yarnConf.addResource(new File(settings.getHadoopConfDir() + "/" + "yarn-site.xml").toURI().toURL());
        }
        catch (MalformedURLException t) {
            throw new RuntimeException("Error", t);
        }
        HashMap<String, String> extraJavaOptions = new HashMap<String, String>();
        extraJavaOptions.put("hopsworks.logstash.job.info", project.getName().toLowerCase() + "," + this.job.getName() + "," + this.job.getId() + "," + "**APPID");
        Map<String, String> finalJobProps = this.flinkConfigurationUtil.setFrameworkProperties(project, this.job.getJobConfig(), settings, jobUser, hopsworksUser, extraJavaOptions, kafkaBrokersString, hopsworksRestEndpoint, servingConfig, serviceDiscoveryController);
        this.addDynamicProperty("hopsworks.logstash.job.info", project.getName().toLowerCase() + "," + this.job.getName() + "," + this.job.getId() + "," + "**APPID");
        for (String key : finalJobProps.keySet()) {
            flinkConf.setString(key, finalJobProps.get(key));
        }
        if (!this.dynamicProperties.isEmpty()) {
            for (String s : this.dynamicProperties.keySet()) {
                flinkConf.setString(s, this.dynamicProperties.get(s));
            }
        }
        if (Strings.isNullOrEmpty((String)this.flinkJobConfiguration.getAppName())) {
            this.flinkJobConfiguration.setAppName("Flink session with " + this.flinkJobConfiguration.getNumberOfTaskSlots() + " NumberOfTaskSlots");
        }
        flinkConf.setString("containerized.master.env.HOPSWORKS_JOB_NAME", this.flinkJobConfiguration.getAppName());
        YarnClusterDescriptor cluster = new YarnClusterDescriptor(flinkConf, yarnConf, yarnClient, (YarnClusterInformationRetriever)YarnClientYarnClusterInformationRetriever.create((YarnClient)yarnClient), true);
        ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().setMasterMemoryMB(this.flinkJobConfiguration.getJobManagerMemory()).setTaskManagerMemoryMB(this.flinkJobConfiguration.getTaskManagerMemory()).setSlotsPerTaskManager(this.flinkJobConfiguration.getNumberOfTaskSlots()).createClusterSpecification();
        ArrayList<File> shipFiles = new ArrayList<File>();
        shipFiles.add(new File(settings.getFlinkDir(), "lib"));
        cluster.addShipFiles(shipFiles);
        cluster.setLocalJarPath(new Path(settings.getLocalFlinkJarPath()));
        cluster.setDocker(ProjectUtils.getFullDockerImageName(project, settings, serviceDiscoveryController, true), settings.getDockerMounts());
        builder.setYarnClient(yarnClient);
        builder.setDfsClient(dfsClient);
        builder.setFlinkCluster(cluster);
        builder.setFlinkClusterSpecification(clusterSpecification);
        builder.localResourcesBasePath(stagingPath);
        builder.setJobType(JobType.FLINK);
        return builder.build(settings.getFlinkDir(), JobType.FLINK, services);
    }
}

