/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import com.google.protobuf.ByteString;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import java.security.MessageDigest;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.InvalidMagicNumberException;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.BlockReceiver;
import org.apache.hadoop.hdfs.server.datanode.BlockSender;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.DataXceiverServer;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.sparkproject.guava.base.Preconditions;
import org.sparkproject.guava.net.InetAddresses;

class DataXceiver
extends Receiver
implements Runnable {
    public static final Log LOG = DataNode.LOG;
    static final Log ClientTraceLog = DataNode.ClientTraceLog;
    private Peer peer;
    private final String remoteAddress;
    private final String remoteAddressWithoutPort;
    private final String localAddress;
    private final DataNode datanode;
    private final DNConf dnConf;
    private final DataXceiverServer dataXceiverServer;
    private final boolean connectToDnViaHostname;
    private long opStartTime;
    private final InputStream socketIn;
    private OutputStream socketOut;
    private BlockReceiver blockReceiver = null;
    private String previousOpClientName;

    public static DataXceiver create(Peer peer, DataNode dn, DataXceiverServer dataXceiverServer) throws IOException {
        return new DataXceiver(peer, dn, dataXceiverServer);
    }

    private DataXceiver(Peer peer, DataNode datanode, DataXceiverServer dataXceiverServer) throws IOException {
        super(datanode.tracer);
        this.peer = peer;
        this.dnConf = datanode.getDnConf();
        this.socketIn = peer.getInputStream();
        this.socketOut = peer.getOutputStream();
        this.datanode = datanode;
        this.dataXceiverServer = dataXceiverServer;
        this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
        this.remoteAddress = peer.getRemoteAddressString();
        int colonIdx = this.remoteAddress.indexOf(58);
        this.remoteAddressWithoutPort = colonIdx < 0 ? this.remoteAddress : this.remoteAddress.substring(0, colonIdx);
        this.localAddress = peer.getLocalAddressString();
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Number of active connections is: " + datanode.getXceiverCount()));
        }
    }

    private void updateCurrentThreadName(String status) {
        StringBuilder sb = new StringBuilder();
        sb.append("DataXceiver for client ");
        if (this.previousOpClientName != null) {
            sb.append(this.previousOpClientName).append(" at ");
        }
        sb.append(this.remoteAddress);
        if (status != null) {
            sb.append(" [").append(status).append("]");
        }
        Thread.currentThread().setName(sb.toString());
    }

    DataNode getDataNode() {
        return this.datanode;
    }

    private OutputStream getOutputStream() {
        return this.socketOut;
    }

    public void sendOOB() throws IOException, InterruptedException {
        LOG.info((Object)("Sending OOB to peer: " + this.peer));
        if (this.blockReceiver != null) {
            this.blockReceiver.sendOOB();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        int opsProcessed = 0;
        Enum op = null;
        try {
            this.dataXceiverServer.addPeer(this.peer, Thread.currentThread(), this);
            this.peer.setWriteTimeout(this.datanode.getDnConf().socketWriteTimeout);
            InputStream input = this.socketIn;
            try {
                IOStreamPair saslStreams = this.datanode.saslServer.receive(this.peer, this.socketOut, this.socketIn, this.datanode.getXferAddress().getPort(), this.datanode.getDatanodeId());
                input = new BufferedInputStream(saslStreams.in, HdfsConstants.SMALL_BUFFER_SIZE);
                this.socketOut = saslStreams.out;
            }
            catch (InvalidMagicNumberException imne) {
                if (imne.isHandshake4Encryption()) {
                    LOG.info((Object)("Failed to read expected encryption handshake from client at " + this.peer.getRemoteAddressString() + ". Perhaps the client is running an older version of Hadoop which does not support encryption"));
                } else {
                    LOG.info((Object)("Failed to read expected SASL data transfer protection handshake from client at " + this.peer.getRemoteAddressString() + ". Perhaps the client is running an older version of Hadoop which does not support SASL data transfer protection"));
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)(this.datanode.getDisplayName() + ":Number of active connections is: " + this.datanode.getXceiverCount()));
                }
                this.updateCurrentThreadName("Cleaning up");
                if (this.peer != null) {
                    this.dataXceiverServer.closePeer(this.peer);
                    IOUtils.closeStream((Closeable)this.in);
                }
                return;
            }
            super.initialize(new DataInputStream(input));
            do {
                this.updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
                try {
                    if (opsProcessed != 0) {
                        assert (this.dnConf.socketKeepaliveTimeout > 0);
                        this.peer.setReadTimeout(this.dnConf.socketKeepaliveTimeout);
                    } else {
                        this.peer.setReadTimeout(this.dnConf.socketTimeout);
                    }
                    op = this.readOp();
                }
                catch (InterruptedIOException ignored) {
                    break;
                }
                catch (IOException err) {
                    if (opsProcessed > 0 && (err instanceof EOFException || err instanceof ClosedChannelException)) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("Cached " + this.peer + " closing after " + opsProcessed + " ops"));
                        }
                        break;
                    }
                    this.incrDatanodeNetworkErrors();
                    throw err;
                }
                if (opsProcessed != 0) {
                    this.peer.setReadTimeout(this.dnConf.socketTimeout);
                }
                this.opStartTime = Time.monotonicNow();
                this.processOp((Op)op);
                ++opsProcessed;
            } while (this.peer != null && !this.peer.isClosed() && this.dnConf.socketKeepaliveTimeout > 0);
        }
        catch (Throwable t) {
            String s = this.datanode.getDisplayName() + ":DataXceiver error processing " + (op == null ? "unknown" : op.name()) + " operation  src: " + this.remoteAddress + " dst: " + this.localAddress;
            if (op == Op.WRITE_BLOCK && t instanceof ReplicaAlreadyExistsException) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace((Object)s, t);
                } else {
                    LOG.info((Object)(s + "; " + t));
                }
            } else if (op == Op.READ_BLOCK && t instanceof SocketTimeoutException) {
                String s1 = "Likely the client has stopped reading, disconnecting it";
                s1 = s1 + " (" + s + ")";
                if (LOG.isTraceEnabled()) {
                    LOG.trace((Object)s1, t);
                } else {
                    LOG.info((Object)(s1 + "; " + t));
                }
            } else {
                LOG.error((Object)s, t);
            }
        }
        finally {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)(this.datanode.getDisplayName() + ":Number of active connections is: " + this.datanode.getXceiverCount()));
            }
            this.updateCurrentThreadName("Cleaning up");
            if (this.peer != null) {
                this.dataXceiverServer.closePeer(this.peer);
                IOUtils.closeStream((Closeable)this.in);
            }
        }
    }

    private static InetAddress getClientAddress(Peer peer) {
        return InetAddresses.forString((String)peer.getRemoteAddressString().split(":")[0].substring(1));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void requestShortCircuitFds(ExtendedBlock blk, Token<BlockTokenIdentifier> token, ShortCircuitShm.SlotId slotId, int maxVersion, boolean supportsReceiptVerification) throws IOException {
        boolean success;
        Closeable[] fis;
        block19: {
            this.updateCurrentThreadName("Passing file descriptors for block " + blk);
            DataTransferProtos.BlockOpResponseProto.Builder bld = DataTransferProtos.BlockOpResponseProto.newBuilder();
            fis = null;
            ShortCircuitShm.SlotId registeredSlotId = null;
            success = false;
            try {
                try {
                    if (this.peer.getDomainSocket() == null) {
                        throw new IOException("You cannot pass file descriptors over anything but a UNIX domain socket.");
                    }
                    if (slotId != null) {
                        boolean isCached = this.datanode.data.isCached(blk.getBlockPoolId(), blk.getBlockId());
                        this.datanode.shortCircuitRegistry.registerSlot(ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
                        registeredSlotId = slotId;
                    }
                    Preconditions.checkState(((fis = this.datanode.requestShortCircuitFdsForRead(blk, token, maxVersion)) != null ? 1 : 0) != 0);
                    bld.setStatus(DataTransferProtos.Status.SUCCESS);
                    bld.setShortCircuitAccessVersion(1);
                }
                catch (DataNode.ShortCircuitFdsVersionException e) {
                    bld.setStatus(DataTransferProtos.Status.ERROR_UNSUPPORTED);
                    bld.setShortCircuitAccessVersion(1);
                    bld.setMessage(e.getMessage());
                }
                catch (DataNode.ShortCircuitFdsUnsupportedException e) {
                    bld.setStatus(DataTransferProtos.Status.ERROR_UNSUPPORTED);
                    bld.setMessage(e.getMessage());
                }
                catch (SecretManager.InvalidToken e) {
                    bld.setStatus(DataTransferProtos.Status.ERROR_ACCESS_TOKEN);
                    bld.setMessage(e.getMessage());
                }
                catch (IOException e) {
                    bld.setStatus(DataTransferProtos.Status.ERROR);
                    bld.setMessage(e.getMessage());
                }
                bld.build().writeDelimitedTo(this.socketOut);
                if (fis != null) {
                    FileDescriptor[] fds = new FileDescriptor[fis.length];
                    for (int i = 0; i < fds.length; ++i) {
                        fds[i] = ((FileInputStream)fis[i]).getFD();
                    }
                    byte[] buf = new byte[]{supportsReceiptVerification ? (byte)DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION.getNumber() : (byte)DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION.getNumber()};
                    DomainSocket sock = this.peer.getDomainSocket();
                    sock.sendFileDescriptors(fds, buf, 0, buf.length);
                    if (supportsReceiptVerification) {
                        LOG.trace((Object)("Reading receipt verification byte for " + slotId));
                        int val = sock.getInputStream().read();
                        if (val < 0) {
                            throw new EOFException();
                        }
                    } else {
                        LOG.trace((Object)("Receipt verification is not enabled on the DataNode.  Not verifying " + slotId));
                    }
                    success = true;
                }
                if (success || registeredSlotId == null) break block19;
            }
            catch (Throwable throwable) {
                if (!success && registeredSlotId != null) {
                    LOG.info((Object)("Unregistering " + registeredSlotId + " because the requestShortCircuitFdsForRead operation failed."));
                    this.datanode.shortCircuitRegistry.unregisterSlot(registeredSlotId);
                }
                if (ClientTraceLog.isInfoEnabled()) {
                    DatanodeRegistration dnR = this.datanode.getDNRegistrationForBP(blk.getBlockPoolId());
                    BlockSender.ClientTraceLog.info((Object)String.format("src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS, blockid: %s, srvID: %s, success: %b", blk.getBlockId(), dnR.getDatanodeUuid(), success));
                }
                if (fis != null) {
                    IOUtils.cleanup((Log)LOG, fis);
                }
                throw throwable;
            }
            LOG.info((Object)("Unregistering " + registeredSlotId + " because the requestShortCircuitFdsForRead operation failed."));
            this.datanode.shortCircuitRegistry.unregisterSlot(registeredSlotId);
        }
        if (ClientTraceLog.isInfoEnabled()) {
            DatanodeRegistration dnR = this.datanode.getDNRegistrationForBP(blk.getBlockPoolId());
            BlockSender.ClientTraceLog.info((Object)String.format("src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS, blockid: %s, srvID: %s, success: %b", blk.getBlockId(), dnR.getDatanodeUuid(), success));
        }
        if (fis != null) {
            IOUtils.cleanup((Log)LOG, (Closeable[])fis);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void releaseShortCircuitFds(ShortCircuitShm.SlotId slotId) throws IOException {
        boolean success = false;
        try {
            DataTransferProtos.Status status;
            String error;
            try {
                this.datanode.shortCircuitRegistry.unregisterSlot(slotId);
                error = null;
                status = DataTransferProtos.Status.SUCCESS;
            }
            catch (UnsupportedOperationException e) {
                error = "unsupported operation";
                status = DataTransferProtos.Status.ERROR_UNSUPPORTED;
            }
            catch (Throwable e) {
                error = e.getMessage();
                status = DataTransferProtos.Status.ERROR_INVALID;
            }
            DataTransferProtos.ReleaseShortCircuitAccessResponseProto.Builder bld = DataTransferProtos.ReleaseShortCircuitAccessResponseProto.newBuilder();
            bld.setStatus(status);
            if (error != null) {
                bld.setError(error);
            }
            bld.build().writeDelimitedTo(this.socketOut);
            success = true;
        }
        catch (Throwable throwable) {
            if (ClientTraceLog.isInfoEnabled()) {
                BlockSender.ClientTraceLog.info((Object)String.format("src: 127.0.0.1, dest: 127.0.0.1, op: RELEASE_SHORT_CIRCUIT_FDS, shmId: %016x%016x, slotIdx: %d, srvID: %s, success: %b", slotId.getShmId().getHi(), slotId.getShmId().getLo(), slotId.getSlotIdx(), this.datanode.getDatanodeUuid(), success));
            }
            throw throwable;
        }
        if (ClientTraceLog.isInfoEnabled()) {
            BlockSender.ClientTraceLog.info((Object)String.format("src: 127.0.0.1, dest: 127.0.0.1, op: RELEASE_SHORT_CIRCUIT_FDS, shmId: %016x%016x, slotIdx: %d, srvID: %s, success: %b", slotId.getShmId().getHi(), slotId.getShmId().getLo(), slotId.getSlotIdx(), this.datanode.getDatanodeUuid(), success));
        }
    }

    private void sendShmErrorResponse(DataTransferProtos.Status status, String error) throws IOException {
        DataTransferProtos.ShortCircuitShmResponseProto.newBuilder().setStatus(status).setError(error).build().writeDelimitedTo(this.socketOut);
    }

    private void sendShmSuccessResponse(DomainSocket sock, ShortCircuitRegistry.NewShmInfo shmInfo) throws IOException {
        DataTransferProtos.ShortCircuitShmResponseProto.newBuilder().setStatus(DataTransferProtos.Status.SUCCESS).setId(PBHelper.convert(shmInfo.shmId)).build().writeDelimitedTo(this.socketOut);
        byte[] buf = new byte[]{0};
        FileDescriptor[] shmFdArray = new FileDescriptor[]{shmInfo.stream.getFD()};
        sock.sendFileDescriptors(shmFdArray, buf, 0, buf.length);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
     * Unable to fully structure code
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public void requestShortCircuitShm(String clientName) throws IOException {
        shmInfo = null;
        success = false;
        sock = this.peer.getDomainSocket();
        if (sock == null) {
            this.sendShmErrorResponse(DataTransferProtos.Status.ERROR_INVALID, "Bad request from " + this.peer + ": must request a shared memory segment over a UNIX domain socket.");
        }
        ** GOTO lbl28
        {
            catch (Throwable var6_7) {
                if (DataXceiver.ClientTraceLog.isInfoEnabled()) {
                    if (success) {
                        BlockSender.ClientTraceLog.info((Object)String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: %016x%016x, srvID: %s, success: true", new Object[]{clientName, shmInfo.shmId.getHi(), shmInfo.shmId.getLo(), this.datanode.getDatanodeUuid()}));
                    } else {
                        BlockSender.ClientTraceLog.info((Object)String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: n/a, srvID: %s, success: false", new Object[]{clientName, this.datanode.getDatanodeUuid()}));
                    }
                }
                if (!success && this.peer == null) {
                    IOUtils.cleanup(null, (Closeable[])new Closeable[]{sock});
                }
                IOUtils.cleanup(null, (Closeable[])new Closeable[]{shmInfo});
                throw var6_7;
            }
            if (DataXceiver.ClientTraceLog.isInfoEnabled()) {
                if (success) {
                    BlockSender.ClientTraceLog.info((Object)String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: %016x%016x, srvID: %s, success: true", new Object[]{clientName, shmInfo.shmId.getHi(), shmInfo.shmId.getLo(), this.datanode.getDatanodeUuid()}));
                } else {
                    BlockSender.ClientTraceLog.info((Object)String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: n/a, srvID: %s, success: false", new Object[]{clientName, this.datanode.getDatanodeUuid()}));
                }
            }
            if (!success && this.peer == null) {
                IOUtils.cleanup(null, (Closeable[])new Closeable[]{sock});
            }
            IOUtils.cleanup(null, (Closeable[])new Closeable[]{shmInfo});
            return;
lbl28:
            // 1 sources

            try {
                shmInfo = this.datanode.shortCircuitRegistry.createNewMemorySegment(clientName, sock);
                this.releaseSocket();
            }
            catch (UnsupportedOperationException e) {
                this.sendShmErrorResponse(DataTransferProtos.Status.ERROR_UNSUPPORTED, "This datanode has not been configured to support short-circuit shared memory segments.");
                if (DataXceiver.ClientTraceLog.isInfoEnabled()) {
                    if (success) {
                        BlockSender.ClientTraceLog.info((Object)String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: %016x%016x, srvID: %s, success: true", new Object[]{clientName, shmInfo.shmId.getHi(), shmInfo.shmId.getLo(), this.datanode.getDatanodeUuid()}));
                    } else {
                        BlockSender.ClientTraceLog.info((Object)String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: n/a, srvID: %s, success: false", new Object[]{clientName, this.datanode.getDatanodeUuid()}));
                    }
                }
                if (!success && this.peer == null) {
                    IOUtils.cleanup(null, (Closeable[])new Closeable[]{sock});
                }
                IOUtils.cleanup(null, (Closeable[])new Closeable[]{shmInfo});
                return;
            }
            catch (IOException e) {}
            {
                this.sendShmErrorResponse(DataTransferProtos.Status.ERROR, "Failed to create shared file descriptor: " + e.getMessage());
            }
            if (DataXceiver.ClientTraceLog.isInfoEnabled()) {
                if (success) {
                    BlockSender.ClientTraceLog.info((Object)String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: %016x%016x, srvID: %s, success: true", new Object[]{clientName, shmInfo.shmId.getHi(), shmInfo.shmId.getLo(), this.datanode.getDatanodeUuid()}));
                } else {
                    BlockSender.ClientTraceLog.info((Object)String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: n/a, srvID: %s, success: false", new Object[]{clientName, this.datanode.getDatanodeUuid()}));
                }
            }
            if (!success && this.peer == null) {
                IOUtils.cleanup(null, (Closeable[])new Closeable[]{sock});
            }
            IOUtils.cleanup(null, (Closeable[])new Closeable[]{shmInfo});
            return;
        }
        {
            this.sendShmSuccessResponse(sock, shmInfo);
            success = true;
        }
        if (DataXceiver.ClientTraceLog.isInfoEnabled()) {
            if (success) {
                BlockSender.ClientTraceLog.info((Object)String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: %016x%016x, srvID: %s, success: true", new Object[]{clientName, shmInfo.shmId.getHi(), shmInfo.shmId.getLo(), this.datanode.getDatanodeUuid()}));
            } else {
                BlockSender.ClientTraceLog.info((Object)String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: n/a, srvID: %s, success: false", new Object[]{clientName, this.datanode.getDatanodeUuid()}));
            }
        }
        if (!success && this.peer == null) {
            IOUtils.cleanup(null, (Closeable[])new Closeable[]{sock});
        }
        IOUtils.cleanup(null, (Closeable[])new Closeable[]{shmInfo});
    }

    void releaseSocket() {
        this.dataXceiverServer.releasePeer(this.peer);
        this.peer = null;
    }

    /*
     * Loose catch block
     */
    @Override
    public void readBlock(ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, String clientName, long blockOffset, long length, boolean sendChecksum, CachingStrategy cachingStrategy) throws IOException {
        this.previousOpClientName = clientName;
        long read = 0L;
        OutputStream baseStream = this.getOutputStream();
        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
        this.checkAccess(out, true, block, blockToken, Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);
        BlockSender blockSender = null;
        DatanodeRegistration dnR = this.datanode.getDNRegistrationForBP(block.getBlockPoolId());
        String clientTraceFmt = clientName.length() > 0 && ClientTraceLog.isInfoEnabled() ? String.format("src: %s, dest: %s, bytes: %s, op: %s, cliID: %s, offset: %s, srvID: %s, blockid: %s, duration: %s", this.localAddress, this.remoteAddress, "%d", "HDFS_READ", clientName, "%d", dnR.getDatanodeUuid(), block, "%d") : dnR + " Served block " + block + " to " + this.remoteAddress;
        this.updateCurrentThreadName("Sending block " + block);
        try {
            try {
                blockSender = new BlockSender(block, blockOffset, length, true, false, sendChecksum, this.datanode, clientTraceFmt, cachingStrategy);
            }
            catch (IOException e) {
                String msg = "opReadBlock " + block + " received exception " + e;
                LOG.info((Object)msg);
                this.sendResponse(DataTransferProtos.Status.ERROR, msg);
                throw e;
            }
            this.writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(this.getOutputStream()));
            long beginRead = Time.monotonicNow();
            read = blockSender.sendBlock(out, baseStream, null);
            long duration = Time.monotonicNow() - beginRead;
            if (blockSender.didSendEntireByteRange()) {
                try {
                    DataTransferProtos.ClientReadStatusProto stat = DataTransferProtos.ClientReadStatusProto.parseFrom(PBHelper.vintPrefixed(this.in));
                    if (!stat.hasStatus()) {
                        LOG.warn((Object)("Client " + this.peer.getRemoteAddressString() + " did not send a valid status code after reading. Will close connection."));
                        IOUtils.closeStream((Closeable)out);
                    }
                }
                catch (IOException ioe) {
                    LOG.debug((Object)"Error reading client status response. Will close connection.", (Throwable)ioe);
                    IOUtils.closeStream((Closeable)out);
                    this.incrDatanodeNetworkErrors();
                }
            } else {
                IOUtils.closeStream((Closeable)out);
            }
            this.datanode.metrics.incrBytesRead((int)read);
            this.datanode.metrics.incrBlocksRead();
            this.datanode.metrics.incrTotalReadTime(duration);
            IOUtils.closeStream((Closeable)blockSender);
        }
        catch (SocketException ignored) {
            if (LOG.isTraceEnabled()) {
                LOG.trace((Object)(dnR + ":Ignoring exception while serving " + block + " to " + this.remoteAddress), (Throwable)ignored);
            }
            this.datanode.metrics.incrBlocksRead();
            IOUtils.closeStream((Closeable)out);
        }
        catch (IOException ioe) {
            if (!(ioe instanceof SocketTimeoutException)) {
                LOG.warn((Object)(dnR + ":Got exception while serving " + block + " to " + this.remoteAddress), (Throwable)ioe);
                this.incrDatanodeNetworkErrors();
            }
            throw ioe;
            {
                catch (Throwable throwable) {
                    throw throwable;
                }
            }
        }
        finally {
            IOUtils.closeStream(blockSender);
        }
        this.datanode.metrics.addReadBlockOp(this.elapsed());
        this.datanode.metrics.incrReadsFromClient(this.peer.isLocal(), read);
    }

    @Override
    public void writeBlock(ExtendedBlock block, StorageType storageType, Token<BlockTokenIdentifier> blockToken, String clientname, DatanodeInfo[] targets, StorageType[] targetStorageTypes, DatanodeInfo srcDataNode, BlockConstructionStage stage, int pipelineSize, long minBytesRcvd, long maxBytesRcvd, long latestGenerationStamp, DataChecksum requestedChecksum, CachingStrategy cachingStrategy, boolean pinning, boolean[] targetPinnings) throws IOException {
        this.previousOpClientName = clientname;
        this.updateCurrentThreadName("Receiving block " + block);
        boolean isDatanode = clientname.length() == 0;
        boolean isClient = !isDatanode;
        boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW || stage == BlockConstructionStage.TRANSFER_FINALIZED;
        long size = 0L;
        if (isTransfer && targets.length > 0) {
            throw new IOException((Object)((Object)stage) + " does not support multiple targets " + Arrays.asList(targets));
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("opWriteBlock: stage=" + (Object)((Object)stage) + ", clientname=" + clientname + "\n  block  =" + block + ", newGs=" + latestGenerationStamp + ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]\n  targets=" + Arrays.asList(targets) + "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode + ", pinning=" + pinning));
            LOG.debug((Object)("isDatanode=" + isDatanode + ", isClient=" + isClient + ", isTransfer=" + isTransfer));
            LOG.debug((Object)("writeBlock receive buf size " + this.peer.getReceiveBufferSize() + " tcp no delay " + this.peer.getTcpNoDelay()));
        }
        ExtendedBlock originalBlock = new ExtendedBlock(block);
        if (block.getNumBytes() == 0L) {
            block.setNumBytes(this.dataXceiverServer.estimateBlockSize);
        }
        LOG.info((Object)("Receiving " + block + " src: " + this.remoteAddress + " dest: " + this.localAddress));
        DataOutputStream replyOut = new DataOutputStream(new BufferedOutputStream(this.getOutputStream(), HdfsConstants.SMALL_BUFFER_SIZE));
        this.checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
        DataOutputStream mirrorOut = null;
        DataInputStream mirrorIn = null;
        Socket mirrorSock = null;
        String mirrorNode = null;
        String firstBadLink = "";
        DataTransferProtos.Status mirrorInStatus = DataTransferProtos.Status.SUCCESS;
        try {
            String storageUuid;
            if (isDatanode || stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
                this.blockReceiver = new BlockReceiver(block, storageType, this.in, this.peer.getRemoteAddressString(), this.peer.getLocalAddressString(), stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, this.datanode, requestedChecksum, cachingStrategy, pinning);
                storageUuid = this.blockReceiver.getStorageUuid();
            } else {
                storageUuid = this.datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
            }
            if (targets.length > 0) {
                InetSocketAddress mirrorTarget = null;
                mirrorNode = targets[0].getXferAddr(this.connectToDnViaHostname);
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Connecting to datanode " + mirrorNode));
                }
                mirrorTarget = NetUtils.createSocketAddr((String)mirrorNode);
                mirrorSock = this.datanode.newSocket();
                try {
                    int timeoutValue = this.dnConf.socketTimeout + HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length;
                    int writeTimeout = this.dnConf.socketWriteTimeout + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length;
                    NetUtils.connect((Socket)mirrorSock, (SocketAddress)mirrorTarget, (int)timeoutValue);
                    mirrorSock.setSoTimeout(timeoutValue);
                    mirrorSock.setSendBufferSize(131072);
                    OutputStream unbufMirrorOut = NetUtils.getOutputStream((Socket)mirrorSock, (long)writeTimeout);
                    Object unbufMirrorIn = NetUtils.getInputStream((Socket)mirrorSock);
                    DataEncryptionKeyFactory keyFactory = this.datanode.getDataEncryptionKeyFactoryForBlock(block);
                    IOStreamPair saslStreams = this.datanode.saslClient.socketSend(mirrorSock, unbufMirrorOut, (InputStream)unbufMirrorIn, keyFactory, blockToken, targets[0]);
                    unbufMirrorOut = saslStreams.out;
                    unbufMirrorIn = saslStreams.in;
                    mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut, HdfsConstants.SMALL_BUFFER_SIZE));
                    mirrorIn = new DataInputStream((InputStream)unbufMirrorIn);
                    if (targetPinnings != null && targetPinnings.length > 0) {
                        new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0], blockToken, clientname, targets, targetStorageTypes, srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum, cachingStrategy, targetPinnings[0], targetPinnings);
                    } else {
                        new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0], blockToken, clientname, targets, targetStorageTypes, srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum, cachingStrategy, false, targetPinnings);
                    }
                    mirrorOut.flush();
                    DataNodeFaultInjector.get().writeBlockAfterFlush();
                    if (isClient) {
                        DataTransferProtos.BlockOpResponseProto connectAck = DataTransferProtos.BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
                        mirrorInStatus = connectAck.getStatus();
                        firstBadLink = connectAck.getFirstBadLink();
                        if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtos.Status.SUCCESS) {
                            LOG.info((Object)("Datanode " + targets.length + " got response for connect ack  from downstream datanode with firstbadlink as " + firstBadLink));
                        }
                    }
                }
                catch (IOException e) {
                    if (isClient) {
                        DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.ERROR).setFirstBadLink(targets[0].getXferAddr()).build().writeDelimitedTo(replyOut);
                        replyOut.flush();
                    }
                    IOUtils.closeStream(mirrorOut);
                    mirrorOut = null;
                    IOUtils.closeStream(mirrorIn);
                    mirrorIn = null;
                    IOUtils.closeSocket((Socket)mirrorSock);
                    mirrorSock = null;
                    if (isClient) {
                        LOG.error((Object)(this.datanode + ":Exception transfering block " + block + " to mirror " + mirrorNode + ": " + e));
                        throw e;
                    }
                    LOG.info((Object)(this.datanode + ":Exception transfering " + block + " to mirror " + mirrorNode + "- continuing without the mirror"), (Throwable)e);
                    this.incrDatanodeNetworkErrors();
                }
            }
            if (isClient && !isTransfer) {
                if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtos.Status.SUCCESS) {
                    LOG.info((Object)("Datanode " + targets.length + " forwarding connect ack to upstream firstbadlink is " + firstBadLink));
                }
                DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(mirrorInStatus).setFirstBadLink(firstBadLink).build().writeDelimitedTo(replyOut);
                replyOut.flush();
            }
            if (this.blockReceiver != null) {
                String mirrorAddr = mirrorSock == null ? null : mirrorNode;
                this.blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr, null, targets, false);
                if (isTransfer) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace((Object)"TRANSFER: send close-ack");
                    }
                    DataXceiver.writeResponse(DataTransferProtos.Status.SUCCESS, null, replyOut);
                }
            }
            if (isClient && stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
                block.setGenerationStamp(latestGenerationStamp);
                block.setNumBytes(minBytesRcvd);
            }
            if (isDatanode || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
                this.datanode.closeBlock(block, "", storageUuid);
                LOG.info((Object)("Received " + block + " src: " + this.remoteAddress + " dest: " + this.localAddress + " of size " + block.getNumBytes()));
            }
            if (isClient) {
                size = block.getNumBytes();
            }
        }
        catch (IOException ioe) {
            try {
                LOG.info((Object)("opWriteBlock " + block + " received exception " + ioe));
                this.incrDatanodeNetworkErrors();
                throw ioe;
            }
            catch (Throwable throwable) {
                IOUtils.closeStream(mirrorOut);
                IOUtils.closeStream(mirrorIn);
                IOUtils.closeStream((Closeable)replyOut);
                IOUtils.closeSocket(mirrorSock);
                IOUtils.closeStream((Closeable)this.blockReceiver);
                this.blockReceiver = null;
                throw throwable;
            }
        }
        IOUtils.closeStream(mirrorOut);
        IOUtils.closeStream(mirrorIn);
        IOUtils.closeStream((Closeable)replyOut);
        IOUtils.closeSocket((Socket)mirrorSock);
        IOUtils.closeStream((Closeable)this.blockReceiver);
        this.blockReceiver = null;
        this.datanode.metrics.addWriteBlockOp(this.elapsed());
        this.datanode.metrics.incrWritesFromClient(this.peer.isLocal(), size);
    }

    @Override
    public void transferBlock(ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken, String clientName, DatanodeInfo[] targets, StorageType[] targetStorageTypes) throws IOException {
        this.checkAccess(this.socketOut, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
        this.previousOpClientName = clientName;
        this.updateCurrentThreadName((Object)((Object)Op.TRANSFER_BLOCK) + " " + blk);
        DataOutputStream out = new DataOutputStream(this.getOutputStream());
        try {
            this.datanode.transferReplicaForPipelineRecovery(blk, targets, targetStorageTypes, clientName);
            DataXceiver.writeResponse(DataTransferProtos.Status.SUCCESS, null, out);
        }
        catch (IOException ioe) {
            LOG.info((Object)("transferBlock " + blk + " received exception " + ioe));
            this.incrDatanodeNetworkErrors();
            throw ioe;
        }
        finally {
            IOUtils.closeStream((Closeable)out);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private MD5Hash calcPartialBlockChecksum(ExtendedBlock block, long requestLength, DataChecksum checksum, DataInputStream checksumIn) throws IOException {
        int bytesPerCRC = checksum.getBytesPerChecksum();
        int csize = checksum.getChecksumSize();
        byte[] buffer = new byte[4096];
        MessageDigest digester = MD5Hash.getDigester();
        int toDigest = 0;
        for (long remaining = requestLength / (long)bytesPerCRC * (long)csize; remaining > 0L && (toDigest = checksumIn.read(buffer, 0, (int)Math.min(remaining, (long)buffer.length))) >= 0; remaining -= (long)toDigest) {
            digester.update(buffer, 0, toDigest);
        }
        int partialLength = (int)(requestLength % (long)bytesPerCRC);
        if (partialLength > 0) {
            byte[] buf = new byte[partialLength];
            InputStream blockIn = this.datanode.data.getBlockInputStream(block, requestLength - (long)partialLength);
            try {
                IOUtils.readFully((InputStream)blockIn, (byte[])buf, (int)0, (int)partialLength);
            }
            finally {
                IOUtils.closeStream((Closeable)blockIn);
            }
            checksum.update(buf, 0, partialLength);
            byte[] partialCrc = new byte[csize];
            checksum.writeValue(partialCrc, 0, true);
            digester.update(partialCrc);
        }
        return new MD5Hash(digester.digest());
    }

    @Override
    public void blockChecksum(ExtendedBlock block, Token<BlockTokenIdentifier> blockToken) throws IOException {
        DataOutputStream out = new DataOutputStream(this.getOutputStream());
        this.checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
        long requestLength = block.getNumBytes();
        Preconditions.checkArgument((requestLength >= 0L ? 1 : 0) != 0);
        long visibleLength = this.datanode.data.getReplicaVisibleLength(block);
        boolean partialBlk = requestLength < visibleLength;
        this.updateCurrentThreadName("Reading metadata for block " + block);
        LengthInputStream metadataIn = this.datanode.data.getMetaDataInputStream(block);
        DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
        this.updateCurrentThreadName("Getting checksum for block " + block);
        try {
            MD5Hash md5;
            BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
            DataChecksum checksum = header.getChecksum();
            int csize = checksum.getChecksumSize();
            int bytesPerCRC = checksum.getBytesPerChecksum();
            long crcPerBlock = csize <= 0 ? 0L : (metadataIn.getLength() - (long)BlockMetadataHeader.getHeaderSize()) / (long)csize;
            MD5Hash mD5Hash = md5 = partialBlk && crcPerBlock > 0L ? this.calcPartialBlockChecksum(block, requestLength, checksum, checksumIn) : MD5Hash.digest((InputStream)checksumIn);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("block=" + block + ", bytesPerCRC=" + bytesPerCRC + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5));
            }
            DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.SUCCESS).setChecksumResponse(DataTransferProtos.OpBlockChecksumResponseProto.newBuilder().setBytesPerCrc(bytesPerCRC).setCrcPerBlock(crcPerBlock).setMd5(ByteString.copyFrom((byte[])md5.getDigest())).setCrcType(PBHelper.convert(checksum.getChecksumType()))).build().writeDelimitedTo(out);
            out.flush();
        }
        catch (IOException ioe) {
            LOG.info((Object)("blockChecksum " + block + " received exception " + ioe));
            this.incrDatanodeNetworkErrors();
            throw ioe;
        }
        finally {
            IOUtils.closeStream((Closeable)out);
            IOUtils.closeStream((Closeable)checksumIn);
            IOUtils.closeStream((Closeable)metadataIn);
        }
        this.datanode.metrics.addBlockChecksumOp(this.elapsed());
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public void copyBlock(ExtendedBlock block, Token<BlockTokenIdentifier> blockToken) throws IOException {
        DataOutputStream reply;
        BlockSender blockSender;
        block14: {
            String msg;
            this.updateCurrentThreadName("Copying block " + block);
            if (this.datanode.isBlockTokenEnabled) {
                try {
                    this.datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block, BlockTokenIdentifier.AccessMode.COPY);
                }
                catch (SecretManager.InvalidToken e) {
                    LOG.warn((Object)("Invalid access token in request from " + this.remoteAddress + " for OP_COPY_BLOCK for block " + block + " : " + e.getLocalizedMessage()));
                    this.sendResponse(DataTransferProtos.Status.ERROR_ACCESS_TOKEN, "Invalid access token");
                    return;
                }
            }
            if (this.datanode.data.getPinning(block)) {
                msg = "Not able to copy block " + block.getBlockId() + " to " + this.peer.getRemoteAddressString() + " because it's pinned ";
                LOG.info((Object)msg);
                this.sendResponse(DataTransferProtos.Status.ERROR, msg);
            }
            if (!this.dataXceiverServer.balanceThrottler.acquire()) {
                msg = "Not able to copy block " + block.getBlockId() + " to " + this.peer.getRemoteAddressString() + " because threads quota is exceeded.";
                LOG.info((Object)msg);
                this.sendResponse(DataTransferProtos.Status.ERROR, msg);
                return;
            }
            blockSender = null;
            reply = null;
            boolean isOpSuccess = true;
            try {
                blockSender = new BlockSender(block, 0L, -1L, false, false, true, this.datanode, null, CachingStrategy.newDropBehind());
                OutputStream baseStream2332 = this.getOutputStream();
                reply = new DataOutputStream(new BufferedOutputStream(baseStream2332, HdfsConstants.SMALL_BUFFER_SIZE));
                this.writeSuccessWithChecksumInfo(blockSender, reply);
                long beginRead = Time.monotonicNow();
                long read = blockSender.sendBlock(reply, baseStream2332, this.dataXceiverServer.balanceThrottler);
                long duration = Time.monotonicNow() - beginRead;
                this.datanode.metrics.incrBytesRead((int)read);
                this.datanode.metrics.incrBlocksRead();
                this.datanode.metrics.incrTotalReadTime(duration);
                LOG.info((Object)("Copied " + block + " to " + this.peer.getRemoteAddressString()));
                this.dataXceiverServer.balanceThrottler.release();
                if (!isOpSuccess) break block14;
            }
            catch (IOException ioe) {
                try {
                    isOpSuccess = false;
                    LOG.info((Object)("opCopyBlock " + block + " received exception " + ioe));
                    this.incrDatanodeNetworkErrors();
                    throw ioe;
                }
                catch (Throwable throwable) {
                    this.dataXceiverServer.balanceThrottler.release();
                    if (isOpSuccess) {
                        try {
                            reply.writeChar(100);
                        }
                        catch (IOException iOException) {
                            // empty catch block
                        }
                    }
                    IOUtils.closeStream(reply);
                    IOUtils.closeStream(blockSender);
                    throw throwable;
                }
            }
            try {
                reply.writeChar(100);
            }
            catch (IOException baseStream2332) {
                // empty catch block
            }
        }
        IOUtils.closeStream((Closeable)reply);
        IOUtils.closeStream((Closeable)blockSender);
        this.datanode.metrics.addCopyBlockOp(this.elapsed());
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public void replaceBlock(ExtendedBlock block, StorageType storageType, Token<BlockTokenIdentifier> blockToken, String delHint, DatanodeInfo proxySource) throws IOException {
        DataOutputStream replyOut;
        DataInputStream proxyReply;
        BlockReceiver blockReceiver;
        String errMsg;
        DataTransferProtos.Status opStatus;
        DataOutputStream proxyOut;
        block22: {
            this.updateCurrentThreadName("Replacing block " + block + " from " + delHint);
            if (this.datanode.isBlockTokenEnabled) {
                try {
                    this.datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block, BlockTokenIdentifier.AccessMode.REPLACE);
                }
                catch (SecretManager.InvalidToken e) {
                    LOG.warn((Object)("Invalid access token in request from " + this.remoteAddress + " for OP_REPLACE_BLOCK for block " + block + " : " + e.getLocalizedMessage()));
                    this.sendResponse(DataTransferProtos.Status.ERROR_ACCESS_TOKEN, "Invalid access token");
                    return;
                }
            }
            if (!this.dataXceiverServer.balanceThrottler.acquire()) {
                String msg = "Not able to receive block " + block.getBlockId() + " from " + this.peer.getRemoteAddressString() + " because threads quota is exceeded.";
                LOG.warn((Object)msg);
                this.sendResponse(DataTransferProtos.Status.ERROR, msg);
                return;
            }
            Socket proxySock = null;
            proxyOut = null;
            opStatus = DataTransferProtos.Status.SUCCESS;
            errMsg = null;
            blockReceiver = null;
            proxyReply = null;
            replyOut = new DataOutputStream(this.getOutputStream());
            boolean IoeDuringCopyBlockOperation = false;
            try {
                if (proxySource.equals(this.datanode.getDatanodeId())) {
                    ReplicaInfo oldReplica = this.datanode.data.moveBlockAcrossStorage(block, storageType);
                    if (oldReplica != null) {
                        LOG.info((Object)("Moved " + block + " from StorageType " + oldReplica.getVolume().getStorageType() + " to " + storageType));
                    }
                } else {
                    block.setNumBytes(this.dataXceiverServer.estimateBlockSize);
                    String dnAddr = proxySource.getXferAddr(this.connectToDnViaHostname);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Connecting to datanode " + dnAddr));
                    }
                    InetSocketAddress proxyAddr = NetUtils.createSocketAddr((String)dnAddr);
                    proxySock = this.datanode.newSocket();
                    NetUtils.connect((Socket)proxySock, (SocketAddress)proxyAddr, (int)this.dnConf.socketTimeout);
                    proxySock.setSoTimeout(this.dnConf.socketTimeout);
                    OutputStream unbufProxyOut = NetUtils.getOutputStream((Socket)proxySock, (long)this.dnConf.socketWriteTimeout);
                    Object unbufProxyIn = NetUtils.getInputStream((Socket)proxySock);
                    DataEncryptionKeyFactory keyFactory = this.datanode.getDataEncryptionKeyFactoryForBlock(block);
                    IOStreamPair saslStreams = this.datanode.saslClient.socketSend(proxySock, unbufProxyOut, (InputStream)unbufProxyIn, keyFactory, blockToken, proxySource);
                    unbufProxyOut = saslStreams.out;
                    unbufProxyIn = saslStreams.in;
                    proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, HdfsConstants.SMALL_BUFFER_SIZE));
                    proxyReply = new DataInputStream(new BufferedInputStream((InputStream)unbufProxyIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
                    IoeDuringCopyBlockOperation = true;
                    new Sender(proxyOut).copyBlock(block, blockToken);
                    IoeDuringCopyBlockOperation = false;
                    DataTransferProtos.BlockOpResponseProto copyResponse = DataTransferProtos.BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(proxyReply));
                    String logInfo = "copy block " + block + " from " + proxySock.getRemoteSocketAddress();
                    DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo);
                    DataTransferProtos.ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
                    DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(checksumInfo.getChecksum());
                    blockReceiver = new BlockReceiver(block, storageType, proxyReply, proxySock.getRemoteSocketAddress().toString(), proxySock.getLocalSocketAddress().toString(), null, 0L, 0L, 0L, "", null, this.datanode, remoteChecksum, CachingStrategy.newDropBehind(), false);
                    blockReceiver.receiveBlock(null, null, replyOut, null, this.dataXceiverServer.balanceThrottler, null, true);
                    this.datanode.notifyNamenodeReceivedBlock(block, delHint, blockReceiver.getStorageUuid());
                    LOG.info((Object)("Moved " + block + " from " + this.peer.getRemoteAddressString() + ", delHint=" + delHint));
                }
                if (opStatus != DataTransferProtos.Status.SUCCESS || proxyReply == null) break block22;
            }
            catch (IOException ioe) {
                try {
                    opStatus = DataTransferProtos.Status.ERROR;
                    errMsg = "opReplaceBlock " + block + " received exception " + ioe;
                    LOG.info((Object)errMsg);
                    if (!IoeDuringCopyBlockOperation) {
                        this.incrDatanodeNetworkErrors();
                    }
                    throw ioe;
                }
                catch (Throwable throwable) {
                    if (opStatus == DataTransferProtos.Status.SUCCESS && proxyReply != null) {
                        try {
                            proxyReply.readChar();
                        }
                        catch (IOException iOException) {
                            // empty catch block
                        }
                    }
                    this.dataXceiverServer.balanceThrottler.release();
                    try {
                        this.sendResponse(opStatus, errMsg);
                    }
                    catch (IOException ioe2) {
                        LOG.warn((Object)("Error writing reply back to " + this.peer.getRemoteAddressString()));
                        this.incrDatanodeNetworkErrors();
                    }
                    IOUtils.closeStream(proxyOut);
                    IOUtils.closeStream(blockReceiver);
                    IOUtils.closeStream((Closeable)proxyReply);
                    IOUtils.closeStream((Closeable)replyOut);
                    throw throwable;
                }
            }
            try {
                proxyReply.readChar();
            }
            catch (IOException dnAddr) {
                // empty catch block
            }
        }
        this.dataXceiverServer.balanceThrottler.release();
        try {
            this.sendResponse(opStatus, errMsg);
        }
        catch (IOException ioe) {
            LOG.warn((Object)("Error writing reply back to " + this.peer.getRemoteAddressString()));
            this.incrDatanodeNetworkErrors();
        }
        IOUtils.closeStream(proxyOut);
        IOUtils.closeStream(blockReceiver);
        IOUtils.closeStream((Closeable)proxyReply);
        IOUtils.closeStream((Closeable)replyOut);
        this.datanode.metrics.addReplaceBlockOp(this.elapsed());
    }

    private long elapsed() {
        return Time.monotonicNow() - this.opStartTime;
    }

    private void sendResponse(DataTransferProtos.Status status, String message) throws IOException {
        DataXceiver.writeResponse(status, message, this.getOutputStream());
    }

    private static void writeResponse(DataTransferProtos.Status status, String message, OutputStream out) throws IOException {
        DataTransferProtos.BlockOpResponseProto.Builder response = DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(status);
        if (message != null) {
            response.setMessage(message);
        }
        response.build().writeDelimitedTo(out);
        out.flush();
    }

    private void writeSuccessWithChecksumInfo(BlockSender blockSender, DataOutputStream out) throws IOException {
        DataTransferProtos.ReadOpChecksumInfoProto ckInfo = DataTransferProtos.ReadOpChecksumInfoProto.newBuilder().setChecksum(DataTransferProtoUtil.toProto(blockSender.getChecksum())).setChunkOffset(blockSender.getOffset()).build();
        DataTransferProtos.BlockOpResponseProto response = DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.SUCCESS).setReadOpChecksumInfo(ckInfo).build();
        response.writeDelimitedTo(out);
        out.flush();
    }

    private void incrDatanodeNetworkErrors() {
        this.datanode.incrDatanodeNetworkErrors(this.remoteAddressWithoutPort);
    }

    private void checkAccess(OutputStream out, boolean reply, ExtendedBlock blk, Token<BlockTokenIdentifier> t, Op op, BlockTokenIdentifier.AccessMode mode) throws IOException {
        if (this.datanode.isBlockTokenEnabled) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Checking block access token for block '" + blk.getBlockId() + "' with mode '" + (Object)((Object)mode) + "'"));
            }
            try {
                this.datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode);
            }
            catch (SecretManager.InvalidToken e) {
                try {
                    if (reply) {
                        DataTransferProtos.BlockOpResponseProto.Builder resp = DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.ERROR_ACCESS_TOKEN);
                        if (mode == BlockTokenIdentifier.AccessMode.WRITE) {
                            DatanodeRegistration dnR = this.datanode.getDNRegistrationForBP(blk.getBlockPoolId());
                            resp.setFirstBadLink(dnR.getXferAddr());
                        }
                        resp.build().writeDelimitedTo(out);
                        out.flush();
                    }
                    LOG.warn((Object)("Block token verification failed: op=" + (Object)((Object)op) + ", remoteAddress=" + this.remoteAddress + ", message=" + e.getLocalizedMessage()));
                    throw e;
                }
                catch (Throwable throwable) {
                    IOUtils.closeStream((Closeable)out);
                    throw throwable;
                }
            }
        }
    }
}

