/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.flink;

import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.configuration.JobType;
import io.hops.hopsworks.common.jobs.flink.YarnClusterDescriptor;
import io.hops.hopsworks.common.jobs.yarn.LocalResourceDTO;
import io.hops.hopsworks.common.jobs.yarn.ServiceProperties;
import io.hops.hopsworks.common.jobs.yarn.YarnRunner;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.util.ConverterUtils;

/**
 * Collects the settings of a Flink job (memory, slots, queue, extra files,
 * system properties, job arguments) and turns them into a {@link YarnRunner}
 * via {@link #getYarnRunner}.
 *
 * <p>Instances are mutable and perform no synchronization.
 */
public class FlinkYarnRunnerBuilder {
    private static final Logger LOGGER = Logger.getLogger(FlinkYarnRunnerBuilder.class.getName());

    // String keys exposed to callers. None of them is referenced inside this class;
    // presumably they name environment variables of the Flink YARN client (verify against callers).
    public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
    public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
    public static final String ENV_APP_ID = "_APP_ID";
    public static final String FLINK_JAR_PATH = "_FLINK_JAR_PATH";
    public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
    public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
    public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
    public static final String ENV_SLOTS = "_SLOTS";
    public static final String ENV_DETACHED = "_DETACHED";
    public static final String ENV_STREAMING_MODE = "_STREAMING_MODE";
    public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
    public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
    public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";

    /** Smallest JobManager memory, in MB, accepted by {@link #setJobManagerMemory(int)}. */
    private static final int MIN_JM_MEMORY = 768;
    /** Smallest TaskManager memory, in MB, accepted by {@link #setTaskManagerMemory(int)}. */
    private static final int MIN_TM_MEMORY = 768;

    private String appJarPath;
    private final List<String> jobArgs = new ArrayList<String>();
    private List<LocalResourceDTO> extraFiles = new ArrayList<LocalResourceDTO>();
    private int taskManagerSlots = 1;
    private int jobManagerMemoryMb = MIN_JM_MEMORY;
    private String jobManagerQueue = "default";
    private int taskManagerMemoryMb = 1024;
    private int taskManagerCount = 1;
    private String configurationDirectory;
    private Path flinkConfigurationPath;
    private Path flinkLoggingConfigurationPath;
    // NOTE(review): nothing in this class assigns this field, yet isReadyForDeployment() requires it.
    private Path flinkJarPath;
    private StringBuilder dynamicPropertiesEncoded;
    private boolean detached;
    private boolean streamingMode = true;
    private int parallelism;
    private String customName = null;
    private final Map<String, String> sysProps = new HashMap<String, String>();
    private ServiceProperties serviceProps;

    /**
     * Creates a builder for the given application jar.
     *
     * @param appJarPath path of the application jar; must be non-null and non-empty
     * @param mainClass name of the main class; validated as non-null and non-empty
     *     but not retained by this class
     * @throws IllegalArgumentException if either argument is null or empty
     */
    public FlinkYarnRunnerBuilder(String appJarPath, String mainClass) {
        if (appJarPath == null || appJarPath.isEmpty()) {
            throw new IllegalArgumentException("Path to application jar cannot be empty!");
        }
        if (mainClass == null || mainClass.isEmpty()) {
            throw new IllegalArgumentException("Name of the main class cannot be empty!");
        }
        this.appJarPath = appJarPath;
    }

    /**
     * Appends all given arguments to the job argument list.
     *
     * @param jobArgs arguments to append; {@code null} is treated as "nothing to add"
     * @return this builder
     */
    public FlinkYarnRunnerBuilder addAllJobArgs(String[] jobArgs) {
        if (jobArgs != null) {
            this.jobArgs.addAll(Arrays.asList(jobArgs));
        }
        return this;
    }

    /**
     * Replaces the application jar path. Unlike the constructor, no validation is performed.
     *
     * @param appJarPath new application jar path
     */
    public void setAppJarPath(String appJarPath) {
        this.appJarPath = appJarPath;
    }

