/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.program.rest;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import org.apache.flink.runtime.rest.util.TestRestServerEndpoint;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.FunctionWithException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class RestClusterClientSavepointTriggerTest
extends TestLogger {
    // Dispatcher gateway stub produced by TestingDispatcherGateway's builder without any customisation.
    private static final DispatcherGateway mockRestfulGateway = new TestingDispatcherGateway.Builder().build();
    // Retriever that always answers with an already-completed future holding mockRestfulGateway.
    private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever = () -> CompletableFuture.completedFuture(mockRestfulGateway);
    // Assigned in setUp(), shut down in tearDown(); handed to every RestClient built by createRestClusterClient.
    private static ExecutorService executor;
    // Assigned exactly once by the static initializer below, wrapped in an UnmodifiableConfiguration.
    private static final Configuration REST_CONFIG;

    /**
     * Creates the single-threaded executor stored in {@code executor} before any test runs.
     * Its threads are produced by an {@link ExecutorThreadFactory} that receives this class's
     * simple name.
     */
    @BeforeClass
    public static void setUp() throws ConfigurationException {
        String simpleName = RestClusterClientSavepointTriggerTest.class.getSimpleName();
        ThreadFactory threadFactory = new ExecutorThreadFactory(simpleName);
        executor = Executors.newSingleThreadExecutor(threadFactory);
    }

    /**
     * Shuts down the shared executor after all tests have run, if one was created.
     */
    @AfterClass
    public static void tearDown() {
        // Nothing to release when setUp() never assigned the executor.
        if (executor == null) {
            return;
        }
        executor.shutdown();
    }

    /**
     * Triggers a savepoint with a {@code null} target directory.
     *
     * <p>The trigger handler asserts that the request body carries a {@code null} target
     * directory and {@code cancelJob == false}; the status handler asserts that it is queried
     * with the trigger id handed out by the trigger handler. The client must return the string
     * that the status handler put into its {@link SavepointInfo}.
     */
    @Test
    public void testTriggerSavepointDefaultDirectory() throws Exception {
        TriggerId triggerId = new TriggerId();
        String expectedReturnedSavepointDir = "hello";
        // The lambdas are typed by createRestServerEndpoint's parameters; a raw cast here would
        // turn the lambda parameter into Object and hide the request accessors.
        try (RestServerEndpoint restServerEndpoint = createRestServerEndpoint(
                request -> {
                    Assert.assertNull(request.getTargetDirectory());
                    Assert.assertFalse(request.isCancelJob());
                    return triggerId;
                },
                trigger -> {
                    Assert.assertEquals(triggerId, trigger);
                    return new SavepointInfo(expectedReturnedSavepointDir, null);
                })) {
            int serverPort = restServerEndpoint.getServerAddress().getPort();
            RestClusterClient<StandaloneClusterId> restClusterClient = this.createRestClusterClient(serverPort);
            String savepointPath = restClusterClient.triggerSavepoint(new JobID(), null).get();
            Assert.assertEquals(expectedReturnedSavepointDir, savepointPath);
        }
    }

    /**
     * Triggers a savepoint with an explicit target directory.
     *
     * <p>The trigger handler asserts that the submitted directory arrives unchanged in the
     * request body and that {@code cancelJob == false}; the status handler asserts that it is
     * queried with the trigger id handed out by the trigger handler. The client must return the
     * string that the status handler put into its {@link SavepointInfo}, which deliberately
     * differs from the submitted directory.
     */
    @Test
    public void testTriggerSavepointTargetDirectory() throws Exception {
        TriggerId triggerId = new TriggerId();
        String expectedSubmittedSavepointDir = "world";
        String expectedReturnedSavepointDir = "hello";
        // The lambdas are typed by createRestServerEndpoint's parameters; a raw cast here would
        // turn the lambda parameter into Object and hide the request accessors.
        try (RestServerEndpoint restServerEndpoint = createRestServerEndpoint(
                triggerRequestBody -> {
                    Assert.assertEquals(expectedSubmittedSavepointDir, triggerRequestBody.getTargetDirectory());
                    Assert.assertFalse(triggerRequestBody.isCancelJob());
                    return triggerId;
                },
                statusRequestTriggerId -> {
                    Assert.assertEquals(triggerId, statusRequestTriggerId);
                    return new SavepointInfo(expectedReturnedSavepointDir, null);
                })) {
            int serverPort = restServerEndpoint.getServerAddress().getPort();
            RestClusterClient<StandaloneClusterId> restClusterClient = this.createRestClusterClient(serverPort);
            String savepointPath =
                    restClusterClient.triggerSavepoint(new JobID(), expectedSubmittedSavepointDir).get();
            Assert.assertEquals(expectedReturnedSavepointDir, savepointPath);
        }
    }

    /**
     * Calls {@code cancelWithSavepoint} with a {@code null} target directory.
     *
     * <p>The trigger handler asserts that the request body has {@code cancelJob == true}; the
     * status handler asserts that it is queried with the trigger id handed out by the trigger
     * handler. The client must return the string that the status handler put into its
     * {@link SavepointInfo}.
     */
    @Test
    public void testTriggerSavepointCancelJob() throws Exception {
        TriggerId triggerId = new TriggerId();
        String expectedSavepointDir = "hello";
        // The lambdas are typed by createRestServerEndpoint's parameters; a raw cast here would
        // turn the lambda parameter into Object and hide the request accessors.
        try (RestServerEndpoint restServerEndpoint = createRestServerEndpoint(
                request -> {
                    Assert.assertTrue(request.isCancelJob());
                    return triggerId;
                },
                trigger -> {
                    Assert.assertEquals(triggerId, trigger);
                    return new SavepointInfo(expectedSavepointDir, null);
                })) {
            int serverPort = restServerEndpoint.getServerAddress().getPort();
            RestClusterClient<StandaloneClusterId> restClusterClient = this.createRestClusterClient(serverPort);
            String savepointPath = restClusterClient.cancelWithSavepoint(new JobID(), null).get();
            Assert.assertEquals(expectedSavepointDir, savepointPath);
        }
    }

    /**
     * Verifies that a failure reported by the status handler reaches the caller.
     *
     * <p>The status handler answers with a {@link SavepointInfo} that carries a
     * {@link SerializedThrowable} wrapping a {@code RuntimeException("expected")}. The future
     * returned by {@code triggerSavepoint} must therefore complete exceptionally, with a
     * {@link SerializedThrowable} cause that deserializes to an error whose message is
     * {@code "expected"}.
     */
    @Test
    public void testTriggerSavepointFailure() throws Exception {
        TriggerId triggerId = new TriggerId();
        try (RestServerEndpoint restServerEndpoint = createRestServerEndpoint(
                request -> triggerId,
                trigger -> new SavepointInfo(null, new SerializedThrowable(new RuntimeException("expected"))))) {
            int serverPort = restServerEndpoint.getServerAddress().getPort();
            RestClusterClient<StandaloneClusterId> restClusterClient = this.createRestClusterClient(serverPort);
            try {
                restClusterClient.triggerSavepoint(new JobID(), null).get();
                // Without this, a successfully completed future would let the test pass
                // without checking anything. AssertionError is not caught by the handler below.
                Assert.fail("Expected the savepoint future to fail with an ExecutionException.");
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                Assert.assertThat(cause, Matchers.instanceOf(SerializedThrowable.class));
                String deserializedMessage = ((SerializedThrowable) cause)
                        .deserializeError(ClassLoader.getSystemClassLoader())
                        .getMessage();
                Assert.assertThat(deserializedMessage, Matchers.equalTo("expected"));
            }
        }
    }

    /**
     * Verifies that the client still obtains the savepoint result when the first status request
     * is rejected.
     *
     * <p>The status handler throws a {@link RestHandlerException} with
     * {@code SERVICE_UNAVAILABLE} exactly once (guarded by an {@link AtomicBoolean}) and answers
     * normally afterwards, so the expected string can only be returned if the client repeats
     * the status request.
     */
    @Test
    public void testTriggerSavepointRetry() throws Exception {
        TriggerId triggerId = new TriggerId();
        String expectedSavepointDir = "hello";
        AtomicBoolean failRequest = new AtomicBoolean(true);
        try (RestServerEndpoint restServerEndpoint = createRestServerEndpoint(
                request -> triggerId,
                trigger -> {
                    // Flips to false on the first call, so only that call is rejected.
                    if (failRequest.compareAndSet(true, false)) {
                        throw new RestHandlerException("expected", HttpResponseStatus.SERVICE_UNAVAILABLE);
                    }
                    return new SavepointInfo(expectedSavepointDir, null);
                })) {
            int serverPort = restServerEndpoint.getServerAddress().getPort();
            RestClusterClient<StandaloneClusterId> restClusterClient = this.createRestClusterClient(serverPort);
            String savepointPath = restClusterClient.triggerSavepoint(new JobID(), null).get();
            Assert.assertEquals(expectedSavepointDir, savepointPath);
        }
    }

    /**
     * Builds and starts a REST server endpoint configured with {@code REST_CONFIG} that serves
     * exactly two handlers: one for savepoint trigger requests and one for savepoint status
     * requests.
     *
     * @param triggerHandlerLogic maps the trigger request body to the trigger id to respond with
     * @param savepointHandlerLogic maps the queried trigger id to the savepoint info to respond with
     * @return the started endpoint; callers in this class close it via try-with-resources
     * @throws Exception propagated from building or starting the endpoint
     */
    private static RestServerEndpoint createRestServerEndpoint(FunctionWithException<SavepointTriggerRequestBody, TriggerId, RestHandlerException> triggerHandlerLogic, FunctionWithException<TriggerId, SavepointInfo, RestHandlerException> savepointHandlerLogic) throws Exception {
        TestSavepointTriggerHandler triggerHandler = new TestSavepointTriggerHandler(triggerHandlerLogic);
        TestSavepointHandler statusHandler = new TestSavepointHandler(savepointHandlerLogic);
        return TestRestServerEndpoint.builder(REST_CONFIG)
                .withHandler(triggerHandler)
                .withHandler(statusHandler)
                .buildAndStart();
    }

    /**
     * Creates a {@link RestClusterClient} whose configuration is a copy of {@code REST_CONFIG}
     * with the REST port replaced by {@code port}.
     *
     * <p>The underlying {@link RestClient} is built from the unmodified {@code REST_CONFIG} and
     * the shared executor. The last constructor argument is a lambda that returns {@code 0L}
     * for every attempt (presumably the wait time between polls; confirm against
     * {@code RestClusterClient}).
     *
     * @param port port of the REST server endpoint the client should talk to
     * @return a new client bound to the standalone cluster id
     * @throws Exception propagated from constructing the REST client or the cluster client
     */
    private RestClusterClient<StandaloneClusterId> createRestClusterClient(int port) throws Exception {
        Configuration clientConfig = new Configuration(REST_CONFIG);
        clientConfig.setInteger(RestOptions.PORT, port);
        RestClient restClient = new RestClient(REST_CONFIG, executor);
        // Diamond instead of a raw type: the cluster-id type is inferred from the return type,
        // so no unchecked conversion or Object cast is needed.
        return new RestClusterClient<>(clientConfig, restClient, StandaloneClusterId.getInstance(), attempt -> 0L);
    }

    // Builds the shared REST configuration once and publishes only a read-only wrapper.
    static {
        Configuration settings = new Configuration();
        settings.setString(JobManagerOptions.ADDRESS, "localhost");
        // Up to 10 retry attempts with zero delay between them.
        settings.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10);
        settings.setLong(RestOptions.RETRY_DELAY, 0L);
        // Port 0 here; createRestClusterClient copies this configuration and overrides the port.
        settings.setInteger(RestOptions.PORT, 0);
        REST_CONFIG = new UnmodifiableConfiguration(settings);
    }

    /**
     * Common base for the handlers used in this test. It wires every handler to
     * {@code mockGatewayRetriever}, {@code RpcUtils.INF_TIMEOUT} and an empty map, leaving only
     * the message headers to the subclass.
     *
     * @param <R> request body type
     * @param <P> response body type
     * @param <M> message parameters type
     */
    private abstract static class TestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters>
            extends AbstractRestHandler<DispatcherGateway, R, P, M> {

        private TestHandler(MessageHeaders<R, P, M> messageHeaders) {
            super(mockGatewayRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), messageHeaders);
        }
    }

    /**
     * Handler registered under {@link SavepointStatusHeaders} that delegates to a test-supplied
     * function: the trigger id taken from the request path goes in, and the returned
     * {@link SavepointInfo} is sent back wrapped as a completed asynchronous operation result.
     */
    private static class TestSavepointHandler
            extends TestHandler<EmptyRequestBody, AsynchronousOperationResult<SavepointInfo>, SavepointStatusMessageParameters> {
        private final FunctionWithException<TriggerId, SavepointInfo, RestHandlerException> savepointHandlerLogic;

        TestSavepointHandler(FunctionWithException<TriggerId, SavepointInfo, RestHandlerException> savepointHandlerLogic) {
            super(SavepointStatusHeaders.getInstance());
            this.savepointHandlerLogic = savepointHandlerLogic;
        }

        /**
         * Applies the test logic to the trigger id path parameter.
         *
         * @throws RestHandlerException if the supplied logic throws it
         */
        @Override
        protected CompletableFuture<AsynchronousOperationResult<SavepointInfo>> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, SavepointStatusMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
            TriggerId triggerId = request.getPathParameter(TriggerIdPathParameter.class);
            // No Object casts: they would stop apply(TriggerId) from resolving and would make
            // completed(...) infer the wrong result type.
            SavepointInfo savepointInfo = this.savepointHandlerLogic.apply(triggerId);
            return CompletableFuture.completedFuture(AsynchronousOperationResult.completed(savepointInfo));
        }
    }

    private static final class TestSavepointTriggerHandler
    extends TestHandler<SavepointTriggerRequestBody, TriggerResponse, SavepointTriggerMessageParameters> {
        private final FunctionWithException<SavepointTriggerRequestBody, TriggerId, RestHandlerException> triggerHandlerLogic;

        TestSavepointTriggerHandler(FunctionWithException<SavepointTriggerRequestBody, TriggerId, RestHandlerException> triggerHandlerLogic) {
            super((MessageHeaders)SavepointTriggerHeaders.getInstance());
            this.triggerHandlerLogic = triggerHandlerLogic;
        }

        protected CompletableFuture<TriggerResponse> handleRequest(@Nonnull HandlerRequest<SavepointTriggerRequestBody, SavepointTriggerMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
            return CompletableFuture.completedFuture(new TriggerResponse((TriggerId)this.triggerHandlerLogic.apply((Object)request.getRequestBody())));
        }
    }
}

