/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.program.rest;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.commons.cli.CommandLine;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationResponseBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.util.TestRestServerEndpoint;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class RestClusterClientTest
extends TestLogger {
    private final DispatcherGateway mockRestfulGateway = new TestingDispatcherGateway.Builder().build();
    private GatewayRetriever<DispatcherGateway> mockGatewayRetriever;
    private volatile FailHttpRequestPredicate failHttpRequest = FailHttpRequestPredicate.never();
    private ExecutorService executor;
    private JobGraph jobGraph;
    private JobID jobId;
    private static final Configuration restConfig;

    @Before
    public void setUp() throws Exception {
        this.mockGatewayRetriever = () -> CompletableFuture.completedFuture(this.mockRestfulGateway);
        this.executor = Executors.newSingleThreadExecutor((ThreadFactory)new ExecutorThreadFactory(RestClusterClientTest.class.getSimpleName()));
        this.jobGraph = JobGraphTestUtils.emptyJobGraph();
        this.jobId = this.jobGraph.getJobID();
    }

    @After
    public void tearDown() {
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }

    private RestClusterClient<StandaloneClusterId> createRestClusterClient(int port) throws Exception {
        return this.createRestClusterClient(port, new Configuration(restConfig));
    }

    private RestClusterClient<StandaloneClusterId> createRestClusterClient(int port, Configuration clientConfig) throws Exception {
        clientConfig.setInteger(RestOptions.PORT, port);
        return new RestClusterClient(clientConfig, this.createRestClient(), (Object)StandaloneClusterId.getInstance(), attempt -> 0L);
    }

    @Nonnull
    private RestClient createRestClient() throws ConfigurationException {
        return new RestClient(restConfig, this.executor){

            public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request, Collection<FileUpload> files) throws IOException {
                if (RestClusterClientTest.this.failHttpRequest.test(messageHeaders, messageParameters, request)) {
                    return FutureUtils.completedExceptionally((Throwable)new IOException("expected"));
                }
                return super.sendRequest(targetAddress, targetPort, messageHeaders, messageParameters, request, files);
            }
        };
    }

    @Test
    public void testJobSubmitCancel() throws Exception {
        TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
        TestJobCancellationHandler terminationHandler = new TestJobCancellationHandler();
        try (TestRestServerEndpoint restServerEndpoint = this.createRestServerEndpoint(submitHandler, terminationHandler);
             RestClusterClient<StandaloneClusterId> restClusterClient = this.createRestClusterClient(restServerEndpoint.getServerAddress().getPort());){
            Assert.assertFalse((boolean)submitHandler.jobSubmitted);
            restClusterClient.submitJob(this.jobGraph).get();
            Assert.assertTrue((boolean)submitHandler.jobSubmitted);
            Assert.assertFalse((boolean)terminationHandler.jobCanceled);
            restClusterClient.cancel(this.jobId).get();
            Assert.assertTrue((boolean)terminationHandler.jobCanceled);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDisposeSavepoint() throws Exception {
        TestSavepointDisposalHandlers testSavepointDisposalHandlers;
        String savepointPath = "foobar";
        String exceptionMessage = "Test exception.";
        FlinkException testException = new FlinkException("Test exception.");
        TestSavepointDisposalHandlers testSavepointDisposalHandlers2 = testSavepointDisposalHandlers = new TestSavepointDisposalHandlers("foobar");
        testSavepointDisposalHandlers2.getClass();
        TestSavepointDisposalHandlers.TestSavepointDisposalTriggerHandler testSavepointDisposalTriggerHandler = testSavepointDisposalHandlers2.new TestSavepointDisposalHandlers.TestSavepointDisposalTriggerHandler();
        TestSavepointDisposalHandlers testSavepointDisposalHandlers3 = testSavepointDisposalHandlers;
        testSavepointDisposalHandlers3.getClass();
        TestSavepointDisposalHandlers.TestSavepointDisposalStatusHandler testSavepointDisposalStatusHandler = testSavepointDisposalHandlers3.new TestSavepointDisposalHandlers.TestSavepointDisposalStatusHandler(new OptionalFailure[]{OptionalFailure.of((Object)AsynchronousOperationInfo.complete()), OptionalFailure.of((Object)AsynchronousOperationInfo.completeExceptional((SerializedThrowable)new SerializedThrowable((Throwable)testException))), OptionalFailure.ofFailure((Throwable)testException)});
        try (TestRestServerEndpoint restServerEndpoint = this.createRestServerEndpoint(testSavepointDisposalStatusHandler, testSavepointDisposalTriggerHandler);
             RestClusterClient<StandaloneClusterId> restClusterClient = this.createRestClusterClient(restServerEndpoint.getServerAddress().getPort());){
            CompletableFuture disposeSavepointFuture = restClusterClient.disposeSavepoint("foobar");
            MatcherAssert.assertThat(disposeSavepointFuture.get(), (Matcher)Matchers.is((Object)Acknowledge.get()));
            disposeSavepointFuture = restClusterClient.disposeSavepoint("foobar");
            try {
                disposeSavepointFuture.get();
                Assert.fail((String)"Expected an exception");
            }
            catch (ExecutionException ee) {
                MatcherAssert.assertThat((Object)ExceptionUtils.findThrowableWithMessage((Throwable)ee, (String)"Test exception.").isPresent(), (Matcher)Matchers.is((Object)true));
            }
            try {
                restClusterClient.disposeSavepoint("foobar").get();
                Assert.fail((String)"Expected an exception.");
            }
            catch (ExecutionException ee) {
                MatcherAssert.assertThat((Object)ExceptionUtils.findThrowable((Throwable)ee, RestClientException.class).isPresent(), (Matcher)Matchers.is((Object)true));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListJobs() throws Exception {
        try (TestRestServerEndpoint restServerEndpoint = this.createRestServerEndpoint(new TestListJobsHandler());
             RestClusterClient<StandaloneClusterId> restClusterClient = this.createRestClusterClient(restServerEndpoint.getServerAddress().getPort());){
            CompletableFuture jobDetailsFuture = restClusterClient.listJobs();
            Collection jobDetails = (Collection)jobDetailsFuture.get();
            Iterator jobDetailsIterator = jobDetails.iterator();
            JobStatusMessage job1 = (JobStatusMessage)jobDetailsIterator.next();
            JobStatusMessage job2 = (JobStatusMessage)jobDetailsIterator.next();
            Assert.assertNotEquals((String)"The job status should not be equal.", (Object)job1.getJobState(), (Object)job2.getJobState());
        }
    }

    @Test
    public void testGetAccumulators() throws Exception {
        TestAccumulatorHandler accumulatorHandler = new TestAccumulatorHandler();
        try (TestRestServerEndpoint restServerEndpoint = this.createRestServerEndpoint(accumulatorHandler);
             RestClusterClient<StandaloneClusterId> restClusterClient = this.createRestClusterClient(restServerEndpoint.getServerAddress().getPort());){
            JobID id = new JobID();
            Map accumulators = (Map)restClusterClient.getAccumulators(id).get();
            Assert.assertNotNull((Object)accumulators);
            Assert.assertEquals((long)1L, (long)accumulators.size());
            Assert.assertTrue((boolean)accumulators.containsKey("testKey"));
            Assert.assertEquals((Object)"testValue", (Object)accumulators.get("testKey").toString());
        }
    }

    @Test
    public void testRESTManualConfigurationOverride() throws Exception {
        String configuredHostname = "localhost";
        int configuredPort = 1234;
        Configuration configuration = new Configuration();
        configuration.setString(JobManagerOptions.ADDRESS, "localhost");
        configuration.setInteger(JobManagerOptions.PORT, 1234);
        configuration.setString(RestOptions.ADDRESS, "localhost");
        configuration.setInteger(RestOptions.PORT, 1234);
        DefaultCLI defaultCLI = new DefaultCLI();
        String manualHostname = "123.123.123.123";
        int manualPort = 4321;
        String[] args = new String[]{"-m", "123.123.123.123:4321"};
        CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
        DefaultClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
        Configuration executorConfig = defaultCLI.toConfiguration(commandLine);
        ClusterClientFactory clusterFactory = serviceLoader.getClusterClientFactory(executorConfig);
        Preconditions.checkState((clusterFactory != null ? 1 : 0) != 0);
        ClusterDescriptor clusterDescriptor = clusterFactory.createClusterDescriptor(executorConfig);
        RestClusterClient clusterClient = (RestClusterClient)clusterDescriptor.retrieve(clusterFactory.getClusterId(executorConfig)).getClusterClient();
        URL webMonitorBaseUrl = (URL)clusterClient.getWebMonitorBaseUrl().get();
        MatcherAssert.assertThat((Object)webMonitorBaseUrl.getHost(), (Matcher)Matchers.equalTo((Object)"123.123.123.123"));
        MatcherAssert.assertThat((Object)webMonitorBaseUrl.getPort(), (Matcher)Matchers.equalTo((Object)4321));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetriableSendOperationIfConnectionErrorOrServiceUnavailable() throws Exception {
        PingRestHandler pingRestHandler = new PingRestHandler(new CompletableFuture[]{FutureUtils.completedExceptionally((Throwable)new RestHandlerException("test exception", HttpResponseStatus.SERVICE_UNAVAILABLE)), CompletableFuture.completedFuture(EmptyResponseBody.getInstance())});
        try (TestRestServerEndpoint restServerEndpoint = this.createRestServerEndpoint(pingRestHandler);
             RestClusterClient<StandaloneClusterId> restClusterClient = this.createRestClusterClient(restServerEndpoint.getServerAddress().getPort());){
            AtomicBoolean firstPollFailed = new AtomicBoolean();
            this.failHttpRequest = (messageHeaders, messageParameters, requestBody) -> messageHeaders instanceof PingRestHandlerHeaders && !firstPollFailed.getAndSet(true);
            restClusterClient.sendRequest((MessageHeaders)PingRestHandlerHeaders.INSTANCE).get();
        }
    }

    @Test
    public void testJobSubmissionRespectsConfiguredRetryPolicy() throws Exception {
        int maxRetryAttempts = 3;
        AtomicInteger failedRequest = new AtomicInteger(0);
        this.failHttpRequest = (messageHeaders, messageParameters, requestBody) -> {
            failedRequest.incrementAndGet();
            return true;
        };
        Configuration clientConfig = new Configuration(restConfig);
        clientConfig.set(RestOptions.RETRY_MAX_ATTEMPTS, (Object)3);
        clientConfig.set(RestOptions.RETRY_DELAY, (Object)10L);
        try (TestRestServerEndpoint restServerEndpoint = this.createRestServerEndpoint(new TestJobSubmitHandler());){
            InetSocketAddress serverAddress = Objects.requireNonNull(restServerEndpoint.getServerAddress());
            try (RestClusterClient<StandaloneClusterId> restClusterClient = this.createRestClusterClient(serverAddress.getPort(), clientConfig);){
                ExecutionException exception = (ExecutionException)Assert.assertThrows(ExecutionException.class, () -> {
                    JobID cfr_ignored_0 = (JobID)restClusterClient.submitJob(this.jobGraph).get();
                });
                MatcherAssert.assertThat((Object)exception, (Matcher)FlinkMatchers.containsCause(FutureUtils.RetryException.class));
                Assert.assertEquals((long)4L, (long)failedRequest.get());
            }
        }
    }

    @Test
    public void testSubmitJobAndWaitForExecutionResult() throws Exception {
        TestJobExecutionResultHandler testJobExecutionResultHandler = new TestJobExecutionResultHandler(new Object[]{new RestHandlerException("should trigger retry", HttpResponseStatus.SERVICE_UNAVAILABLE), JobExecutionResultResponseBody.inProgress(), JobExecutionResultResponseBody.created((JobResult)new JobResult.Builder().applicationStatus(ApplicationStatus.UNKNOWN).jobId(this.jobId).netRuntime(Long.MAX_VALUE).accumulatorResults(Collections.singletonMap("testName", new SerializedValue((Object)OptionalFailure.of((Object)1.0)))).build()), JobExecutionResultResponseBody.created((JobResult)new JobResult.Builder().applicationStatus(ApplicationStatus.SUCCEEDED).jobId(this.jobId).netRuntime(Long.MAX_VALUE).accumulatorResults(Collections.singletonMap("testName", new SerializedValue((Object)OptionalFailure.of((Object)1.0)))).build()), JobExecutionResultResponseBody.created((JobResult)new JobResult.Builder().applicationStatus(ApplicationStatus.FAILED).jobId(this.jobId).netRuntime(Long.MAX_VALUE).serializedThrowable(new SerializedThrowable((Throwable)new RuntimeException("expected"))).build())});
        AtomicBoolean firstExecutionResultPollFailed = new AtomicBoolean(false);
        AtomicBoolean firstSubmitRequestFailed = new AtomicBoolean(false);
        this.failHttpRequest = (messageHeaders, messageParameters, requestBody) -> {
            if (messageHeaders instanceof JobExecutionResultHeaders) {
                return !firstExecutionResultPollFailed.getAndSet(true);
            }
            if (messageHeaders instanceof JobSubmitHeaders) {
                return !firstSubmitRequestFailed.getAndSet(true);
            }
            return false;
        };
        try (TestRestServerEndpoint restServerEndpoint = this.createRestServerEndpoint(testJobExecutionResultHandler, new TestJobSubmitHandler());
             RestClusterClient<StandaloneClusterId> restClusterClient = this.createRestClusterClient(restServerEndpoint.getServerAddress().getPort());){
            JobExecutionResult jobExecutionResult = ((JobResult)((CompletableFuture)restClusterClient.submitJob(this.jobGraph).thenCompose(arg_0 -> restClusterClient.requestJobResult(arg_0))).get()).toJobExecutionResult(ClassLoader.getSystemClassLoader());
            Assert.assertTrue((boolean)firstExecutionResultPollFailed.get());
            Assert.assertTrue((boolean)firstSubmitRequestFailed.get());
            MatcherAssert.assertThat((Object)jobExecutionResult.getJobID(), (Matcher)Matchers.equalTo((Object)this.jobId));
            MatcherAssert.assertThat((Object)jobExecutionResult.getNetRuntime(), (Matcher)Matchers.equalTo((Object)Long.MAX_VALUE));
            MatcherAssert.assertThat((Object)jobExecutionResult.getAllAccumulatorResults(), (Matcher)Matchers.equalTo(Collections.singletonMap("testName", 1.0)));
            try {
                ((JobResult)((CompletableFuture)restClusterClient.submitJob(this.jobGraph).thenCompose(arg_0 -> restClusterClient.requestJobResult(arg_0))).get()).toJobExecutionResult(ClassLoader.getSystemClassLoader());
                Assert.fail((String)"Expected exception not thrown.");
            }
            catch (Exception e) {
                Optional cause = ExceptionUtils.findThrowable((Throwable)e, RuntimeException.class);
                MatcherAssert.assertThat((Object)cause.isPresent(), (Matcher)Matchers.is((Object)true));
                MatcherAssert.assertThat((Object)((RuntimeException)cause.get()).getMessage(), (Matcher)Matchers.equalTo((Object)"expected"));
            }
        }
    }

    @Test
    public void testJobSubmissionFailureCauseForwardedToClient() throws Exception {
        try (TestRestServerEndpoint restServerEndpoint = this.createRestServerEndpoint(new SubmissionFailingHandler());){
            try (RestClusterClient<StandaloneClusterId> restClusterClient = this.createRestClusterClient(restServerEndpoint.getServerAddress().getPort());){
                ((JobResult)((CompletableFuture)restClusterClient.submitJob(this.jobGraph).thenCompose(arg_0 -> restClusterClient.requestJobResult(arg_0))).get()).toJobExecutionResult(ClassLoader.getSystemClassLoader());
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"RestHandlerException: expected").isPresent());
                if (restServerEndpoint != null) {
                    if (var2_2 != null) {
                        try {
                            restServerEndpoint.close();
                        }
                        catch (Throwable throwable) {
                            var2_2.addSuppressed(throwable);
                        }
                    } else {
                        restServerEndpoint.close();
                    }
                }
                return;
            }
            Assert.fail((String)"Should failed with exception");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSendIsNotRetriableIfHttpNotFound() throws Exception {
        String exceptionMessage = "test exception";
        PingRestHandler pingRestHandler = new PingRestHandler(new CompletableFuture[]{FutureUtils.completedExceptionally((Throwable)new RestHandlerException("test exception", HttpResponseStatus.NOT_FOUND))});
        try (TestRestServerEndpoint restServerEndpoint = this.createRestServerEndpoint(pingRestHandler);
             RestClusterClient<StandaloneClusterId> restClusterClient = this.createRestClusterClient(restServerEndpoint.getServerAddress().getPort());){
            restClusterClient.sendRequest((MessageHeaders)PingRestHandlerHeaders.INSTANCE).get();
            Assert.fail((String)"The rest request should have failed.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSendCoordinationRequest() throws Exception {
        TestClientCoordinationHandler handler = new TestClientCoordinationHandler();
        try (TestRestServerEndpoint restServerEndpoint = this.createRestServerEndpoint(handler);){
            RestClusterClient<StandaloneClusterId> restClusterClient = this.createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
            String payload = "testing payload";
            TestCoordinationRequest request = new TestCoordinationRequest(payload);
            try {
                CompletableFuture future = restClusterClient.sendCoordinationRequest(this.jobId, new OperatorID(), request);
                TestCoordinationResponse response = (TestCoordinationResponse)future.get();
                Assert.assertEquals((Object)payload, (Object)response.payload);
            }
            finally {
                restClusterClient.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testNotShowSuspendedJobStatus() throws Exception {
        ArrayList<JobDetailsInfo> jobDetails = new ArrayList<JobDetailsInfo>();
        jobDetails.add(this.buildJobDetail(JobStatus.SUSPENDED));
        jobDetails.add(this.buildJobDetail(JobStatus.RUNNING));
        TestJobStatusHandler jobStatusHandler = new TestJobStatusHandler(jobDetails.iterator());
        try (TestRestServerEndpoint restServerEndpoint = this.createRestServerEndpoint(jobStatusHandler);
             RestClusterClient<StandaloneClusterId> restClusterClient = this.createRestClusterClient(restServerEndpoint.getServerAddress().getPort());){
            CompletableFuture future = restClusterClient.getJobStatus(this.jobId);
            Assert.assertEquals((Object)JobStatus.RUNNING, future.get());
        }
    }

    private JobDetailsInfo buildJobDetail(JobStatus jobStatus) {
        return new JobDetailsInfo(this.jobId, "testJob", true, jobStatus, 1L, 2L, 1L, 8888L, 1984L, new HashMap(), new ArrayList(), new HashMap(), "{\"id\":\"1234\"}");
    }

    private TestRestServerEndpoint createRestServerEndpoint(AbstractRestHandler<?, ?, ?, ?> ... abstractRestHandlers) throws Exception {
        TestRestServerEndpoint.Builder builder = TestRestServerEndpoint.builder((Configuration)restConfig);
        Arrays.stream(abstractRestHandlers).forEach(arg_0 -> ((TestRestServerEndpoint.Builder)builder).withHandler(arg_0));
        return builder.buildAndStart();
    }

    static {
        Configuration config = new Configuration();
        config.setString(JobManagerOptions.ADDRESS, "localhost");
        config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10);
        config.setLong(RestOptions.RETRY_DELAY, 0L);
        config.setInteger(RestOptions.PORT, 0);
        restConfig = config;
    }

    @FunctionalInterface
    private static interface FailHttpRequestPredicate {
        public boolean test(MessageHeaders<?, ?, ?> var1, MessageParameters var2, RequestBody var3);

        public static FailHttpRequestPredicate never() {
            return (messageHeaders, messageParameters, requestBody) -> false;
        }
    }

    private abstract class TestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters>
    extends AbstractRestHandler<DispatcherGateway, R, P, M> {
        private TestHandler(MessageHeaders<R, P, M> headers) {
            super(RestClusterClientTest.this.mockGatewayRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), headers);
        }
    }

    private class TestJobStatusHandler
    extends TestHandler<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> {
        private final Iterator<JobDetailsInfo> jobDetailsInfo;

        private TestJobStatusHandler(Iterator<JobDetailsInfo> jobDetailsInfo) {
            super((MessageHeaders)JobDetailsHeaders.getInstance());
            Preconditions.checkState((boolean)jobDetailsInfo.hasNext(), (Object)"Job details are empty");
            this.jobDetailsInfo = (Iterator)Preconditions.checkNotNull(jobDetailsInfo);
        }

        protected CompletableFuture<JobDetailsInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
            if (!this.jobDetailsInfo.hasNext()) {
                throw new IllegalStateException("More job details were requested than configured");
            }
            return CompletableFuture.completedFuture(this.jobDetailsInfo.next());
        }
    }

    private class TestListJobsHandler
    extends TestHandler<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
        private TestListJobsHandler() {
            super((MessageHeaders)JobsOverviewHeaders.getInstance());
        }

        protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
            JobDetails running = new JobDetails(new JobID(), "job1", 0L, 0L, 0L, JobStatus.RUNNING, 0L, new int[10], 0);
            JobDetails finished = new JobDetails(new JobID(), "job2", 0L, 0L, 0L, JobStatus.FINISHED, 0L, new int[10], 0);
            return CompletableFuture.completedFuture(new MultipleJobsDetails(Arrays.asList(running, finished)));
        }
    }

    private class TestAccumulatorHandler
    extends TestHandler<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
        public TestAccumulatorHandler() {
            super((MessageHeaders)JobAccumulatorsHeaders.getInstance());
        }

        protected CompletableFuture<JobAccumulatorsInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobAccumulatorsMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
            JobAccumulatorsInfo accumulatorsInfo;
            List queryParams = request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class);
            boolean includeSerializedValue = !queryParams.isEmpty() ? (Boolean)queryParams.get(0) : false;
            ArrayList<JobAccumulatorsInfo.UserTaskAccumulator> userTaskAccumulators = new ArrayList<JobAccumulatorsInfo.UserTaskAccumulator>(1);
            userTaskAccumulators.add(new JobAccumulatorsInfo.UserTaskAccumulator("testName", "testType", "testValue"));
            if (includeSerializedValue) {
                HashMap<String, SerializedValue> serializedUserTaskAccumulators = new HashMap<String, SerializedValue>(1);
                try {
                    serializedUserTaskAccumulators.put("testKey", new SerializedValue((Object)OptionalFailure.of((Object)"testValue")));
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
                accumulatorsInfo = new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, serializedUserTaskAccumulators);
            } else {
                accumulatorsInfo = new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, Collections.emptyMap());
            }
            return CompletableFuture.completedFuture(accumulatorsInfo);
        }
    }

    private static class TestCoordinationResponse<T>
    implements CoordinationResponse {
        private static final long serialVersionUID = 1L;
        private final T payload;

        private TestCoordinationResponse(T payload) {
            this.payload = payload;
        }
    }

    private static class TestCoordinationRequest<T>
    implements CoordinationRequest {
        private static final long serialVersionUID = 1L;
        private final T payload;

        private TestCoordinationRequest(T payload) {
            this.payload = payload;
        }
    }

    private class TestClientCoordinationHandler
    extends TestHandler<ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> {
        private TestClientCoordinationHandler() {
            super((MessageHeaders)ClientCoordinationHeaders.getInstance());
        }

        protected CompletableFuture<ClientCoordinationResponseBody> handleRequest(@Nonnull HandlerRequest<ClientCoordinationRequestBody, ClientCoordinationMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
            try {
                TestCoordinationRequest req = (TestCoordinationRequest)((ClientCoordinationRequestBody)request.getRequestBody()).getSerializedCoordinationRequest().deserializeValue(((Object)((Object)this)).getClass().getClassLoader());
                TestCoordinationResponse resp = new TestCoordinationResponse(req.payload);
                return CompletableFuture.completedFuture(new ClientCoordinationResponseBody(new SerializedValue(resp)));
            }
            catch (Exception e) {
                return FutureUtils.completedExceptionally((Throwable)e);
            }
        }
    }

    private static final class PingRestHandlerHeaders
    implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
        static final PingRestHandlerHeaders INSTANCE = new PingRestHandlerHeaders();

        private PingRestHandlerHeaders() {
        }

        public Class<EmptyResponseBody> getResponseClass() {
            return EmptyResponseBody.class;
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public String getDescription() {
            return "foobar";
        }

        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        public EmptyMessageParameters getUnresolvedMessageParameters() {
            return EmptyMessageParameters.getInstance();
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.GET;
        }

        public String getTargetRestEndpointURL() {
            return "/foobar";
        }
    }

    private class PingRestHandler
    extends TestHandler<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
        private final Queue<CompletableFuture<EmptyResponseBody>> responseQueue;

        private PingRestHandler(CompletableFuture<EmptyResponseBody> ... responses) {
            super(PingRestHandlerHeaders.INSTANCE);
            this.responseQueue = new ArrayDeque<CompletableFuture<EmptyResponseBody>>(Arrays.asList(responses));
        }

        protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
            CompletableFuture<EmptyResponseBody> result = this.responseQueue.poll();
            if (result != null) {
                return result;
            }
            return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
        }
    }

    private final class SubmissionFailingHandler
    extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
        private SubmissionFailingHandler() {
            super((MessageHeaders)JobSubmitHeaders.getInstance());
        }

        protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
            throw new RestHandlerException("expected", HttpResponseStatus.INTERNAL_SERVER_ERROR);
        }
    }

    private class TestJobExecutionResultHandler
    extends TestHandler<EmptyRequestBody, JobExecutionResultResponseBody, JobMessageParameters> {
        private final Iterator<Object> jobExecutionResults;
        private Object lastJobExecutionResult;

        private TestJobExecutionResultHandler(Object ... jobExecutionResults) {
            super((MessageHeaders)JobExecutionResultHeaders.getInstance());
            Preconditions.checkArgument((boolean)Arrays.stream(jobExecutionResults).allMatch(object -> object instanceof JobExecutionResultResponseBody || object instanceof RestHandlerException));
            this.jobExecutionResults = Arrays.asList(jobExecutionResults).iterator();
        }

        protected CompletableFuture<JobExecutionResultResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobMessageParameters> request, @Nonnull DispatcherGateway gateway) {
            if (this.jobExecutionResults.hasNext()) {
                this.lastJobExecutionResult = this.jobExecutionResults.next();
            }
            Preconditions.checkState((this.lastJobExecutionResult != null ? 1 : 0) != 0);
            if (this.lastJobExecutionResult instanceof JobExecutionResultResponseBody) {
                return CompletableFuture.completedFuture((JobExecutionResultResponseBody)this.lastJobExecutionResult);
            }
            if (this.lastJobExecutionResult instanceof RestHandlerException) {
                return FutureUtils.completedExceptionally((Throwable)((RestHandlerException)this.lastJobExecutionResult));
            }
            throw new AssertionError();
        }
    }

    private class TestSavepointDisposalHandlers {
        private final TriggerId triggerId = new TriggerId();
        private final String savepointPath;

        private TestSavepointDisposalHandlers(String savepointPath) {
            this.savepointPath = (String)Preconditions.checkNotNull((Object)savepointPath);
        }

        private class TestSavepointDisposalStatusHandler
        extends TestHandler<EmptyRequestBody, AsynchronousOperationResult<AsynchronousOperationInfo>, SavepointDisposalStatusMessageParameters> {
            private final Queue<OptionalFailure<AsynchronousOperationInfo>> responses;

            private TestSavepointDisposalStatusHandler(OptionalFailure<AsynchronousOperationInfo> ... responses) {
                super((MessageHeaders)SavepointDisposalStatusHeaders.getInstance());
                this.responses = new ArrayDeque<OptionalFailure<AsynchronousOperationInfo>>(Arrays.asList(responses));
            }

            protected CompletableFuture<AsynchronousOperationResult<AsynchronousOperationInfo>> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, SavepointDisposalStatusMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
                TriggerId actualTriggerId = (TriggerId)request.getPathParameter(TriggerIdPathParameter.class);
                if (actualTriggerId.equals((Object)TestSavepointDisposalHandlers.this.triggerId)) {
                    OptionalFailure<AsynchronousOperationInfo> nextResponse = this.responses.poll();
                    if (nextResponse != null) {
                        if (nextResponse.isFailure()) {
                            throw new RestHandlerException("Failure", HttpResponseStatus.BAD_REQUEST, nextResponse.getFailureCause());
                        }
                        return CompletableFuture.completedFuture(AsynchronousOperationResult.completed((Object)nextResponse.getUnchecked()));
                    }
                    throw new AssertionError();
                }
                throw new AssertionError();
            }
        }

        private class TestSavepointDisposalTriggerHandler
        extends TestHandler<SavepointDisposalRequest, TriggerResponse, EmptyMessageParameters> {
            private TestSavepointDisposalTriggerHandler() {
                super((MessageHeaders)SavepointDisposalTriggerHeaders.getInstance());
            }

            protected CompletableFuture<TriggerResponse> handleRequest(@Nonnull HandlerRequest<SavepointDisposalRequest, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) {
                MatcherAssert.assertThat((Object)((SavepointDisposalRequest)request.getRequestBody()).getSavepointPath(), (Matcher)Matchers.is((Object)TestSavepointDisposalHandlers.this.savepointPath));
                return CompletableFuture.completedFuture(new TriggerResponse(TestSavepointDisposalHandlers.this.triggerId));
            }
        }
    }

    private class TestJobCancellationHandler
    extends TestHandler<EmptyRequestBody, EmptyResponseBody, JobCancellationMessageParameters> {
        private volatile boolean jobCanceled;

        private TestJobCancellationHandler() {
            super((MessageHeaders)JobCancellationHeaders.getInstance());
            this.jobCanceled = false;
        }

        protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobCancellationMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
            this.jobCanceled = true;
            return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
        }
    }

    private class TestJobSubmitHandler
    extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
        private volatile boolean jobSubmitted;

        private TestJobSubmitHandler() {
            super((MessageHeaders)JobSubmitHeaders.getInstance());
            this.jobSubmitted = false;
        }

        protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
            this.jobSubmitted = true;
            return CompletableFuture.completedFuture(new JobSubmitResponseBody("/url"));
        }
    }
}

