package org.apache.spark.network.shuffle;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Timer;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.BlocksRemoved;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.FetchShuffleBlocks;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.GetLocalDirsForExecutors;
import org.apache.spark.network.shuffle.protocol.LocalDirsForExecutors;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
import org.apache.spark.network.shuffle.protocol.RemoveBlocks;
import org.apache.spark.network.shuffle.protocol.StreamHandle;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sparkproject.guava.annotations.VisibleForTesting;

/* loaded from: input_file:org/apache/spark/network/shuffle/ExternalBlockHandler.class */
public class ExternalBlockHandler extends RpcHandler {
    private static final Logger logger = LoggerFactory.getLogger(ExternalBlockHandler.class);

    @VisibleForTesting
    final ExternalShuffleBlockResolver blockManager;
    private final OneForOneStreamManager streamManager;
    private final ShuffleMetrics metrics;
    private final MergedShuffleFileManager mergeManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/spark/network/shuffle/ExternalBlockHandler$ManagedBufferIterator.class */
    public class ManagedBufferIterator implements Iterator<ManagedBuffer> {
        private int index = 0;
        private final Function<Integer, ManagedBuffer> blockDataForIndexFn;
        private final int size;

        ManagedBufferIterator(OpenBlocks openBlocks) {
            String str = openBlocks.appId;
            String str2 = openBlocks.execId;
            String[] strArr = openBlocks.blockIds;
            String[] split = strArr[0].split("_");
            if (split.length == 4 && split[0].equals("shuffle")) {
                int parseInt = Integer.parseInt(split[1]);
                int[] shuffleMapIdAndReduceIds = shuffleMapIdAndReduceIds(strArr, parseInt);
                this.size = shuffleMapIdAndReduceIds.length;
                this.blockDataForIndexFn = num -> {
                    return ExternalBlockHandler.this.blockManager.getBlockData(str, str2, parseInt, shuffleMapIdAndReduceIds[num.intValue()], shuffleMapIdAndReduceIds[num.intValue() + 1]);
                };
                return;
            }
            if (split.length != 3 || !split[0].equals("rdd")) {
                throw new IllegalArgumentException("Unexpected block id format: " + strArr[0]);
            }
            int[] rddAndSplitIds = rddAndSplitIds(strArr);
            this.size = rddAndSplitIds.length;
            this.blockDataForIndexFn = num2 -> {
                return ExternalBlockHandler.this.blockManager.getRddBlockData(str, str2, rddAndSplitIds[num2.intValue()], rddAndSplitIds[num2.intValue() + 1]);
            };
        }

        private int[] rddAndSplitIds(String[] strArr) {
            int[] iArr = new int[2 * strArr.length];
            for (int i = 0; i < strArr.length; i++) {
                String[] split = strArr[i].split("_");
                if (split.length != 3 || !split[0].equals("rdd")) {
                    throw new IllegalArgumentException("Unexpected RDD block id format: " + strArr[i]);
                }
                iArr[2 * i] = Integer.parseInt(split[1]);
                iArr[(2 * i) + 1] = Integer.parseInt(split[2]);
            }
            return iArr;
        }

        private int[] shuffleMapIdAndReduceIds(String[] strArr, int i) {
            int[] iArr = new int[2 * strArr.length];
            for (int i2 = 0; i2 < strArr.length; i2++) {
                String[] split = strArr[i2].split("_");
                if (split.length != 4 || !split[0].equals("shuffle")) {
                    throw new IllegalArgumentException("Unexpected shuffle block id format: " + strArr[i2]);
                }
                if (Integer.parseInt(split[1]) != i) {
                    throw new IllegalArgumentException("Expected shuffleId=" + i + ", got:" + strArr[i2]);
                }
                iArr[2 * i2] = Integer.parseInt(split[2]);
                iArr[(2 * i2) + 1] = Integer.parseInt(split[3]);
            }
            return iArr;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.index < this.size;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public ManagedBuffer next() {
            ManagedBuffer apply = this.blockDataForIndexFn.apply(Integer.valueOf(this.index));
            this.index += 2;
            ExternalBlockHandler.this.metrics.blockTransferRateBytes.mark(apply != null ? apply.size() : 0L);
            return apply;
        }
    }

    /* loaded from: input_file:org/apache/spark/network/shuffle/ExternalBlockHandler$NoOpMergedShuffleFileManager.class */
    public static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
        public NoOpMergedShuffleFileManager(TransportConf transportConf) {
        }