    /**
     * Sets the JobManager memory.
     *
     * @param memoryMb memory in MB
     * @throws IllegalArgumentException if {@code memoryMb} is below the minimum of 768 MB
     */
    public void setJobManagerMemory(int memoryMb) {
        if (memoryMb < MIN_JM_MEMORY) {
            throw new IllegalArgumentException("The JobManager memory (" + memoryMb
                + ") is below the minimum required memory amount of " + MIN_JM_MEMORY + " MB");
        }
        this.jobManagerMemoryMb = memoryMb;
    }

    /**
     * Sets the queue passed to the cluster descriptor. Same effect as {@link #setQueue(String)}.
     *
     * @param queue queue name
     */
    public void setJobManagerQueue(String queue) {
        this.jobManagerQueue = queue;
    }

    /**
     * Sets the TaskManager memory.
     *
     * @param memoryMb memory in MB
     * @throws IllegalArgumentException if {@code memoryMb} is below the minimum of 768 MB
     */
    public void setTaskManagerMemory(int memoryMb) {
        if (memoryMb < MIN_TM_MEMORY) {
            throw new IllegalArgumentException("The TaskManager memory (" + memoryMb
                + ") is below the minimum required memory amount of " + MIN_TM_MEMORY + " MB");
        }
        this.taskManagerMemoryMb = memoryMb;
    }

    /**
     * Sets the number of slots per TaskManager.
     *
     * @param slots slot count; must be positive
     * @throws IllegalArgumentException if {@code slots} is zero or negative
     */
    public void setTaskManagerSlots(int slots) {
        if (slots <= 0) {
            throw new IllegalArgumentException("Number of TaskManager slots must be positive");
        }
        this.taskManagerSlots = slots;
    }

    /** @return the configured number of slots per TaskManager */
    public int getTaskManagerSlots() {
        return this.taskManagerSlots;
    }

    /**
     * Sets the queue passed to the cluster descriptor. Same effect as
     * {@link #setJobManagerQueue(String)}.
     *
     * @param queue queue name
     */
    public void setQueue(String queue) {
        this.jobManagerQueue = queue;
    }

    /**
     * Sets the Flink configuration file path checked by {@link #isReadyForDeployment()}.
     *
     * @param confPath configuration file path
     */
    public void setConfigurationFilePath(Path confPath) {
        this.flinkConfigurationPath = confPath;
    }

    /**
     * Sets the configuration directory checked by {@link #isReadyForDeployment()}.
     *
     * @param configurationDirectory configuration directory
     */
    public void setConfigurationDirectory(String configurationDirectory) {
        this.configurationDirectory = configurationDirectory;
    }

    /**
     * Sets the path of the Flink logging configuration.
     *
     * @param logConfPath logging configuration path
     */
    public void setFlinkLoggingConfigurationPath(Path logConfPath) {
        this.flinkLoggingConfigurationPath = logConfPath;
    }

    /** @return the logging configuration path, or {@code null} if none was set */
    public Path getFlinkLoggingConfigurationPath() {
        return this.flinkLoggingConfigurationPath;
    }

    /**
     * Sets the number of TaskManagers.
     *
     * @param tmCount TaskManager count; must be at least 1
     * @throws IllegalArgumentException if {@code tmCount} is less than 1
     */
    public void setTaskManagerCount(int tmCount) {
        if (tmCount < 1) {
            throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
        }
        this.taskManagerCount = tmCount;
    }

    /** @return the configured number of TaskManagers */
    public int getTaskManagerCount() {
        return this.taskManagerCount;
    }

    /** @return the streaming-mode flag (defaults to {@code true}) */
    public boolean isStreamingMode() {
        return this.streamingMode;
    }

    /**
     * Sets the streaming-mode flag.
     *
     * @param streamingMode new flag value
     */
    public void setStreamingMode(boolean streamingMode) {
        this.streamingMode = streamingMode;
    }

    /**
     * Sets the encoded dynamic properties. Note that {@link #getYarnRunner} replaces this
     * value whenever system properties are present.
     *
     * @param dynamicPropertiesEncoded encoded properties buffer
     */
    public void setDynamicPropertiesEncoded(StringBuilder dynamicPropertiesEncoded) {
        this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
    }

    /** @return the encoded dynamic properties buffer, or {@code null} if none exists yet */
    public StringBuilder getDynamicPropertiesEncoded() {
        return this.dynamicPropertiesEncoded;
    }

