package io.hops.hopsworks.common.jobs.flink;

import com.google.common.base.Strings;
import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.yarn.YarnRunner;
import io.hops.hopsworks.common.util.FlinkConfigurationUtil;
import io.hops.hopsworks.common.util.ProjectUtils;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.flink.FlinkJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.project.Project;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.yaml.snakeyaml.Yaml;

/* loaded from: input_file:io/hops/hopsworks/common/jobs/flink/FlinkYarnRunnerBuilder.class */
/**
 * Assembles a {@link YarnRunner} that launches a Flink cluster on YARN for a given job.
 *
 * <p>Callers may register extra dynamic properties with {@link #addDynamicProperty(String, String)}
 * before calling {@code getYarnRunner}; they are encoded together with the framework properties
 * and handed to the Flink {@link YarnClusterDescriptor}.
 */
public class FlinkYarnRunnerBuilder {
    /** Separator placed between encoded {@code key=value} pairs. */
    private static final String DYNAMIC_PROPERTY_SEPARATOR = "@@";

    private final Jobs job;
    private final FlinkJobConfiguration flinkJobConfiguration;
    private final FlinkConfigurationUtil flinkConfigurationUtil = new FlinkConfigurationUtil();
    private final Map<String, String> dynamicProperties = new HashMap<>();

    /**
     * Creates a builder for the given job and keeps a reference to its job configuration.
     *
     * @param jobs the job to build a runner for
     */
    public FlinkYarnRunnerBuilder(Jobs jobs) {
        this.job = jobs;
        this.flinkJobConfiguration = jobs.getJobConfig();
    }

    /**
     * Registers a dynamic property; a later value for the same key replaces the earlier one.
     *
     * @param str the property key
     * @param str2 the property value
     */
    public void addDynamicProperty(String str, String str2) {
        this.dynamicProperties.put(str, str2);
    }

    /**
     * Builds the {@link YarnRunner} for this Flink job.
     *
     * <p>Side effects: adds the {@code CONDA} and {@code Settings.LOGSTASH_JOB_INFO} entries to this
     * builder's dynamic properties, and sets a default application name on the job configuration
     * when none is configured.
     *
     * @param project the project owning the job; its name is used for the local resources path and
     *     the logstash job info
     * @param str forwarded unchanged to {@code FlinkConfigurationUtil.setFrameworkProperties}
     * @param distributedFileSystemOps the DFS client handed to the runner builder
     * @param yarnClient the YARN client used by the cluster descriptor and the runner builder
     * @param asynchronousJobExecutor supplies the Hadoop configuration and is passed to the final build
     * @param settings source of Flink/Hadoop paths, the conda environment and docker mounts
     * @param str2 forwarded unchanged to {@code FlinkConfigurationUtil.setFrameworkProperties}
     * @param str3 forwarded unchanged to {@code FlinkConfigurationUtil.setFrameworkProperties}
     * @param serviceDiscoveryController used to resolve the docker image name and framework properties
     * @return the configured runner
     * @throws IOException if the Flink configuration file cannot be read or a callee fails with I/O
     * @throws ServiceDiscoveryException propagated from the service-discovery based lookups
     * @throws RuntimeException wrapping any {@link MalformedURLException} raised while building
     */
    public YarnRunner getYarnRunner(Project project, String str, DistributedFileSystemOps distributedFileSystemOps, YarnClient yarnClient, AsynchronousJobExecutor asynchronousJobExecutor, Settings settings, String str2, String str3, ServiceDiscoveryController serviceDiscoveryController) throws IOException, ServiceDiscoveryException {
        String localResourcesBasePath = File.separator + Settings.DIR_ROOT + File.separator + project.getName() + File.separator + "Resources";
        Configuration hadoopConf = asynchronousJobExecutor.getSettings().getConfiguration();
        YarnRunner.Builder builder = new YarnRunner.Builder(Settings.FLINK_AM_MAIN);
        org.apache.flink.configuration.Configuration flinkConf = GlobalConfiguration.loadConfiguration(settings.getFlinkConfDir());
        YarnConfiguration yarnConf = new YarnConfiguration(hadoopConf);
        // The catch below intentionally covers the whole method so that a MalformedURLException
        // from any step keeps surfacing to callers as a RuntimeException.
        try {
            yarnConf.addResource(new File(settings.getHadoopConfDir() + "/" + Settings.DEFAULT_YARN_CONFFILE_NAME).toURI().toURL());

            String logstashJobInfo = project.getName().toLowerCase() + "," + this.job.getName() + "," + this.job.getId() + "," + YarnRunner.APPID_PLACEHOLDER;
            Map<String, String> logstashProperties = new HashMap<>();
            logstashProperties.put(Settings.LOGSTASH_JOB_INFO, logstashJobInfo);
            Map<String, String> frameworkProperties = this.flinkConfigurationUtil.setFrameworkProperties(project, this.job.getJobConfig(), settings, str, logstashProperties, str2, str3, serviceDiscoveryController);
            mergeFlinkConfFileDefaults(settings, frameworkProperties);

            YarnClusterDescriptor cluster = new YarnClusterDescriptor(flinkConf, yarnConf, settings.getFlinkConfDir(), yarnClient, true);
            ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
                    .setMasterMemoryMB(this.flinkJobConfiguration.getJobManagerMemory())
                    .setTaskManagerMemoryMB(this.flinkJobConfiguration.getTaskManagerMemory())
                    .setSlotsPerTaskManager(this.flinkJobConfiguration.getNumberOfTaskSlots())
                    .setNumberTaskManagers(this.flinkJobConfiguration.getNumberOfTaskManagers())
                    .createClusterSpecification();
            cluster.setLocalJarPath(new Path(settings.getLocalFlinkJarPath()));
            cluster.setDocker(ProjectUtils.getFullDockerImageName(project, settings, serviceDiscoveryController, true), settings.getDockerMounts());

            builder.setYarnClient(yarnClient);
            builder.setDfsClient(distributedFileSystemOps);
            builder.setFlinkCluster(cluster);
            builder.setFlinkClusterSpecification(clusterSpecification);
            builder.localResourcesBasePath(localResourcesBasePath);

            addDynamicProperty("CONDA", settings.getCurrentCondaEnvironment());
            addDynamicProperty(Settings.LOGSTASH_JOB_INFO, logstashJobInfo);
            String encodedProperties = encodeDynamicProperties(frameworkProperties);
            if (!encodedProperties.isEmpty()) {
                cluster.setDynamicPropertiesEncoded(encodedProperties);
            }

            builder.setJobType(JobType.FLINK);
            // Only fall back to the generated name when the user did not configure one.
            if (Strings.isNullOrEmpty(this.flinkJobConfiguration.getAppName())) {
                this.flinkJobConfiguration.setAppName("Flink session with " + this.flinkJobConfiguration.getNumberOfTaskManagers() + " TaskManagers");
            }
            cluster.setName(this.flinkJobConfiguration.getAppName());
            return builder.build(settings.getFlinkDir(), JobType.FLINK, asynchronousJobExecutor);
        } catch (MalformedURLException e) {
            throw new RuntimeException("Error", e);
        }
    }

