package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.NoOpTaskExecutorBlobService;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorBuilder;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatServicesImpl;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.testutils.TestFileUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.Reference;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.FutureUtils;
import org.hamcrest.collection.IsCollectionWithSize;
import org.hamcrest.core.IsCollectionContaining;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.class */
public class TaskExecutorExecutionDeploymentReconciliationTest extends TestLogger {
    private final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    private final SettableLeaderRetrievalService jobManagerLeaderRetriever = new SettableLeaderRetrievalService();
    private final SettableLeaderRetrievalService resourceManagerLeaderRetriever = new SettableLeaderRetrievalService();
    private final JobID jobId = new JobID();

    @Rule
    public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();

    @Rule
    public final TemporaryFolder tmp = new TemporaryFolder();
    private static final Time timeout = Time.seconds(10);

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();

    @ClassRule
    public static final TestingRpcServiceResource RPC_SERVICE_RESOURCE = new TestingRpcServiceResource();

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest$TestingInvokable.class */
    public static class TestingInvokable extends AbstractInvokable {
        static BlockerSync sync;

        public TestingInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            sync.block();
        }
    }

    @Before
    public void setup() {
        this.haServices.setResourceManagerLeaderRetriever(this.resourceManagerLeaderRetriever);
        this.haServices.setJobMasterLeaderRetriever(this.jobId, this.jobManagerLeaderRetriever);
    }

    @After
    public void shutdown() {
        RPC_SERVICE_RESOURCE.getTestingRpcService().clearGateways();
    }

    @Test
    public void testDeployedExecutionReporting() throws Exception {
        OneShotLatch oneShotLatch = new OneShotLatch();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
        CompletableFuture completableFuture = new CompletableFuture();
        ResourceID generate = ResourceID.generate();
        TestingJobMasterGateway testingJobMasterGateway = setupJobManagerGateway(oneShotLatch, arrayBlockingQueue, completableFuture, generate);
        CompletableFuture<SlotReport> completableFuture2 = new CompletableFuture<>();
        ResourceManagerGateway resourceManagerGateway = setupResourceManagerGateway(completableFuture2);
        TaskManagerServices build = new TaskManagerServicesBuilder().setTaskSlotTable(TaskSlotUtils.createTaskSlotTable(1, timeout, (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor())).setShuffleEnvironment(new NettyShuffleEnvironmentBuilder().build()).setTaskStateManager(new TaskExecutorLocalStateStoresManager(false, Reference.owned(new File[]{this.tmp.newFolder()}), Executors.directExecutor())).build();
        TestingTaskExecutor createTestingTaskExecutor = createTestingTaskExecutor(build);
        try {
            createTestingTaskExecutor.start();
            createTestingTaskExecutor.waitUntilStarted();
            TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway) createTestingTaskExecutor.getSelfGateway(TaskExecutorGateway.class);
            TaskDeploymentDescriptor createTaskDeploymentDescriptor = createTaskDeploymentDescriptor(this.jobId);
            connectComponentsAndRequestSlot(testingJobMasterGateway, resourceManagerGateway, taskExecutorGateway, build.getJobLeaderService(), completableFuture2, createTaskDeploymentDescriptor.getAllocationId());
            TestingInvokable.sync = new BlockerSync();
            oneShotLatch.await();
            AllocatedSlotReport allocatedSlotReport = new AllocatedSlotReport(this.jobId, Collections.singleton(new AllocatedSlotInfo(0, createTaskDeploymentDescriptor.getAllocationId())));
            taskExecutorGateway.heartbeatFromJobManager(generate, allocatedSlotReport);
            Assert.assertThat(arrayBlockingQueue.take(), IsCollectionWithSize.hasSize(0));
            taskExecutorGateway.submitTask(createTaskDeploymentDescriptor, testingJobMasterGateway.m250getFencingToken(), timeout).get();
            TestingInvokable.sync.awaitBlocker();
            taskExecutorGateway.heartbeatFromJobManager(generate, allocatedSlotReport);
            Assert.assertThat(arrayBlockingQueue.take(), IsCollectionContaining.hasItem(createTaskDeploymentDescriptor.getExecutionAttemptId()));
            TestingInvokable.sync.releaseBlocker();
            completableFuture.get();
            taskExecutorGateway.heartbeatFromJobManager(generate, allocatedSlotReport);
            Assert.assertThat(arrayBlockingQueue.take(), IsCollectionWithSize.hasSize(0));
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{createTestingTaskExecutor});
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{createTestingTaskExecutor});
            throw th;
        }
    }

    private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices) throws IOException {
        Configuration configuration = new Configuration();
        return new TestingTaskExecutor(RPC_SERVICE_RESOURCE.getTestingRpcService(), TaskManagerConfiguration.fromConfiguration(configuration, TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(configuration), InetAddress.getLoopbackAddress().getHostAddress(), TestFileUtils.createTempDir()), this.haServices, taskManagerServices, ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES, new HeartbeatServicesImpl(1000L, 30000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, NoOpTaskExecutorBlobService.INSTANCE, this.testingFatalErrorHandlerResource.getFatalErrorHandler(), new TestingTaskExecutorPartitionTracker(), new DelegationTokenReceiverRepository(configuration, (PluginManager) null));
    }

    private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobID jobID) throws IOException {
        return TaskDeploymentDescriptorBuilder.newBuilder(jobID, TestingInvokable.class).build();
    }

    private static TestingJobMasterGateway setupJobManagerGateway(OneShotLatch oneShotLatch, BlockingQueue<Set<ExecutionAttemptID>> blockingQueue, CompletableFuture<Void> completableFuture, ResourceID resourceID) {
        return new TestingJobMasterGatewayBuilder().setRegisterTaskManagerFunction((jobID, taskManagerRegistrationInformation) -> {
            return CompletableFuture.completedFuture(new JMTMRegistrationSuccess(resourceID));
        }).setOfferSlotsFunction((resourceID2, collection) -> {
            oneShotLatch.trigger();
            return CompletableFuture.completedFuture(collection);
        }).setTaskManagerHeartbeatFunction((resourceID3, taskExecutorToJobManagerHeartbeatPayload) -> {
            blockingQueue.add(taskExecutorToJobManagerHeartbeatPayload.getExecutionDeploymentReport().getExecutions());
            return FutureUtils.completedVoidFuture();
        }).setUpdateTaskExecutionStateFunction(taskExecutionState -> {
            if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) {
                completableFuture.complete(null);
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
    }

    private static TestingResourceManagerGateway setupResourceManagerGateway(CompletableFuture<SlotReport> completableFuture) {
        TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
            completableFuture.complete(tuple3.f2);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        testingResourceManagerGateway.setRegisterTaskExecutorFunction(taskExecutorRegistration -> {
            return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), testingResourceManagerGateway.getOwnResourceId(), new ClusterInformation("blobServerHost", 55555), (byte[]) null));
        });
        return testingResourceManagerGateway;
    }

    private void connectComponentsAndRequestSlot(JobMasterGateway jobMasterGateway, ResourceManagerGateway resourceManagerGateway, TaskExecutorGateway taskExecutorGateway, JobLeaderService jobLeaderService, CompletableFuture<SlotReport> completableFuture, AllocationID allocationID) throws Exception {
        RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway("jm", jobMasterGateway);
        RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
        jobLeaderService.addJob(this.jobId, "jm");
        this.jobManagerLeaderRetriever.notifyListener("jm", UUID.randomUUID());
        this.resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());
        Optional findAny = StreamSupport.stream(completableFuture.get().spliterator(), false).findAny();
        Assert.assertTrue(findAny.isPresent());
        taskExecutorGateway.requestSlot(((SlotStatus) findAny.get()).getSlotID(), this.jobId, allocationID, ResourceProfile.ZERO, "jm", resourceManagerGateway.getFencingToken(), timeout).get();
    }
}
