package org.apache.flink.runtime.client;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.PoisonPill;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.io.IOException;
import java.net.InetAddress;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/client/JobClient.class */
/**
 * Static helpers used on the client side to start an actor system and to submit a
 * {@link JobGraph} to the JobManager, either blocking until the job result arrives
 * ({@link #submitJobAndWait}) or only until the submission is acknowledged
 * ({@link #submitJobDetached}).
 */
public class JobClient {
    private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);

    /**
     * Creates the actor system used by the job client and logs the address it is bound to.
     *
     * <p>The system is created through {@code AkkaUtils.createActorSystem} with an empty host
     * name and port {@code 0} as the listening address.
     * NOTE(review): presumably this lets Akka pick the host and a free port - confirm.
     *
     * @param configuration configuration handed to {@code AkkaUtils.createActorSystem}
     * @return the newly created actor system
     * @throws IOException if the host name of the system's default address cannot be resolved
     *         while building the log message
     */
    public static ActorSystem startJobClientActorSystem(Configuration configuration) throws IOException {
        LOG.info("Starting JobClient actor system");
        ActorSystem actorSystem = AkkaUtils.createActorSystem(
                configuration,
                new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0)));

        Address address = actorSystem.provider().getDefaultAddress();
        String host = address.host().isDefined()
                ? NetUtils.ipAddressToUrlString(InetAddress.getByName(address.host().get()))
                : "(unknown)";
        int port = address.port().isDefined() ? ((Integer) address.port().get()).intValue() : -1;
        LOG.info("Started JobClient actor system at {}:{}", host, port);
        return actorSystem;
    }

    /**
     * Submits the job through a temporary {@code JobClientActor} and blocks until the
     * JobManager reports the final result.
     *
     * <p>The actor is always stopped (via {@code PoisonPill}) once the answer has arrived or
     * waiting for it has failed. Only failures of the ask/await step are reported as
     * communication problems; exceptions that describe the job outcome itself are thrown
     * unchanged.
     *
     * @param actorSystem actor system in which the job client actor is created
     * @param leaderRetrievalService service passed to the job client actor
     * @param jobGraph the job to submit
     * @param finiteDuration timeout passed to the job client actor (the wait for the final
     *        answer itself uses {@code AkkaUtils.INF_TIMEOUT()})
     * @param z flag forwarded unchanged to {@code JobClientActor.createJobClientActorProps};
     *        NOTE(review): presumably toggles printing of status updates - confirm
     * @param classLoader class loader used to deserialize the job result or the failure cause
     * @return the deserialized result of the successfully executed job
     * @throws JobExecutionException if communication with the JobManager fails or times out,
     *         the job fails, or the answer cannot be interpreted
     */
    public static JobExecutionResult submitJobAndWait(ActorSystem actorSystem, LeaderRetrievalService leaderRetrievalService, JobGraph jobGraph, FiniteDuration finiteDuration, boolean z, ClassLoader classLoader) throws JobExecutionException {
        Preconditions.checkNotNull(actorSystem, "The actorSystem must not be null.");
        Preconditions.checkNotNull(leaderRetrievalService, "The leaderRetrievalService must not be null.");
        Preconditions.checkNotNull(jobGraph, "The jobGraph must not be null.");
        Preconditions.checkNotNull(finiteDuration, "The timeout must not be null.");

        ActorRef jobClientActor = actorSystem.actorOf(
                JobClientActor.createJobClientActorProps(leaderRetrievalService, finiteDuration, z));

        // Keep the try block limited to the communication step so that exceptions describing
        // the job outcome (thrown further below) are not re-wrapped as communication failures.
        Object answer;
        try {
            answer = Await.result(
                    Patterns.ask(jobClientActor, new JobClientMessages.SubmitJobAndWait(jobGraph), new Timeout(AkkaUtils.INF_TIMEOUT())),
                    AkkaUtils.INF_TIMEOUT());
        } catch (TimeoutException e) {
            throw new JobTimeoutException(jobGraph.getJobID(), "Timeout while waiting for JobManager answer. Job time exceeded " + AkkaUtils.INF_TIMEOUT(), e);
        } catch (Throwable t) {
            throw new JobExecutionException(jobGraph.getJobID(), "Communication with JobManager failed: " + t.getMessage(), t);
        } finally {
            // The helper actor is only needed for this single request.
            jobClientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }

        if (answer instanceof JobManagerMessages.JobResultSuccess) {
            LOG.info("Job execution complete");
            SerializedJobExecutionResult serializedResult = ((JobManagerMessages.JobResultSuccess) answer).result();
            if (serializedResult == null) {
                throw new JobExecutionException(jobGraph.getJobID(), "Job was successfully executed but result contained a null JobExecutionResult.");
            }
            try {
                return serializedResult.toJobExecutionResult(classLoader);
            } catch (Throwable t) {
                // Attach the deserialization problem as cause so it is not lost.
                throw new JobExecutionException(jobGraph.getJobID(), "Job was successfully executed but JobExecutionResult could not be deserialized.", t);
            }
        }

        if (answer instanceof JobManagerMessages.JobResultFailure) {
            LOG.info("Job execution failed");
            SerializedThrowable serializedCause = ((JobManagerMessages.JobResultFailure) answer).cause();
            if (serializedCause == null) {
                throw new JobExecutionException(jobGraph.getJobID(), "Job execution failed with null as failure cause.");
            }
            Throwable cause = serializedCause.deserializeError(classLoader);
            if (cause instanceof JobExecutionException) {
                throw (JobExecutionException) cause;
            }
            throw new JobExecutionException(jobGraph.getJobID(), "Job execution failed", cause);
        }

        throw new JobExecutionException(jobGraph.getJobID(), "Unknown answer from JobManager after submitting the job: " + answer);
    }

    /**
     * Uploads the job's user JARs and submits the job in detached mode, returning as soon as
     * the JobManager has acknowledged the submission.
     *
     * @param actorGateway gateway used to upload the JARs and to send the submit message
     * @param jobGraph the job to submit
     * @param finiteDuration timeout for the JAR upload and for the JobManager's response
     * @param classLoader class loader used to deserialize a failure cause sent by the JobManager
     * @throws JobExecutionException if the upload fails ({@code JobSubmissionException}), the
     *         JobManager does not answer in time ({@code JobTimeoutException}), the request
     *         cannot be sent, or the JobManager rejects the job or answers unexpectedly
     */
    public static void submitJobDetached(ActorGateway actorGateway, JobGraph jobGraph, FiniteDuration finiteDuration, ClassLoader classLoader) throws JobExecutionException {
        Preconditions.checkNotNull(actorGateway, "The jobManagerGateway must not be null.");
        Preconditions.checkNotNull(jobGraph, "The jobGraph must not be null.");
        Preconditions.checkNotNull(finiteDuration, "The timeout must not be null.");

        LOG.info("Checking and uploading JAR files");
        try {
            jobGraph.uploadUserJars(actorGateway, finiteDuration);
        } catch (IOException e) {
            throw new JobSubmissionException(jobGraph.getJobID(), "Could not upload the program's JAR files to the JobManager.", e);
        }

        // Only the request/response round trip is guarded here; the response is interpreted
        // afterwards so that its exceptions are not mislabelled as send failures.
        Object response;
        try {
            response = Await.result(
                    actorGateway.ask(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), finiteDuration),
                    finiteDuration);
        } catch (TimeoutException e) {
            throw new JobTimeoutException(jobGraph.getJobID(), "JobManager did not respond within " + finiteDuration.toString(), e);
        } catch (Throwable t) {
            // Use the caught exception itself as cause; its own cause may be null.
            throw new JobExecutionException(jobGraph.getJobID(), "Failed to send job to JobManager: " + t.getMessage(), t);
        }

        if (response instanceof JobManagerMessages.JobSubmitSuccess) {
            JobID respondedJobId = ((JobManagerMessages.JobSubmitSuccess) response).jobId();
            if (!respondedJobId.equals(jobGraph.getJobID())) {
                throw new JobExecutionException(jobGraph.getJobID(), "JobManager responded for wrong Job. This Job: " + jobGraph.getJobID() + ", response: " + respondedJobId);
            }
            return;
        }

        if (response instanceof JobManagerMessages.JobResultFailure) {
            try {
                // Rethrow the failure reported by the JobManager; anything that is not already
                // a JobExecutionException (including problems obtaining the cause) is wrapped.
                throw ((JobManagerMessages.JobResultFailure) response).cause().deserializeError(classLoader);
            } catch (JobExecutionException e) {
                throw e;
            } catch (Throwable t) {
                throw new JobExecutionException(jobGraph.getJobID(), "JobSubmission failed: " + t.getMessage(), t);
            }
        }

        throw new JobExecutionException(jobGraph.getJobID(), "Unexpected response from JobManager: " + response);
    }
}
