package org.apache.spark.network;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.protocol.ChunkFetchRequest;
import org.apache.spark.network.protocol.ChunkFetchSuccess;
import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.protocol.StreamRequest;
import org.apache.spark.network.protocol.StreamResponse;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.TransportRequestHandler;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/spark/network/TransportRequestHandlerSuite.class */
public class TransportRequestHandlerSuite {
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/spark/network/TransportRequestHandlerSuite$ExtendedChannelPromise.class */
    private class ExtendedChannelPromise extends DefaultChannelPromise {
        private List<GenericFutureListener<Future<Void>>> listeners;
        private boolean success;

        ExtendedChannelPromise(Channel channel) {
            super(channel);
            this.listeners = new ArrayList();
            this.success = false;
        }

        public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> genericFutureListener) {
            this.listeners.add(genericFutureListener);
            return super.addListener(genericFutureListener);
        }

        public boolean isSuccess() {
            return this.success;
        }

        public void finish(boolean z) {
            this.success = z;
            this.listeners.forEach(genericFutureListener -> {
                try {
                    genericFutureListener.operationComplete(this);
                } catch (Exception e) {
                }
            });
        }

        /* renamed from: addListener, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ ChannelFuture m4addListener(GenericFutureListener genericFutureListener) {
            return addListener((GenericFutureListener<? extends Future<? super Void>>) genericFutureListener);
        }

        /* renamed from: addListener, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Future m5addListener(GenericFutureListener genericFutureListener) {
            return addListener((GenericFutureListener<? extends Future<? super Void>>) genericFutureListener);
        }

        /* renamed from: addListener, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Promise m6addListener(GenericFutureListener genericFutureListener) {
            return addListener((GenericFutureListener<? extends Future<? super Void>>) genericFutureListener);
        }
    }

    @Test
    public void handleFetchRequestAndStreamRequest() throws Exception {
        NoOpRpcHandler noOpRpcHandler = new NoOpRpcHandler();
        OneForOneStreamManager streamManager = noOpRpcHandler.getStreamManager();
        Channel channel = (Channel) Mockito.mock(Channel.class);
        ArrayList arrayList = new ArrayList();
        Mockito.when(channel.writeAndFlush(Mockito.any())).thenAnswer(invocationOnMock -> {
            Object obj = invocationOnMock.getArguments()[0];
            ExtendedChannelPromise extendedChannelPromise = new ExtendedChannelPromise(channel);
            arrayList.add(ImmutablePair.of(obj, extendedChannelPromise));
            return extendedChannelPromise;
        });
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(new TestManagedBuffer(10));
        arrayList2.add(new TestManagedBuffer(20));
        arrayList2.add(new TestManagedBuffer(30));
        arrayList2.add(new TestManagedBuffer(40));
        long registerStream = streamManager.registerStream("test-app", arrayList2.iterator());
        streamManager.registerChannel(channel, registerStream);
        TransportRequestHandler transportRequestHandler = new TransportRequestHandler(channel, (TransportClient) Mockito.mock(TransportClient.class), noOpRpcHandler, 2L);
        transportRequestHandler.handle(new ChunkFetchRequest(new StreamChunkId(registerStream, 0)));
        if (!$assertionsDisabled && arrayList.size() != 1) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !(((Pair) arrayList.get(0)).getLeft() instanceof ChunkFetchSuccess)) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && ((ChunkFetchSuccess) ((Pair) arrayList.get(0)).getLeft()).body() != arrayList2.get(0)) {
            throw new AssertionError();
        }
        transportRequestHandler.handle(new ChunkFetchRequest(new StreamChunkId(registerStream, 1)));
        if (!$assertionsDisabled && arrayList.size() != 2) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !(((Pair) arrayList.get(1)).getLeft() instanceof ChunkFetchSuccess)) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && ((ChunkFetchSuccess) ((Pair) arrayList.get(1)).getLeft()).body() != arrayList2.get(1)) {
            throw new AssertionError();
        }
        ((ExtendedChannelPromise) ((Pair) arrayList.get(0)).getRight()).finish(true);
        transportRequestHandler.handle(new StreamRequest(String.format("%d_%d", Long.valueOf(registerStream), 2)));
        if (!$assertionsDisabled && arrayList.size() != 3) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !(((Pair) arrayList.get(2)).getLeft() instanceof StreamResponse)) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && ((StreamResponse) ((Pair) arrayList.get(2)).getLeft()).body() != arrayList2.get(2)) {
            throw new AssertionError();
        }
        transportRequestHandler.handle(new StreamRequest(String.format("%d_%d", Long.valueOf(registerStream), 3)));
        ((Channel) Mockito.verify(channel, Mockito.times(1))).close();
        if (!$assertionsDisabled && arrayList.size() != 3) {
            throw new AssertionError();
        }
    }

    static {
        $assertionsDisabled = !TransportRequestHandlerSuite.class.desiredAssertionStatus();
    }
}
