/*
 * Decompiled with CFR 0.152.
 */
package io.hops.cli.action;

import io.hops.cli.action.JobAction;
import io.hops.cli.action.JobRunAction;
import io.hops.cli.config.HopsworksAPIConfig;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.concurrent.TimeUnit;
import javax.json.JsonArray;
import javax.json.JsonValue;
import org.apache.commons.lang3.SystemUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Action that uploads a local jar to a Flink cluster reachable through the Hopsworks API
 * ({@code <apiUrl>hopsworks-api/flinkmaster/<appId>}) and then asks that cluster to run it.
 *
 * <p>If the inspected execution of the job is not in an active state, a new run is started
 * through {@link JobRunAction} and polled until it reports {@code RUNNING}.
 */
public class SubmitFlinkJob
extends JobAction {
    private static final Logger logger = LoggerFactory.getLogger(SubmitFlinkJob.class);

    /** Upper bound, in seconds, on the wait for a newly started cluster to report RUNNING. */
    private static final int CLUSTER_START_TIMEOUT_SECONDS = 90;
    /** Pause, in seconds, between two polls of the execution state. */
    private static final int CLUSTER_POLL_INTERVAL_SECONDS = 5;

    private final String url;
    /** Execution to inspect; {@code 0} means "use the latest execution". */
    protected int userExecId = 0;
    protected String local_file_path = "";
    protected String mainClass = "";
    /** Program arguments, stored URL-encoded (see {@link #setUserArgs(String)}). */
    protected String userArgs = "";
    HopsworksAPIConfig hopsworksAPIConfig;
    private URI filePath;

    /**
     * @param hopsworksAPIConfig API configuration; its API URL is used as the prefix of every
     *                           request URL built by this action
     * @param jobName            name of the job, forwarded to {@link JobAction}
     */
    public SubmitFlinkJob(HopsworksAPIConfig hopsworksAPIConfig, String jobName) {
        super(hopsworksAPIConfig, jobName);
        this.hopsworksAPIConfig = hopsworksAPIConfig;
        this.url = hopsworksAPIConfig.getApiUrl();
    }

    /** Selects the execution to inspect; {@code 0} selects the latest execution. */
    public void setUserExecId(int userExecId) {
        this.userExecId = userExecId;
    }

    /** Returns the path of the jar to upload, as supplied by the caller. */
    public String getLocal_file_path() {
        return this.local_file_path;
    }

    /** Sets the path (or {@code file://} URI) of the jar to upload. */
    public void setLocal_file_path(String local_file_path) {
        this.local_file_path = local_file_path;
    }

    /** Returns the entry class passed to Flink as {@code entry-class}. */
    public String getMainClass() {
        return this.mainClass;
    }

    /** Sets the entry class passed to Flink as {@code entry-class}. */
    public void setMainClass(String mainClass) {
        this.mainClass = mainClass;
    }

    private String getUserArgs() {
        return this.userArgs;
    }

    /**
     * Stores the program arguments in URL-encoded form so they can be appended to the
     * {@code program-args} query parameter as-is.
     *
     * @throws UnsupportedEncodingException if the "utf8" charset name is not supported
     */
    public void setUserArgs(String userArgs) throws UnsupportedEncodingException {
        this.userArgs = URLEncoder.encode(userArgs, "utf8");
    }

    /**
     * Converts the user supplied location into a URI.
     *
     * <p>Plain local paths go through {@link File#toURI()} rather than string concatenation,
     * so characters that are illegal in a URI (spaces, Windows backslashes) are escaped
     * instead of causing a {@link URISyntaxException}. Anything else is parsed as a URI.
     */
    private void initPath(String filePath) throws URISyntaxException {
        boolean plainWindowsPath = SystemUtils.IS_OS_WINDOWS && !filePath.startsWith("file://");
        if (plainWindowsPath || filePath.startsWith("/")) {
            this.filePath = new File(filePath).toURI();
        } else {
            this.filePath = new URI(filePath);
        }
    }

    /**
     * Ensures a Flink cluster is available, uploads the jar to it and submits the job.
     *
     * @return the HTTP status code of the final "run jar" request
     * @throws Exception if the cluster cannot be started, its application id is unknown,
     *                   the upload does not return 200, or any request fails
     */
    @Override
    public int execute() throws Exception {
        this.initPath(this.getLocal_file_path());
        if (this.userExecId == 0) {
            this.getLatestExecution();
        } else {
            this.getExecutionById(this.userExecId);
        }

        JsonValue previousExecution = this.firstExecution();
        String previousExecStatus = previousExecution == null
                ? ""
                : previousExecution.asJsonObject().getString("state");
        if (isActiveState(previousExecStatus)) {
            logger.info("Found Flink cluster with this name already running, will use it to submit the Flink job");
        } else {
            this.startCluster();
        }

        // Re-read the result: startCluster() refreshes it while polling.
        JsonValue execution = this.firstExecution();
        JsonValue appIdValue = execution == null ? null : execution.asJsonObject().get("appId");
        if (appIdValue == null || appIdValue == JsonValue.NULL) {
            // Without an application id the request URL below would be malformed.
            throw new Exception("Could not determine the application id of the Flink cluster");
        }
        // toString() yields the JSON text of the value; parsing it gives the plain value.
        String appId = JSON.parse(appIdValue.toString()).toString();

        String baseUrl = this.url + "hopsworks-api/flinkmaster/" + appId;
        try (CloseableHttpResponse responseUpload = this.executeUpload(baseUrl, this.filePath.getPath())) {
            if (responseUpload.getStatusLine().getStatusCode() != 200) {
                throw new Exception("HTTP File Upload not successful");
            }
            this.readJsonResponse(responseUpload);
        }

        // The jar id is the last path segment of the "filename" reported by the upload.
        String respFile = this.getJsonResult().getString("filename");
        String[] segments = respFile.split("/");
        String jarId = segments[segments.length - 1];

        String runUrl = baseUrl + "/jars/" + jarId + "/run?entry-class=" + this.getMainClass()
                + "&program-args=" + this.getUserArgs();
        logger.info("Submitting flink job to: {}", runUrl);
        return this.submitPostRequest(runUrl);
    }

    /**
     * Starts a new run of the job and polls the latest execution until it is RUNNING or the
     * timeout elapses.
     */
    private void startCluster() throws Exception {
        // NOTE(review): userArgs is already URL-encoded at this point - confirm JobRunAction expects that.
        JobRunAction runJob = new JobRunAction(this.hopsworksAPIConfig, this.jobName, this.userArgs);
        int runStatusCode = runJob.execute();
        if (runStatusCode != 201 && runStatusCode != 200) {
            throw new Exception("Could not submit HTTP request to start job run");
        }

        String runStatus = "";
        for (int waited = 0;
                waited < CLUSTER_START_TIMEOUT_SECONDS && !"RUNNING".equals(runStatus);
                waited += CLUSTER_POLL_INTERVAL_SECONDS) {
            TimeUnit.SECONDS.sleep(CLUSTER_POLL_INTERVAL_SECONDS);
            this.getLatestExecution();
            JsonValue latest = this.firstExecution();
            // A missing or empty item list is treated as "not running yet"; keep polling.
            runStatus = latest == null ? "" : latest.asJsonObject().getString("state");
        }
        if (!"RUNNING".equals(runStatus)) {
            throw new Exception("Flink cluster did not start, check job logs for details");
        }
        logger.info("Started Flink cluster with job name {}", this.jobName);
    }

    /**
     * Returns the first entry of the "items" array of the current JSON result, or
     * {@code null} when that array is absent or empty.
     */
    private JsonValue firstExecution() {
        JsonArray items = this.getJsonResult().getJsonArray("items");
        return items == null || items.isEmpty() ? null : items.get(0);
    }

    /**
     * Tells whether an execution in the given state is reused instead of starting a new run.
     * A {@code null} state is treated like an active one.
     */
    private static boolean isActiveState(String state) {
        if (state == null) {
            return true;
        }
        switch (state) {
            case "RUNNING":
            case "INITIALIZING":
            case "ACCEPTED":
            case "NEW":
            case "NEW_SAVING":
            case "SUBMITTED":
                return true;
            default:
                return false;
        }
    }

    /**
     * Sends an empty POST to {@code url} and reads the JSON response.
     *
     * @return the status returned by {@code readJsonResponse} (200 or 201)
     * @throws IOException if the request fails or the status is neither 200 nor 201; the
     *                     client and the response are closed in every case
     */
    public int submitPostRequest(String url) throws IOException {
        try (CloseableHttpClient httpClient = this.getClient()) {
            HttpPost request = new HttpPost(url);
            this.addHeaders((HttpRequestBase)request);
            try (CloseableHttpResponse response = httpClient.execute((HttpUriRequest)request)) {
                int status = this.readJsonResponse(response);
                if (status != 201 && status != 200) {
                    throw new IOException(response.getStatusLine().getReasonPhrase());
                }
                return status;
            }
        }
    }

    /**
     * Uploads the file at {@code filePath} as multipart field "jarfile" to
     * {@code baseUrl + "/jars/upload"}.
     *
     * <p>The HTTP client is left open on success because the returned response is still
     * backed by it; the caller must close the returned response.
     *
     * @return the open response of the upload request
     * @throws java.io.FileNotFoundException if {@code filePath} is not an existing regular file
     * @throws Exception if building or executing the request fails
     */
    public CloseableHttpResponse executeUpload(String baseUrl, String filePath) throws Exception {
        File targetFile = new File(filePath);
        if (!targetFile.isFile()) {
            // Fail before any network activity.
            throw new java.io.FileNotFoundException("Jar file to upload not found: " + filePath);
        }

        CloseableHttpClient httpClient = this.getClient();
        try {
            HttpPost uploadFile = new HttpPost(baseUrl + "/jars/upload");
            // Passing the File lets the entity open and close the file itself while it is
            // written, so no stream is left open by this method.
            HttpEntity multipart = MultipartEntityBuilder.create()
                    .addBinaryBody("jarfile", targetFile, ContentType.APPLICATION_OCTET_STREAM, targetFile.getName())
                    .build();
            this.addHeaders((HttpRequestBase)uploadFile);
            uploadFile.setEntity(multipart);
            return httpClient.execute((HttpUriRequest)uploadFile);
        } catch (Exception e) {
            // No response will reach the caller, so the client must be released here.
            try {
                httpClient.close();
            } catch (IOException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }
}