    /**
     * Sets a custom name used for the cluster instead of the generated one.
     *
     * @param name custom name; must not be null
     * @throws IllegalArgumentException if {@code name} is null
     */
    public void setName(String name) {
        if (name == null) {
            throw new IllegalArgumentException("The passed name is null");
        }
        this.customName = name;
    }

    /**
     * Replaces the list of extra files. The given list is copied, so later changes to it
     * do not affect this builder and an unmodifiable list may be passed safely.
     *
     * @param extraFiles new extra files; must not be null
     * @return this builder
     * @throws IllegalArgumentException if {@code extraFiles} is null
     */
    public FlinkYarnRunnerBuilder setExtraFiles(List<LocalResourceDTO> extraFiles) {
        if (extraFiles == null) {
            throw new IllegalArgumentException("Map of extra files cannot be null.");
        }
        this.extraFiles = new ArrayList<LocalResourceDTO>(extraFiles);
        return this;
    }

    /**
     * Adds one extra file after validating its name and path.
     *
     * @param dto extra file description; must be non-null with non-empty name and path
     * @return this builder
     * @throws IllegalArgumentException if {@code dto} is null or its name or path is null or empty
     */
    public FlinkYarnRunnerBuilder addExtraFile(LocalResourceDTO dto) {
        if (dto == null) {
            throw new IllegalArgumentException("Extra file mapping cannot be null.");
        }
        if (dto.getName() == null || dto.getName().isEmpty()) {
            throw new IllegalArgumentException("Filename in extra file mapping cannot be null or empty.");
        }
        if (dto.getPath() == null || dto.getPath().isEmpty()) {
            throw new IllegalArgumentException("Location in extra file mapping cannot be null or empty.");
        }
        this.extraFiles.add(dto);
        return this;
    }

    /**
     * Adds several extra files without validating the individual entries.
     *
     * @param projectLocalResources files to add; {@code null} or empty is a no-op
     * @return this builder
     */
    public FlinkYarnRunnerBuilder addExtraFiles(List<LocalResourceDTO> projectLocalResources) {
        if (projectLocalResources != null && !projectLocalResources.isEmpty()) {
            this.extraFiles.addAll(projectLocalResources);
        }
        return this;
    }

    /**
     * Registers a system property that {@link #getYarnRunner} turns into a {@code -Dname=value}
     * option. A later call with the same name overwrites the earlier value.
     *
     * @param name property name
     * @param value property value
     * @return this builder
     */
    public FlinkYarnRunnerBuilder addSystemProperty(String name, String value) {
        this.sysProps.put(name, value);
        return this;
    }

    /**
     * Sets the service properties. They must be set before {@link #getYarnRunner} is called.
     *
     * @param serviceProps service properties
     */
    public void setServiceProps(ServiceProperties serviceProps) {
        this.serviceProps = serviceProps;
    }

    /**
     * Verifies that the fields required for deployment are set. A missing
     * {@code HADOOP_CONF_DIR}/{@code YARN_CONF_DIR} environment variable only logs a warning.
     *
     * <p>NOTE(review): no method of this class assigns the Flink jar path, so the corresponding
     * check cannot pass as the class stands - confirm whether a setter is missing.
     *
     * @throws YarnDeploymentException if the TaskManager count is not positive, or the Flink jar
     *     path, configuration directory or configuration path is not set
     */
    public void isReadyForDeployment() throws YarnDeploymentException {
        if (this.taskManagerCount <= 0) {
            throw new YarnDeploymentException("Taskmanager count must be positive");
        }
        if (this.flinkJarPath == null) {
            throw new YarnDeploymentException("The Flink jar path is null");
        }
        if (this.configurationDirectory == null) {
            throw new YarnDeploymentException("Configuration directory not set");
        }
        if (this.flinkConfigurationPath == null) {
            throw new YarnDeploymentException("Configuration path not set");
        }
        if (System.getenv("HADOOP_CONF_DIR") == null && System.getenv("YARN_CONF_DIR") == null) {
            LOGGER.log(Level.WARNING, "Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. "
                + "The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration "
                + "for accessing YARN.");
        }
    }