        @Override // org.apache.spark.network.shuffle.MergedShuffleFileManager
        public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream pushBlockStream) {
            throw new UnsupportedOperationException("Cannot handle shuffle block merge");
        }

        @Override // org.apache.spark.network.shuffle.MergedShuffleFileManager
        public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge finalizeShuffleMerge) throws IOException {
            throw new UnsupportedOperationException("Cannot handle shuffle block merge");
        }

        @Override // org.apache.spark.network.shuffle.MergedShuffleFileManager
        public void registerExecutor(String str, ExecutorShuffleInfo executorShuffleInfo) {
        }

        @Override // org.apache.spark.network.shuffle.MergedShuffleFileManager
        public void applicationRemoved(String str, boolean z) {
        }

        @Override // org.apache.spark.network.shuffle.MergedShuffleFileManager
        public ManagedBuffer getMergedBlockData(String str, int i, int i2, int i3) {
            throw new UnsupportedOperationException("Cannot handle shuffle block merge");
        }

        @Override // org.apache.spark.network.shuffle.MergedShuffleFileManager
        public MergedBlockMeta getMergedBlockMeta(String str, int i, int i2) {
            throw new UnsupportedOperationException("Cannot handle shuffle block merge");
        }

        @Override // org.apache.spark.network.shuffle.MergedShuffleFileManager
        public String[] getMergedBlockDirs(String str) {
            throw new UnsupportedOperationException("Cannot handle shuffle block merge");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/spark/network/shuffle/ExternalBlockHandler$ShuffleManagedBufferIterator.class */
    public class ShuffleManagedBufferIterator implements Iterator<ManagedBuffer> {
        private int mapIdx = 0;
        private int reduceIdx = 0;
        private final String appId;
        private final String execId;
        private final int shuffleId;
        private final long[] mapIds;
        private final int[][] reduceIds;
        private final boolean batchFetchEnabled;
        static final /* synthetic */ boolean $assertionsDisabled;

        ShuffleManagedBufferIterator(FetchShuffleBlocks fetchShuffleBlocks) {
            this.appId = fetchShuffleBlocks.appId;
            this.execId = fetchShuffleBlocks.execId;
            this.shuffleId = fetchShuffleBlocks.shuffleId;
            this.mapIds = fetchShuffleBlocks.mapIds;
            this.reduceIds = fetchShuffleBlocks.reduceIds;
            this.batchFetchEnabled = fetchShuffleBlocks.batchFetchEnabled;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            if ($assertionsDisabled || (this.mapIds.length != 0 && this.mapIds.length == this.reduceIds.length)) {
                return this.mapIdx < this.mapIds.length && this.reduceIdx < this.reduceIds[this.mapIdx].length;
            }
            throw new AssertionError();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public ManagedBuffer next() {
            ManagedBuffer continuousBlocksData;
            if (!this.batchFetchEnabled) {
                continuousBlocksData = ExternalBlockHandler.this.blockManager.getBlockData(this.appId, this.execId, this.shuffleId, this.mapIds[this.mapIdx], this.reduceIds[this.mapIdx][this.reduceIdx]);
                if (this.reduceIdx < this.reduceIds[this.mapIdx].length - 1) {
                    this.reduceIdx++;
                } else {
                    this.reduceIdx = 0;
                    this.mapIdx++;
                }
            } else {
                if (!$assertionsDisabled && this.reduceIds[this.mapIdx].length != 2) {
                    throw new AssertionError();
                }
                continuousBlocksData = ExternalBlockHandler.this.blockManager.getContinuousBlocksData(this.appId, this.execId, this.shuffleId, this.mapIds[this.mapIdx], this.reduceIds[this.mapIdx][0], this.reduceIds[this.mapIdx][1]);
                this.mapIdx++;
            }
            ExternalBlockHandler.this.metrics.blockTransferRateBytes.mark(continuousBlocksData != null ? continuousBlocksData.size() : 0L);
            return continuousBlocksData;
        }

        static {
            $assertionsDisabled = !ExternalBlockHandler.class.desiredAssertionStatus();
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:org/apache/spark/network/shuffle/ExternalBlockHandler$ShuffleMetrics.class */
    public class ShuffleMetrics implements MetricSet {
        private final Timer openBlockRequestLatencyMillis = new Timer();
        private final Timer registerExecutorRequestLatencyMillis = new Timer();
        private final Timer finalizeShuffleMergeLatencyMillis = new Timer();
        private final Meter blockTransferRateBytes = new Meter();
        private Counter activeConnections = new Counter();
        private Counter caughtExceptions = new Counter();
        private final Map<String, Metric> allMetrics = new HashMap();

        public ShuffleMetrics() {
            this.allMetrics.put("openBlockRequestLatencyMillis", this.openBlockRequestLatencyMillis);
            this.allMetrics.put("registerExecutorRequestLatencyMillis", this.registerExecutorRequestLatencyMillis);
            this.allMetrics.put("finalizeShuffleMergeLatencyMillis", this.finalizeShuffleMergeLatencyMillis);
            this.allMetrics.put("blockTransferRateBytes", this.blockTransferRateBytes);
            this.allMetrics.put("registeredExecutorsSize", () -> {
                return Integer.valueOf(ExternalBlockHandler.this.blockManager.getRegisteredExecutorsSize());
            });
            this.allMetrics.put("numActiveConnections", this.activeConnections);
            this.allMetrics.put("numCaughtExceptions", this.caughtExceptions);
        }

        public Map<String, Metric> getMetrics() {
            return this.allMetrics;
        }
    }

    public ExternalBlockHandler(TransportConf transportConf, File file) throws IOException {
        this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(transportConf, file), new NoOpMergedShuffleFileManager(transportConf));
    }

    public ExternalBlockHandler(TransportConf transportConf, File file, MergedShuffleFileManager mergedShuffleFileManager) throws IOException {
        this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(transportConf, file), mergedShuffleFileManager);
    }

    @VisibleForTesting
    public ExternalShuffleBlockResolver getBlockResolver() {
        return this.blockManager;
    }

    @VisibleForTesting
    public ExternalBlockHandler(OneForOneStreamManager oneForOneStreamManager, ExternalShuffleBlockResolver externalShuffleBlockResolver) {
        this(oneForOneStreamManager, externalShuffleBlockResolver, new NoOpMergedShuffleFileManager(null));
    }

    @VisibleForTesting
    public ExternalBlockHandler(OneForOneStreamManager oneForOneStreamManager, ExternalShuffleBlockResolver externalShuffleBlockResolver, MergedShuffleFileManager mergedShuffleFileManager) {
        this.metrics = new ShuffleMetrics();
        this.streamManager = oneForOneStreamManager;
        this.blockManager = externalShuffleBlockResolver;
        this.mergeManager = mergedShuffleFileManager;
    }

    public void receive(TransportClient transportClient, ByteBuffer byteBuffer, RpcResponseCallback rpcResponseCallback) {
        handleMessage(BlockTransferMessage.Decoder.fromByteBuffer(byteBuffer), transportClient, rpcResponseCallback);
    }

    public StreamCallbackWithID receiveStream(TransportClient transportClient, ByteBuffer byteBuffer, RpcResponseCallback rpcResponseCallback) {
        BlockTransferMessage fromByteBuffer = BlockTransferMessage.Decoder.fromByteBuffer(byteBuffer);
        if (!(fromByteBuffer instanceof PushBlockStream)) {
            throw new UnsupportedOperationException("Unexpected message with #receiveStream: " + fromByteBuffer);
        }
        PushBlockStream pushBlockStream = (PushBlockStream) fromByteBuffer;
        checkAuth(transportClient, pushBlockStream.appId);
        return this.mergeManager.receiveBlockDataAsStream(pushBlockStream);
    }

    protected void handleMessage(BlockTransferMessage blockTransferMessage, TransportClient transportClient, RpcResponseCallback rpcResponseCallback) {
        Timer.Context time;
        int length;
        long registerStream;
        if ((blockTransferMessage instanceof FetchShuffleBlocks) || (blockTransferMessage instanceof OpenBlocks)) {
            time = this.metrics.openBlockRequestLatencyMillis.time();
            try {
                if (blockTransferMessage instanceof FetchShuffleBlocks) {
                    FetchShuffleBlocks fetchShuffleBlocks = (FetchShuffleBlocks) blockTransferMessage;
                    checkAuth(transportClient, fetchShuffleBlocks.appId);
                    length = 0;
                    if (fetchShuffleBlocks.batchFetchEnabled) {
                        length = fetchShuffleBlocks.mapIds.length;
                    } else {
                        for (int[] iArr : fetchShuffleBlocks.reduceIds) {
                            length += iArr.length;
                        }
                    }
                    registerStream = this.streamManager.registerStream(transportClient.getClientId(), new ShuffleManagedBufferIterator(fetchShuffleBlocks), transportClient.getChannel());
                } else {
                    OpenBlocks openBlocks = (OpenBlocks) blockTransferMessage;
                    length = openBlocks.blockIds.length;
                    checkAuth(transportClient, openBlocks.appId);
                    registerStream = this.streamManager.registerStream(transportClient.getClientId(), new ManagedBufferIterator(openBlocks), transportClient.getChannel());
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("Registered streamId {} with {} buffers for client {} from host {}", new Object[]{Long.valueOf(registerStream), Integer.valueOf(length), transportClient.getClientId(), NettyUtils.getRemoteAddress(transportClient.getChannel())});
                }
                rpcResponseCallback.onSuccess(new StreamHandle(registerStream, length).toByteBuffer());
                time.stop();
                return;
            } finally {
                time.stop();
            }
        }
        if (blockTransferMessage instanceof RegisterExecutor) {
            time = this.metrics.registerExecutorRequestLatencyMillis.time();
            try {
                RegisterExecutor registerExecutor = (RegisterExecutor) blockTransferMessage;
                checkAuth(transportClient, registerExecutor.appId);
                this.blockManager.registerExecutor(registerExecutor.appId, registerExecutor.execId, registerExecutor.executorInfo);
                this.mergeManager.registerExecutor(registerExecutor.appId, registerExecutor.executorInfo);
                rpcResponseCallback.onSuccess(ByteBuffer.wrap(new byte[0]));
                time.stop();
                return;
            } finally {
            }
        }
        if (blockTransferMessage instanceof RemoveBlocks) {
            RemoveBlocks removeBlocks = (RemoveBlocks) blockTransferMessage;
            checkAuth(transportClient, removeBlocks.appId);
            rpcResponseCallback.onSuccess(new BlocksRemoved(this.blockManager.removeBlocks(removeBlocks.appId, removeBlocks.execId, removeBlocks.blockIds)).toByteBuffer());
            return;
        }
        if (blockTransferMessage instanceof GetLocalDirsForExecutors) {
            GetLocalDirsForExecutors getLocalDirsForExecutors = (GetLocalDirsForExecutors) blockTransferMessage;
            checkAuth(transportClient, getLocalDirsForExecutors.appId);
            rpcResponseCallback.onSuccess(new LocalDirsForExecutors(this.blockManager.getLocalDirs(getLocalDirsForExecutors.appId, getLocalDirsForExecutors.execIds)).toByteBuffer());
        } else {
            if (!(blockTransferMessage instanceof FinalizeShuffleMerge)) {
                throw new UnsupportedOperationException("Unexpected message: " + blockTransferMessage);
            }
            Timer.Context time2 = this.metrics.finalizeShuffleMergeLatencyMillis.time();
            FinalizeShuffleMerge finalizeShuffleMerge = (FinalizeShuffleMerge) blockTransferMessage;
            try {
                try {
                    checkAuth(transportClient, finalizeShuffleMerge.appId);
                    rpcResponseCallback.onSuccess(this.mergeManager.finalizeShuffleMerge(finalizeShuffleMerge).toByteBuffer());
                    time2.stop();
                } finally {
                }
            } catch (IOException e) {
                throw new RuntimeException(String.format("Error while finalizing shuffle merge for application %s shuffle %d", finalizeShuffleMerge.appId, Integer.valueOf(finalizeShuffleMerge.shuffleId)), e);
            }
        }
    }

    public void exceptionCaught(Throwable th, TransportClient transportClient) {
        this.metrics.caughtExceptions.inc();
    }

    public MetricSet getAllMetrics() {
        return this.metrics;
    }

    public StreamManager getStreamManager() {
        return this.streamManager;
    }

    public void applicationRemoved(String str, boolean z) {
        this.blockManager.applicationRemoved(str, z);
        this.mergeManager.applicationRemoved(str, z);
    }

    public void executorRemoved(String str, String str2) {
        this.blockManager.executorRemoved(str, str2);
    }

    public void reregisterExecutor(ExternalShuffleBlockResolver.AppExecId appExecId, ExecutorShuffleInfo executorShuffleInfo) {
        this.blockManager.registerExecutor(appExecId.appId, appExecId.execId, executorShuffleInfo);
    }

    public void close() {
        this.blockManager.close();
    }

    private void checkAuth(TransportClient transportClient, String str) {
        if (transportClient.getClientId() != null && !transportClient.getClientId().equals(str)) {
            throw new SecurityException(String.format("Client for %s not authorized for application %s.", transportClient.getClientId(), str));
        }
    }

    public void channelActive(TransportClient transportClient) {
        this.metrics.activeConnections.inc();
        super.channelActive(transportClient);
    }

    public void channelInactive(TransportClient transportClient) {
        this.metrics.activeConnections.dec();
        super.channelInactive(transportClient);
    }
}