    /**
     * Reads the Flink YAML configuration file and copies every entry whose key is not already
     * present into {@code frameworkProperties}; existing entries win.
     */
    private static void mergeFlinkConfFileDefaults(Settings settings, Map<String, String> frameworkProperties) throws IOException {
        Yaml yaml = new Yaml();
        try (FileInputStream confStream = new FileInputStream(new File(settings.getFlinkConfFile()))) {
            // The file is expected to be a flat YAML mapping with string keys.
            @SuppressWarnings("unchecked")
            Map<String, Object> fileEntries = (Map<String, Object>) yaml.load(confStream);
            if (fileEntries == null) {
                // An empty document yields null; there is nothing to merge.
                return;
            }
            for (Map.Entry<String, Object> entry : fileEntries.entrySet()) {
                frameworkProperties.putIfAbsent(entry.getKey(), String.valueOf(entry.getValue()));
            }
        }
    }

    /**
     * Encodes the registered dynamic properties followed by {@code frameworkProperties} as
     * {@code key=value} pairs joined by {@code "@@"}; returns an empty string when both are empty.
     */
    private String encodeDynamicProperties(Map<String, String> frameworkProperties) {
        StringBuilder encoded = new StringBuilder();
        appendEncoded(encoded, this.dynamicProperties);
        appendEncoded(encoded, frameworkProperties);
        if (encoded.length() > 0) {
            // Drop the separator that follows the last pair.
            encoded.setLength(encoded.length() - DYNAMIC_PROPERTY_SEPARATOR.length());
        }
        return encoded.toString();
    }

    /** Appends each entry of {@code properties} to {@code target} as {@code key=value@@}. */
    private static void appendEncoded(StringBuilder target, Map<String, String> properties) {
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            target.append(entry.getKey()).append("=").append(entry.getValue()).append(DYNAMIC_PROPERTY_SEPARATOR);
        }
    }
}