    /**
     * First-fit allocation: finds the first entry of {@code nodeManagers} that is at least
     * {@code toAllocate} and subtracts {@code toAllocate} from it in place.
     *
     * @param nodeManagers available amount per node manager; modified on success
     * @param toAllocate amount to allocate
     * @return {@code true} if an entry was large enough and has been reduced,
     *     {@code false} if no entry could satisfy the request (array left untouched)
     */
    public static boolean allocateResource(int[] nodeManagers, int toAllocate) {
        for (int i = 0; i < nodeManagers.length; ++i) {
            if (nodeManagers[i] >= toAllocate) {
                nodeManagers[i] -= toAllocate;
                return true;
            }
        }
        return false;
    }

    /**
     * Sets whether the cluster is started in detached mode.
     *
     * @param detachedMode new flag value
     */
    public void setDetachedMode(boolean detachedMode) {
        this.detached = detachedMode;
    }

    /** @return the detached-mode flag */
    public boolean isDetached() {
        return this.detached;
    }

    /** @return the configured parallelism */
    public int getParallelism() {
        return this.parallelism;
    }

    /**
     * Sets the parallelism handed to the runner builder. No validation is performed.
     *
     * @param parallelism parallelism value
     */
    public void setParallelism(int parallelism) {
        this.parallelism = parallelism;
    }

    /**
     * Builds the {@link YarnRunner} for this job from the collected settings.
     *
     * <p>Side effects on this builder: the {@code hopsworks.restendpoint} and
     * {@code hopsworks.projectid} system properties are added, and the encoded dynamic
     * properties buffer is rebuilt from all system properties.
     *
     * @param project project name, used to build the staging path {@code /Projects/<project>/Resources}
     * @param flinkUser not used by this method
     * @param jobUser not used by this method
     * @param hadoopDir not used by this method
     * @param flinkDir Flink installation directory; {@code <flinkDir>/flink.jar} is used as local jar
     * @param flinkConfDir Flink configuration directory
     * @param flinkConfFile Flink configuration file
     * @param dfsClient file system client; required when extra files are registered
     * @param yarnClient YARN client handed to the runner builder
     * @param certsDir not used by this method
     * @param services executor handed to the runner builder
     * @return the configured runner
     * @throws IOException if the file status of an extra file cannot be read
     * @throws YarnDeploymentException if service properties were not set, or extra files are
     *     registered but {@code dfsClient} is null
     */
    protected YarnRunner getYarnRunner(String project, String flinkUser, String jobUser, String hadoopDir,
        String flinkDir, String flinkConfDir, String flinkConfFile, DistributedFileSystemOps dfsClient,
        YarnClient yarnClient, String certsDir, AsynchronousJobExecutor services) throws IOException {
        // Fail fast with a clear message instead of a bare NullPointerException further down.
        if (this.serviceProps == null) {
            throw new YarnDeploymentException("Service properties not set");
        }

        YarnRunner.Builder builder = new YarnRunner.Builder("org.apache.flink.yarn.ApplicationMaster");
        YarnClusterDescriptor cluster = new YarnClusterDescriptor();
        cluster.setConfigurationDirectory(flinkConfDir);
        cluster.setConfigurationFilePath(new Path(flinkConfFile));
        cluster.setDetachedMode(this.detached);
        cluster.setFlinkConfiguration(new Configuration());
        cluster.setJobManagerMemory(this.jobManagerMemoryMb);
        cluster.setTaskManagerCount(this.taskManagerCount);
        cluster.setTaskManagerMemory(this.taskManagerMemoryMb);
        cluster.setTaskManagerSlots(this.taskManagerSlots);
        cluster.setQueue(this.jobManagerQueue);
        cluster.setLocalJarPath(new Path("file://" + flinkDir + "/flink.jar"));

        builder.setYarnClient(yarnClient);
        builder.setDfsClient(dfsClient);
        builder.setFlinkCluster(cluster);
        // Hadoop paths always use '/', independent of the platform this code runs on.
        builder.localResourcesBasePath("/Projects/" + project + "/Resources");

        registerExtraFiles(cluster, dfsClient);

        addSystemProperty("hopsworks.restendpoint", this.serviceProps.getRestEndpoint());
        addSystemProperty("hopsworks.projectid", Integer.toString(this.serviceProps.getProjectId()));
        applySystemProperties(builder, cluster);

        builder.setJobType(JobType.FLINK);
        builder.setAppJarPath(this.appJarPath);
        builder.setParallelism(this.parallelism);
        cluster.setName(resolveClusterName());

        String amArgs = joinJobArgs();
        if (!amArgs.isEmpty()) {
            builder.amArgs(amArgs);
        }
        return builder.build(flinkDir, JobType.FLINK, services);
    }

