/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.yarn;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.RegisterApplicationMasterResponseReflector;
import org.apache.flink.yarn.Utils;
import org.apache.flink.yarn.YarnTaskExecutorRunner;
import org.apache.flink.yarn.YarnWorkerNode;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

/**
 * {@link ResourceManager} implementation that obtains its workers as YARN containers.
 *
 * <p>The class talks to YARN through two clients: an {@link AMRMClientAsync} used to register the
 * application master and to request/release containers, and an {@link NMClient} used to start and
 * stop the containers themselves. It also acts as the {@link AMRMClientAsync.CallbackHandler}
 * passed to the asynchronous client, so YARN events arrive through the {@code on...} methods
 * below.
 *
 * <p>All containers are requested with one fixed {@link Resource} (see
 * {@link #getContainerResource()}); custom {@link ResourceProfile}s are rejected by
 * {@link #startNewWorker(ResourceProfile)}.
 */
public class YarnResourceManager
extends ResourceManager<YarnWorkerNode>
implements AMRMClientAsync.CallbackHandler {

    /** Priority attached to every container request issued by this class. */
    private static final Priority RM_REQUEST_PRIORITY = Priority.newInstance(1);

    /** Heartbeat interval (ms) switched to while container requests are outstanding. */
    private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;

    /** Environment variable carrying the container id into the launched TaskExecutor. */
    static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";

    /** Environment variable carrying the container's node host into the launched TaskExecutor. */
    static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";

    /** Environment map handed in at construction; read for the working directory and file cleanup. */
    private final Map<String, String> env;

    /** Containers known to this resource manager, keyed by the container id as {@link ResourceID}. */
    private final ConcurrentMap<ResourceID, YarnWorkerNode> workerNodeMap;

    /** Regular heartbeat interval (ms) used when no container request is pending. */
    private final int yarnHeartbeatIntervalMillis;

    private final Configuration flinkConfig;

    private final YarnConfiguration yarnConfig;

    @Nullable
    private final String webInterfaceUrl;

    private final int numberOfTaskSlots;

    private final int defaultTaskManagerMemoryMB;

    private final int defaultCpus;

    /** Created in {@link #initialize()}; {@code null} before that. */
    private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;

    /** Created in {@link #initialize()}; {@code null} before that. */
    private NMClient nodeManagerClient;

    /** Number of container requests sent to YARN that have not been matched with a container yet. */
    private int numPendingContainerRequests;

    private final Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>();

    /** Slot profiles reported for every new worker; computed once from the number of task slots. */
    private final Collection<ResourceProfile> slotsPerWorker;

    /** The single container size (memory MB, vcores) requested for every TaskExecutor. */
    private final Resource resource;

    /**
     * Creates the resource manager and derives the container size and heartbeat settings from
     * the Flink configuration. No YARN client is started here; that happens in
     * {@link #initialize()}.
     *
     * @param flinkConfig Flink configuration; read for the YARN heartbeat delay, the number of
     *     task slots, the TaskManager heap memory and the number of vcores
     * @param env environment variables of the application master process
     * @param webInterfaceUrl URL of the web interface reported to YARN, or {@code null} if none
     */
    public YarnResourceManager(RpcService rpcService, String resourceManagerEndpointId, ResourceID resourceId, Configuration flinkConfig, Map<String, String> env, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, MetricRegistry metricRegistry, JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, @Nullable String webInterfaceUrl, JobManagerMetricGroup jobManagerMetricGroup) {
        super(rpcService, resourceManagerEndpointId, resourceId, highAvailabilityServices, heartbeatServices, slotManager, metricRegistry, jobLeaderIdService, clusterInformation, fatalErrorHandler, jobManagerMetricGroup);
        this.flinkConfig = flinkConfig;
        this.yarnConfig = new YarnConfiguration();
        this.env = env;
        this.workerNodeMap = new ConcurrentHashMap<>();

        final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
        // Compare against YARN's own expiry setting (falling back to 600000 ms) and warn when the
        // configured heartbeat would not be sent before that interval elapses.
        final long yarnExpiryIntervalMS = this.yarnConfig.getLong("yarn.am.liveness-monitor.expiry-interval-ms", 600000L);
        if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) {
            log.warn("The heartbeat interval of the Flink Application master ({}) is greater than YARN's expiry interval ({}). The application is likely to be killed by YARN.", yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
        }
        this.yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
        this.numPendingContainerRequests = 0;
        this.webInterfaceUrl = webInterfaceUrl;

        this.numberOfTaskSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
        this.defaultTaskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
        // The number of vcores defaults to the number of task slots when not configured.
        this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, this.numberOfTaskSlots);
        this.resource = Resource.newInstance(this.defaultTaskManagerMemoryMB, this.defaultCpus);
        this.slotsPerWorker = createSlotsPerWorker(this.numberOfTaskSlots);
    }

    /**
     * Creates, initializes and starts the asynchronous YARN resource manager client, registers
     * the application master with it and records containers surviving from earlier attempts.
     *
     * @param yarnConfiguration configuration used to initialize the client
     * @param yarnHeartbeatIntervalMillis heartbeat interval handed to the client
     * @param webInterfaceUrl tracking URL reported to YARN, may be {@code null}
     * @return the started and registered client
     * @throws Exception if the client cannot be started or the registration fails
     */
    protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(YarnConfiguration yarnConfiguration, int yarnHeartbeatIntervalMillis, @Nullable String webInterfaceUrl) throws Exception {
        final AMRMClientAsync<AMRMClient.ContainerRequest> client = AMRMClientAsync.createAMRMClientAsync(yarnHeartbeatIntervalMillis, this);
        client.init(yarnConfiguration);
        client.start();

        final Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
        final int restPort = parseRestPort(webInterfaceUrl);
        final RegisterApplicationMasterResponse registrationResponse = client.registerApplicationMaster(hostPort.f0, restPort, webInterfaceUrl);
        getContainersFromPreviousAttempts(registrationResponse);
        return client;
    }

    /**
     * Extracts the port that follows the last {@code ':'} of the web interface URL.
     *
     * @param url web interface URL, may be {@code null}
     * @return the parsed port, or {@code -1} if the URL is {@code null}, contains no colon, or the
     *     text after the last colon is not an integer (e.g. a URL without an explicit port)
     */
    private int parseRestPort(@Nullable String url) {
        if (url == null) {
            return -1;
        }
        final int lastColon = url.lastIndexOf(':');
        if (lastColon == -1) {
            return -1;
        }
        try {
            return Integer.parseInt(url.substring(lastColon + 1));
        } catch (NumberFormatException e) {
            // A URL such as "http://host" has a colon but no port. Do not abort the registration
            // (the client is already started at this point); report "no port" instead.
            log.warn("Could not extract a port from the web interface URL {}. Registering without a port.", url, e);
            return -1;
        }
    }

    /**
     * Adds every container that the registration response reports from previous application
     * attempts to {@link #workerNodeMap}.
     */
    private void getContainersFromPreviousAttempts(RegisterApplicationMasterResponse registerApplicationMasterResponse) {
        final List<Container> recoveredContainers = new RegisterApplicationMasterResponseReflector(log).getContainersFromPreviousAttempts(registerApplicationMasterResponse);
        log.info("Recovered {} containers from previous attempts ({}).", recoveredContainers.size(), recoveredContainers);
        for (Container recovered : recoveredContainers) {
            workerNodeMap.put(new ResourceID(recovered.getId().toString()), new YarnWorkerNode(recovered));
        }
    }

    /**
     * Creates, initializes and starts the YARN node manager client.
     *
     * @param yarnConfiguration configuration used to initialize the client
     * @return the started client, configured to clean up running containers when it is stopped
     */
    protected NMClient createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
        final NMClient client = NMClient.createNMClient();
        client.init(yarnConfiguration);
        client.start();
        client.cleanupRunningContainersOnStop(true);
        return client;
    }

    /**
     * Starts both YARN clients.
     *
     * @throws ResourceManagerException if the resource manager client cannot be created, started
     *     or registered
     */
    protected void initialize() throws ResourceManagerException {
        try {
            resourceManagerClient = createAndStartResourceManagerClient(yarnConfig, yarnHeartbeatIntervalMillis, webInterfaceUrl);
        } catch (Exception e) {
            throw new ResourceManagerException("Could not start resource manager client.", e);
        }
        nodeManagerClient = createAndStartNodeManagerClient(yarnConfig);
    }

    /**
     * Stops both YARN clients (if they were created) and then delegates to {@code super.onStop()}.
     *
     * @return the future returned by {@code super.onStop()}, or, if stopping either client threw,
     *     a future completed exceptionally with a {@link FlinkException} wrapping those failures
     *     (in that case the future of the super call is not returned)
     */
    public CompletableFuture<Void> onStop() {
        Throwable firstException = null;

        if (resourceManagerClient != null) {
            try {
                resourceManagerClient.stop();
            } catch (Throwable t) {
                firstException = t;
            }
        }

        if (nodeManagerClient != null) {
            try {
                nodeManagerClient.stop();
            } catch (Throwable t) {
                firstException = ExceptionUtils.firstOrSuppressed(t, firstException);
            }
        }

        // The super class is always stopped, even if one of the clients failed to stop.
        final CompletableFuture<Void> terminationFuture = super.onStop();

        if (firstException != null) {
            return FutureUtils.completedExceptionally(new FlinkException("Error while shutting down YARN resource manager", firstException));
        }
        return terminationFuture;
    }

    /**
     * Unregisters the application master from YARN and deletes the application files.
     * A failure to unregister is logged and does not prevent the file cleanup.
     *
     * @param finalStatus Flink status to translate into a YARN final status, may be {@code null}
     * @param diagnostics optional diagnostics message forwarded to YARN
     */
    protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nullable String diagnostics) {
        final FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
        log.info("Unregister application from the YARN Resource Manager with final status {}.", yarnStatus);
        try {
            resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, "");
        } catch (Throwable t) {
            log.error("Could not unregister the application master.", t);
        }
        Utils.deleteApplicationFiles(env);
    }

    /**
     * Requests one new container from YARN.
     *
     * @param resourceProfile must equal {@link ResourceProfile#UNKNOWN}
     * @return the slot profiles a new worker provides
     * @throws IllegalArgumentException presumably, via {@link Preconditions#checkArgument}, if a
     *     profile other than {@link ResourceProfile#UNKNOWN} is passed
     */
    public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
        Preconditions.checkArgument(ResourceProfile.UNKNOWN.equals(resourceProfile), "The YarnResourceManager does not support custom ResourceProfiles yet. It assumes that all containers have the same resources.");
        requestYarnContainer();
        return slotsPerWorker;
    }

    /** Returns the fixed container resource used for every container request. */
    @VisibleForTesting
    Resource getContainerResource() {
        return resource;
    }

    /**
     * Stops the worker's container via the node manager, releases it at the resource manager and
     * forgets the worker. A failure of the node manager call is logged and otherwise ignored.
     *
     * @param workerNode worker whose container should be stopped
     * @return always {@code true}
     */
    public boolean stopWorker(YarnWorkerNode workerNode) {
        final Container container = workerNode.getContainer();
        log.info("Stopping container {}.", container.getId());
        try {
            nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
        } catch (Exception e) {
            log.warn("Error while calling YARN Node Manager to stop container", e);
        }
        resourceManagerClient.releaseAssignedContainer(container.getId());
        workerNodeMap.remove(workerNode.getResourceID());
        return true;
    }

    /**
     * Looks up the worker registered under the given id.
     *
     * @return the known worker node, or {@code null} if the id is not in {@link #workerNodeMap}
     */
    protected YarnWorkerNode workerStarted(ResourceID resourceID) {
        return workerNodeMap.get(resourceID);
    }

    /** Always reports {@code 1.0}. */
    public float getProgress() {
        return 1.0f;
    }

    /**
     * Handles completed containers. For each status the worker is removed from
     * {@link #workerNodeMap}; if it was known, a replacement container is requested when slots
     * are still required. The TaskManager connection for the container is closed in either case.
     * The work is handed to {@code runAsync}.
     */
    public void onContainersCompleted(List<ContainerStatus> statuses) {
        runAsync(() -> {
            log.debug("YARN ResourceManager reported the following containers completed: {}.", statuses);
            for (ContainerStatus status : statuses) {
                final ResourceID resourceId = new ResourceID(status.getContainerId().toString());
                final YarnWorkerNode removedWorker = workerNodeMap.remove(resourceId);
                if (removedWorker != null) {
                    requestYarnContainerIfRequired();
                }
                closeTaskManagerConnection(resourceId, new Exception(status.getDiagnostics()));
            }
        });
    }

    /**
     * Handles newly allocated containers. Each container is matched with one of the container
     * requests that were pending when the allocation arrived and a TaskExecutor is started in it;
     * containers without a matching request are returned to YARN. Once no request is pending any
     * more, the heartbeat interval is reset to its regular value. The work is handed to
     * {@code runAsync}.
     */
    public void onContainersAllocated(List<Container> containers) {
        runAsync(() -> {
            // Snapshot of the requests that were outstanding when this allocation arrived.
            final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = getPendingRequests().iterator();

            for (Container container : containers) {
                log.info("Received new container: {} - Remaining pending container requests: {}", container.getId(), numPendingContainerRequests);

                // Gate on the snapshot rather than on numPendingContainerRequests: a failed
                // container start re-requests a container and raises the counter again, which
                // would otherwise lead to next() being called on the exhausted iterator.
                if (pendingRequestsIterator.hasNext()) {
                    removeContainerRequest(pendingRequestsIterator.next());
                    startTaskExecutorInContainer(container);
                } else {
                    returnExcessContainer(container);
                }
            }

            if (numPendingContainerRequests <= 0) {
                resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
            }
        });
    }

    /**
     * Registers the container as a worker and launches a TaskExecutor in it. If the launch fails,
     * the worker is forgotten, the container is released and a new one is requested if required.
     */
    private void startTaskExecutorInContainer(Container container) {
        final String containerIdStr = container.getId().toString();
        final ResourceID resourceId = new ResourceID(containerIdStr);
        workerNodeMap.put(resourceId, new YarnWorkerNode(container));

        try {
            final ContainerLaunchContext launchContext = createTaskExecutorLaunchContext(container.getResource(), containerIdStr, container.getNodeId().getHost());
            nodeManagerClient.startContainer(container, launchContext);
        } catch (Throwable t) {
            log.error("Could not start TaskManager in container {}.", container.getId(), t);
            workerNodeMap.remove(resourceId);
            resourceManagerClient.releaseAssignedContainer(container.getId());
            requestYarnContainerIfRequired();
        }
    }

    /** Gives a container that matches no pending request back to YARN. */
    private void returnExcessContainer(Container excessContainer) {
        log.info("Returning excess container {}.", excessContainer.getId());
        resourceManagerClient.releaseAssignedContainer(excessContainer.getId());
    }

    /** Decrements the pending counter and removes the request from the YARN client. */
    private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) {
        --numPendingContainerRequests;
        log.info("Removing container request {}. Pending container requests {}.", pendingContainerRequest, numPendingContainerRequests);
        resourceManagerClient.removeContainerRequest(pendingContainerRequest);
    }

    /**
     * Returns a copy of the container requests the YARN client still holds for the request
     * priority, any location ({@code "*"}) and the fixed container resource.
     *
     * @throws IllegalStateException presumably, via {@link Preconditions#checkState}, if the
     *     number of requests held by the client differs from {@link #numPendingContainerRequests}
     */
    private Collection<AMRMClient.ContainerRequest> getPendingRequests() {
        final List<? extends Collection<AMRMClient.ContainerRequest>> matchingRequests = resourceManagerClient.getMatchingRequests(RM_REQUEST_PRIORITY, "*", getContainerResource());

        final Collection<AMRMClient.ContainerRequest> pendingRequests;
        if (matchingRequests.isEmpty()) {
            pendingRequests = Collections.emptyList();
        } else {
            // Only the first group of matching requests is considered; copy it so that removing
            // requests from the client while iterating does not affect the returned collection.
            pendingRequests = new ArrayList<>(matchingRequests.get(0));
        }

        Preconditions.checkState(pendingRequests.size() == numPendingContainerRequests, "The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged. Number client's pending container requests %s != Number RM's pending container requests %s.", pendingRequests.size(), numPendingContainerRequests);
        return pendingRequests;
    }

    /** Reacts to a shutdown request by closing this endpoint asynchronously. */
    public void onShutdownRequest() {
        closeAsync();
    }

    /** Node updates are ignored. */
    public void onNodesUpdated(List<NodeReport> list) {
    }

    /** Forwards the error to the fatal error handling of this resource manager. */
    public void onError(Throwable error) {
        onFatalError(error);
    }

    /**
     * Translates a Flink application status into the YARN final application status.
     *
     * @param status Flink status, may be {@code null}
     * @return the matching YARN status; {@code UNDEFINED} for {@code null} and for any status
     *     other than {@code SUCCEEDED}, {@code FAILED} and {@code CANCELED}
     */
    private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
        if (status == null) {
            return FinalApplicationStatus.UNDEFINED;
        }
        switch (status) {
            case SUCCEEDED:
                return FinalApplicationStatus.SUCCEEDED;
            case FAILED:
                return FinalApplicationStatus.FAILED;
            case CANCELED:
                return FinalApplicationStatus.KILLED;
            default:
                return FinalApplicationStatus.UNDEFINED;
        }
    }

    /**
     * Splits an address into host and port: the part after the first {@code '@'} is split at
     * {@code ':'}, and the port is the text up to the first {@code '/'} of the second piece.
     * NOTE(review): assumes the address has the form {@code ...@host:port/...}; other formats
     * fail with an index or number format exception - confirm against the RPC address format.
     */
    private static Tuple2<String, Integer> parseHostPort(String address) {
        final String[] hostAndRest = address.split("@")[1].split(":");
        final String host = hostAndRest[0];
        final String port = hostAndRest[1].split("/")[0];
        return new Tuple2<>(host, Integer.valueOf(port));
    }

    /**
     * Requests one more container if the required number of TaskManager slots exceeds the slots
     * that the already pending container requests will provide.
     */
    private void requestYarnContainerIfRequired() {
        final int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
        final int pendingTaskManagerSlots = numPendingContainerRequests * numberOfTaskSlots;
        if (requiredTaskManagerSlots > pendingTaskManagerSlots) {
            requestYarnContainer();
        }
    }

    /**
     * Sends one container request to YARN, switches to the fast heartbeat interval while the
     * request is outstanding and increments the pending counter.
     */
    private void requestYarnContainer() {
        resourceManagerClient.addContainerRequest(getContainerRequest());
        resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
        ++numPendingContainerRequests;
        log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.", resource, numPendingContainerRequests);
    }

    /**
     * Builds the request used for every container: the fixed container resource, no node or rack
     * constraints, and the shared request priority.
     */
    @Nonnull
    @VisibleForTesting
    AMRMClient.ContainerRequest getContainerRequest() {
        return new AMRMClient.ContainerRequest(getContainerResource(), null, null, RM_REQUEST_PRIORITY);
    }

    /**
     * Builds the launch context for a TaskExecutor running in the given container.
     *
     * @param resource resource of the allocated container; its memory sizes the TaskManager
     * @param containerId id of the container, exported as {@link #ENV_FLINK_CONTAINER_ID}
     * @param host host of the container's node, exported as {@link #ENV_FLINK_NODE_ID}
     * @return the launch context with both environment variables set
     * @throws Exception if the launch context cannot be created
     */
    private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host) throws Exception {
        final String currDir = env.get(ApplicationConstants.Environment.PWD.key());
        final ContaineredTaskManagerParameters taskManagerParameters = ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), numberOfTaskSlots);
        log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, JVM direct memory limit {} MB", containerId, taskManagerParameters.taskManagerTotalMemoryMB(), taskManagerParameters.taskManagerHeapSizeMB(), taskManagerParameters.taskManagerDirectMemoryLimitMB());

        final Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);
        log.debug("TaskManager configuration: {}", taskManagerConfig);

        final ContainerLaunchContext launchContext = Utils.createTaskExecutorContext(flinkConfig, yarnConfig, env, taskManagerParameters, taskManagerConfig, currDir, YarnTaskExecutorRunner.class, log);
        launchContext.getEnvironment().put(ENV_FLINK_CONTAINER_ID, containerId);
        launchContext.getEnvironment().put(ENV_FLINK_NODE_ID, host);
        return launchContext;
    }
}

