/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network.shuffle;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import io.netty.channel.Channel;
import java.nio.ByteBuffer;
import java.util.Iterator;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
import org.apache.spark.network.shuffle.protocol.StreamHandle;
import org.apache.spark.network.shuffle.protocol.UploadBlock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Unit tests for {@link ExternalShuffleBlockHandler}.
 *
 * <p>The handler under test is built on top of a mocked {@link OneForOneStreamManager} and a
 * mocked {@link ExternalShuffleBlockResolver}, so each test only checks how the handler
 * dispatches incoming messages to those collaborators, how it answers through the
 * {@link RpcResponseCallback}, and which metrics it updates.
 */
public class ExternalShuffleBlockHandlerSuite {
    TransportClient client = Mockito.mock(TransportClient.class);
    OneForOneStreamManager streamManager;
    ExternalShuffleBlockResolver blockResolver;
    RpcHandler handler;

    /** Creates fresh mocks and a new handler wired to them before every test. */
    @Before
    public void beforeEach() {
        this.streamManager = Mockito.mock(OneForOneStreamManager.class);
        this.blockResolver = Mockito.mock(ExternalShuffleBlockResolver.class);
        this.handler = new ExternalShuffleBlockHandler(this.streamManager, this.blockResolver);
    }

    /**
     * A {@link RegisterExecutor} message must be forwarded to the resolver exactly once,
     * acknowledged through {@code onSuccess}, and counted by the register-latency timer.
     */
    @Test
    public void testRegisterExecutor() {
        RpcResponseCallback callback = Mockito.mock(RpcResponseCallback.class);
        ExecutorShuffleInfo config = new ExecutorShuffleInfo(new String[]{"/a", "/b"}, 16, "sort");
        ByteBuffer registerMessage = new RegisterExecutor("app0", "exec1", config).toByteBuffer();

        this.handler.receive(this.client, registerMessage, callback);

        Mockito.verify(this.blockResolver, Mockito.times(1)).registerExecutor("app0", "exec1", config);
        Mockito.verify(callback, Mockito.times(1)).onSuccess(Matchers.any(ByteBuffer.class));
        Mockito.verify(callback, Mockito.never()).onFailure(Matchers.any(Throwable.class));

        Timer registerExecutorRequestLatencyMillis =
            metric("registerExecutorRequestLatencyMillis", Timer.class);
        Assert.assertEquals(1L, registerExecutorRequestLatencyMillis.getCount());
    }

    /**
     * An {@link OpenBlocks} message for two shuffle blocks must produce a {@link StreamHandle}
     * with two chunks, register a stream that yields the resolver's buffers in request order,
     * and update both the open-block latency timer and the transfer-rate meter.
     */
    @Test
    public void testOpenShuffleBlocks() {
        RpcResponseCallback callback = Mockito.mock(RpcResponseCallback.class);
        // Buffers of 3 and 7 bytes: the meter assertion below expects their sum (10).
        NioManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
        NioManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
        Mockito.when(this.blockResolver.getBlockData("app0", "exec1", 0, 0, 0)).thenReturn(block0Marker);
        Mockito.when(this.blockResolver.getBlockData("app0", "exec1", 0, 0, 1)).thenReturn(block1Marker);
        ByteBuffer openBlocks =
            new OpenBlocks("app0", "exec1", new String[]{"shuffle_0_0_0", "shuffle_0_0_1"}).toByteBuffer();

        this.handler.receive(this.client, openBlocks, callback);

        ArgumentCaptor<ByteBuffer> response = ArgumentCaptor.forClass(ByteBuffer.class);
        Mockito.verify(callback, Mockito.times(1)).onSuccess(response.capture());
        // Explicit cast kept so the untyped any() also resolves under pre-Java-8 inference.
        Mockito.verify(callback, Mockito.never()).onFailure((Throwable) Matchers.any());

        StreamHandle handle =
            (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response.getValue());
        Assert.assertEquals(2, handle.numChunks);

        // forClass(Iterator.class) can only yield a captor of the erased Iterator type; the
        // element type is deliberately left open and the elements are compared as Objects.
        ArgumentCaptor<Iterator> stream = ArgumentCaptor.forClass(Iterator.class);
        Mockito.verify(this.streamManager, Mockito.times(1))
            .registerStream(Mockito.anyString(), stream.capture(), (Channel) Matchers.any());
        Iterator<?> buffers = stream.getValue();
        Assert.assertEquals(block0Marker, buffers.next());
        Assert.assertEquals(block1Marker, buffers.next());
        Assert.assertFalse(buffers.hasNext());

        Mockito.verify(this.blockResolver, Mockito.times(1)).getBlockData("app0", "exec1", 0, 0, 0);
        Mockito.verify(this.blockResolver, Mockito.times(1)).getBlockData("app0", "exec1", 0, 0, 1);

        Timer openBlockRequestLatencyMillis = metric("openBlockRequestLatencyMillis", Timer.class);
        Assert.assertEquals(1L, openBlockRequestLatencyMillis.getCount());
        Meter blockTransferRateBytes = metric("blockTransferRateBytes", Meter.class);
        Assert.assertEquals(10L, blockTransferRateBytes.getCount());
    }

    /**
     * Undecodable bytes and a message type the handler does not serve ({@link UploadBlock})
     * must both make {@code receive} throw, without ever invoking the callback.
     */
    @Test
    public void testBadMessages() {
        RpcResponseCallback callback = Mockito.mock(RpcResponseCallback.class);

        ByteBuffer unserializableMsg = ByteBuffer.wrap(new byte[]{18, 52, 86});
        try {
            this.handler.receive(this.client, unserializableMsg, callback);
            Assert.fail("Should have thrown");
        } catch (Exception expected) {
            // Intentionally ignored: any exception is the expected outcome here. Assert.fail
            // raises an AssertionError, which is not an Exception and so is not swallowed.
        }

        ByteBuffer unexpectedMsg = new UploadBlock("a", "e", "b", new byte[1], new byte[2]).toByteBuffer();
        try {
            this.handler.receive(this.client, unexpectedMsg, callback);
            Assert.fail("Should have thrown");
        } catch (UnsupportedOperationException expected) {
            // Intentionally ignored: this exact exception type is the expected outcome.
        }

        Mockito.verify(callback, Mockito.never()).onSuccess(Matchers.any(ByteBuffer.class));
        Mockito.verify(callback, Mockito.never()).onFailure(Matchers.any(Throwable.class));
    }

    /**
     * Looks up a metric by name in the handler's metric set and casts it to the given type.
     *
     * @param name key of the metric in the map returned by {@code getAllMetrics().getMetrics()}
     * @param type expected class of the metric
     * @return the metric cast to {@code type}, or {@code null} if no metric has that name
     * @throws ClassCastException if the metric exists but is not an instance of {@code type}
     */
    private <T> T metric(String name, Class<T> type) {
        // The field is declared as RpcHandler; the metrics accessor lives on the concrete class.
        ExternalShuffleBlockHandler blockHandler = (ExternalShuffleBlockHandler) this.handler;
        return type.cast(blockHandler.getAllMetrics().getMetrics().get(name));
    }
}