    /** Registers every extra file as a YARN local resource on the cluster descriptor. */
    private void registerExtraFiles(YarnClusterDescriptor cluster, DistributedFileSystemOps dfsClient)
        throws IOException {
        if (this.extraFiles.isEmpty()) {
            return;
        }
        if (dfsClient == null) {
            throw new YarnDeploymentException("Could not connect to filesystem");
        }
        DistributedFileSystem fs = dfsClient.getFilesystem();
        for (LocalResourceDTO dto : this.extraFiles) {
            // Normalize the scheme prefix so project and user paths carry exactly three slashes.
            String pathToResource = dto.getPath()
                .replaceFirst("hdfs:/*Projects", "hdfs:///Projects")
                .replaceFirst("hdfs:/*user", "hdfs:///user");
            Path src = new Path(pathToResource);
            FileStatus status = fs.getFileStatus(src);
            // Locale.ROOT keeps the enum lookup independent of the JVM default locale.
            LocalResourceType type = LocalResourceType.valueOf(dto.getType().toUpperCase(Locale.ROOT));
            LocalResourceVisibility visibility =
                LocalResourceVisibility.valueOf(dto.getVisibility().toUpperCase(Locale.ROOT));
            LocalResource resource = LocalResource.newInstance(ConverterUtils.getYarnUrlFromPath(src), type,
                visibility, status.getLen(), status.getModificationTime(), dto.getPattern());
            cluster.addHopsworksResource(dto.getName(), resource);
        }
    }

    /** Passes all system properties on as -D options and as an "@@"-separated encoded string. */
    private void applySystemProperties(YarnRunner.Builder builder, YarnClusterDescriptor cluster) {
        if (this.sysProps.isEmpty()) {
            return;
        }
        this.dynamicPropertiesEncoded = new StringBuilder();
        for (Map.Entry<String, String> property : this.sysProps.entrySet()) {
            String key = property.getKey();
            String value = property.getValue();
            String option = YarnRunner.escapeForShell("-D" + key + "=" + value);
            builder.addJavaOption(option);
            cluster.addHopsworksParam(option);
            this.dynamicPropertiesEncoded.append(key).append("=").append(value).append("@@");
        }
        if (this.dynamicPropertiesEncoded.length() > 0) {
            // The cluster receives the string without the trailing separator; the field keeps it.
            cluster.setDynamicPropertiesEncoded(
                this.dynamicPropertiesEncoded.substring(0, this.dynamicPropertiesEncoded.lastIndexOf("@@")));
        }
    }

    /** Returns the custom name if one was set, otherwise a generated session name. */
    private String resolveClusterName() {
        if (this.customName != null) {
            return this.customName;
        }
        String name = "Flink session with " + this.taskManagerCount + " TaskManagers";
        if (this.detached) {
            name = name + " (detached)";
        }
        return name;
    }

    /** Joins the job arguments, each preceded by a single space; empty when there are none. */
    private String joinJobArgs() {
        StringBuilder joined = new StringBuilder();
        for (String arg : this.jobArgs) {
            joined.append(" ").append(arg);
        }
        return joined.toString();
    }

    /**
     * Unchecked exception signalling that the builder cannot be deployed in its current state.
     */
    public static class YarnDeploymentException
    extends RuntimeException {
        private static final long serialVersionUID = -812040641215388943L;

        /** Creates an exception without message or cause. */
        public YarnDeploymentException() {
        }

        /**
         * @param message description of the problem
         */
        public YarnDeploymentException(String message) {
            super(message);
        }

        /**
         * @param message description of the problem
         * @param cause underlying cause
         */
        public YarnDeploymentException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}

