/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.annotations.VisibleForTesting;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;

class BlockReceiver
implements Closeable {
    // Both loggers are shared with DataNode rather than owned by this class.
    public static final Log LOG = DataNode.LOG;
    static final Log ClientTraceLog = DataNode.ClientTraceLog;
    // 0x800000 bytes = 8 MiB. manageWriterOsCache() uses it both as the minimum
    // advance before it does any cache work and as the distance it stays behind
    // when computing the drop position. Non-final so tests can override it.
    @VisibleForTesting
    static long CACHE_DROP_LAG_BYTES = 0x800000L;
    // Stream the incoming packets are read from (see receivePacket()).
    private DataInputStream in = null;
    // Checksum requested by the sender; used to verify incoming chunks.
    private DataChecksum clientChecksum;
    // Checksum reported by the replica's streams; used for what is stored on disk.
    private DataChecksum diskChecksum;
    // True when clientChecksum and diskChecksum differ (set in the constructor).
    private boolean needsChecksumTranslation;
    // Data output stream of the replica.
    private OutputStream out = null;
    // File descriptor of 'out'; only assigned when 'out' is a FileOutputStream.
    private FileDescriptor outFd;
    // Buffered wrapper around the replica's checksum output stream.
    private DataOutputStream checksumOut = null;
    // Chunk size and per-chunk checksum width, both taken from diskChecksum.
    private int bytesPerChecksum;
    private int checksumSize;
    private PacketReceiver packetReceiver = new PacketReceiver(false);
    // Addresses supplied by the caller; inAddr appears in error/log messages.
    protected final String inAddr;
    protected final String myAddr;
    // Downstream ("mirror") address and stream, assigned in receiveBlock().
    private String mirrorAddr;
    private DataOutputStream mirrorOut;
    // Thread running a PacketResponder; created in receiveBlock() only for
    // client writes that are not transfers.
    private Daemon responder = null;
    // Optional throttler, assigned in receiveBlock(); receivePacket() calls
    // throttle(len) on it once per packet when it is non-null.
    private DataTransferThrottler throttler;
    // Stream pair created from replicaInfo in the constructor.
    private ReplicaOutputStreams streams;
    // Source datanode; consulted when reporting a corrupt block (verifyChunks()).
    private DatanodeInfo srcDataNode = null;
    // Non-null while a partial-chunk CRC is pending; receivePacket() folds the new
    // bytes into it and resets it to null. NOTE(review): it is assigned outside
    // the code visible here (presumably by computePartialChunkCrc) — confirm.
    private Checksum partialCrc = null;
    private final DataNode datanode;
    // Set by handleMirrorOutError(); once true, receivePacket() stops forwarding
    // packets to the mirror.
    private volatile boolean mirrorError;
    // Whether to advise the OS to drop already-written pages (manageWriterOsCache()).
    private boolean dropCacheBehindWrites;
    // Block offset at which manageWriterOsCache() last did its work.
    private long lastCacheManagementOffset = 0L;
    // Whether to issue sync_file_range behind the writer (manageWriterOsCache()).
    private boolean syncBehindWrites;
    private final String clientname;
    // isDatanode is true when clientname is empty; isClient is its negation.
    private final boolean isClient;
    private final boolean isDatanode;
    private final ExtendedBlock block;
    private final ReplicaInPipelineInterface replicaInfo;
    private final BlockConstructionStage stage;
    // True for the TRANSFER_RBW and TRANSFER_FINALIZED stages.
    private final boolean isTransfer;
    // Loaded from DnConf in receiveBlock(); receivePacket() clears it when the
    // last packet itself requested a sync.
    private boolean syncOnClose;
    // Taken from DnConf.restartReplicaExpiry; receiveBlock() adds it to Time.now()
    // when writing the ".restart" meta file.
    private long restartBudget;
    // {SUCCESS, ERROR}. NOTE(review): not referenced in the code visible here —
    // presumably the ack statuses used when the mirror has failed; confirm.
    private static DataTransferProtos.Status[] MIRROR_ERROR_STATUS = new DataTransferProtos.Status[]{DataTransferProtos.Status.SUCCESS, DataTransferProtos.Status.ERROR};

    /**
     * Obtains the replica that will receive {@code block} and opens its data and
     * checksum output streams.
     *
     * <p>When {@code clientname} is empty the sender is treated as a datanode and a
     * temporary replica is created. Otherwise the replica is created, recovered or
     * opened for append according to {@code stage}.
     *
     * @param block             block being received; its generation stamp is set to
     *                          {@code newGs} for the recovery and append stages
     * @param storageType       storage type passed to the dataset when creating a replica
     * @param in                stream the packets will be read from
     * @param inAddr            sender address, used in log and error messages
     * @param myAddr            local address, used in log messages
     * @param stage             pipeline stage that selects how the replica is obtained
     * @param newGs             new generation stamp for recovery/append stages
     * @param minBytesRcvd      lower bound handed to the dataset for recovery/append
     * @param maxBytesRcvd      upper bound handed to the dataset for streaming recovery
     * @param clientname        client name; empty means the sender is a datanode
     * @param srcDataNode       source datanode, may be {@code null}
     * @param datanode          owning datanode
     * @param requestedChecksum checksum the sender uses
     * @param cachingStrategy   per-request override for drop-behind caching
     * @throws IOException if the stage is unsupported or the replica/streams cannot be
     *                     set up; a disk-error cause replaces the original exception
     *                     after the datanode has been asked to check its disks
     */
    BlockReceiver(ExtendedBlock block, StorageType storageType, DataInputStream in, String inAddr, String myAddr, BlockConstructionStage stage, long newGs, long minBytesRcvd, long maxBytesRcvd, String clientname, DatanodeInfo srcDataNode, DataNode datanode, DataChecksum requestedChecksum, CachingStrategy cachingStrategy) throws IOException {
        try {
            this.block = block;
            this.in = in;
            this.inAddr = inAddr;
            this.myAddr = myAddr;
            this.srcDataNode = srcDataNode;
            this.datanode = datanode;
            this.clientname = clientname;
            this.isDatanode = clientname.isEmpty();
            this.isClient = !this.isDatanode;
            this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
            this.stage = stage;
            this.isTransfer = stage == BlockConstructionStage.TRANSFER_RBW || stage == BlockConstructionStage.TRANSFER_FINALIZED;
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.getClass().getSimpleName() + ": " + block
                        + "\n  isClient  =" + this.isClient + ", clientname=" + clientname
                        + "\n  isDatanode=" + this.isDatanode + ", srcDataNode=" + srcDataNode
                        + "\n  inAddr=" + inAddr + ", myAddr=" + myAddr
                        + "\n  cachingStrategy = " + cachingStrategy);
            }

            // Pick the replica: datanode-to-datanode copies always go to a temporary
            // replica, client writes depend on the pipeline stage.
            if (this.isDatanode) {
                this.replicaInfo = datanode.data.createTemporary(storageType, block);
            } else {
                switch (stage) {
                    case PIPELINE_SETUP_CREATE: {
                        this.replicaInfo = datanode.data.createRbw(storageType, block);
                        datanode.notifyNamenodeCreatingBlock(block, this.replicaInfo.getStorageUuid());
                        break;
                    }
                    case PIPELINE_SETUP_STREAMING_RECOVERY: {
                        this.replicaInfo = datanode.data.recoverRbw(block, newGs, minBytesRcvd, maxBytesRcvd);
                        block.setGenerationStamp(newGs);
                        break;
                    }
                    case PIPELINE_SETUP_APPEND: {
                        this.replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
                        if (datanode.blockScanner != null) {
                            datanode.blockScanner.deleteBlock(block.getBlockPoolId(), block.getLocalBlock());
                        }
                        block.setGenerationStamp(newGs);
                        datanode.notifyNamenodeAppendingBlock(block, this.replicaInfo.getStorageUuid());
                        break;
                    }
                    case PIPELINE_SETUP_APPEND_RECOVERY: {
                        this.replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
                        if (datanode.blockScanner != null) {
                            datanode.blockScanner.deleteBlock(block.getBlockPoolId(), block.getLocalBlock());
                        }
                        block.setGenerationStamp(newGs);
                        datanode.notifyNamenodeAppendingRecoveredAppend(block, this.replicaInfo.getStorageUuid());
                        break;
                    }
                    case TRANSFER_RBW:
                    case TRANSFER_FINALIZED: {
                        this.replicaInfo = datanode.data.createTemporary(storageType, block);
                        break;
                    }
                    default: {
                        throw new IOException("Unsupported stage " + stage + " while receiving block " + block + " from " + inAddr);
                    }
                }
            }

            // A per-request drop-behind setting wins over the datanode default.
            if (cachingStrategy.getDropBehind() == null) {
                this.dropCacheBehindWrites = datanode.getDnConf().dropCacheBehindWrites;
            } else {
                this.dropCacheBehindWrites = cachingStrategy.getDropBehind();
            }
            this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;

            // A brand-new replica also needs the metadata header written below.
            boolean freshReplica = this.isDatanode || this.isTransfer || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
            this.streams = this.replicaInfo.createStreams(freshReplica, requestedChecksum);
            assert (this.streams != null) : "null streams!";

            this.clientChecksum = requestedChecksum;
            this.diskChecksum = this.streams.getChecksum();
            this.needsChecksumTranslation = !this.clientChecksum.equals(this.diskChecksum);
            this.bytesPerChecksum = this.diskChecksum.getBytesPerChecksum();
            this.checksumSize = this.diskChecksum.getChecksumSize();

            this.out = this.streams.getDataOut();
            if (!(this.out instanceof FileOutputStream)) {
                LOG.warn("Could not get file descriptor for outputstream of class " + this.out.getClass());
            } else {
                this.outFd = ((FileOutputStream)this.out).getFD();
            }
            this.checksumOut = new DataOutputStream(new BufferedOutputStream(this.streams.getChecksumOut(), HdfsConstants.SMALL_BUFFER_SIZE));
            if (freshReplica) {
                BlockMetadataHeader.writeHeader(this.checksumOut, this.diskChecksum);
            }
        }
        catch (ReplicaAlreadyExistsException | ReplicaNotFoundException replicaStateError) {
            // Replica-state problems are passed straight through without cleanup.
            throw replicaStateError;
        }
        catch (IOException ioe) {
            IOUtils.closeStream(this);
            this.cleanupBlock();
            IOException diskCause = DatanodeUtil.getCauseIfDiskError(ioe);
            DataNode.LOG.warn("IOException in BlockReceiver constructor. Cause is ", diskCause);
            if (diskCause == null) {
                throw ioe;
            }
            // A disk-error cause replaces the original exception.
            datanode.checkDiskError(diskCause);
            throw diskCause;
        }
    }

    /**
     * @return the datanode this receiver was constructed with
     */
    DataNode getDataNode() {
        return datanode;
    }

    /**
     * @return the storage UUID reported by the replica being written
     */
    String getStorageUuid() {
        return replicaInfo.getStorageUuid();
    }

    /**
     * Flushes and closes the checksum stream and then the data stream.
     *
     * <p>When {@code syncOnClose} is set, each stream is also synced through
     * {@code streams} and the sync time is recorded in the datanode metrics. Flush
     * time for both streams is accumulated and reported once.
     *
     * <p>Both streams are always attempted, even if the first one fails. After this
     * method returns or throws, {@code checksumOut} and {@code out} are both
     * {@code null}, so a repeated call does not flush, sync or count metrics for
     * streams that are already gone.
     *
     * @throws IOException if flushing, syncing or closing either stream failed. When
     *                     both fail, the data-stream failure is thrown and the
     *                     checksum-stream failure is attached as a suppressed
     *                     exception. The datanode is asked to check its disks before
     *                     the exception is thrown.
     */
    @Override
    public void close() throws IOException {
        if (this.packetReceiver != null) {
            this.packetReceiver.close();
        }
        IOException ioe = null;
        if (this.syncOnClose && (this.out != null || this.checksumOut != null)) {
            this.datanode.metrics.incrFsyncCount();
        }
        long flushTotalNanos = 0L;
        boolean measuredFlushTime = false;

        // Checksum (meta) stream first.
        try {
            if (this.checksumOut != null) {
                long flushStartNanos = System.nanoTime();
                this.checksumOut.flush();
                long flushEndNanos = System.nanoTime();
                if (this.syncOnClose) {
                    // The sync timer starts where the flush timer stopped.
                    this.streams.syncChecksumOut();
                    this.datanode.metrics.addFsyncNanos(System.nanoTime() - flushEndNanos);
                }
                flushTotalNanos += flushEndNanos - flushStartNanos;
                measuredFlushTime = true;
                this.checksumOut.close();
                this.checksumOut = null;
            }
        }
        catch (IOException e) {
            ioe = e;
        }
        finally {
            // No-op after a clean close (field already null). After a failure this
            // releases the stream and clears the field so it is never reused.
            IOUtils.closeStream(this.checksumOut);
            this.checksumOut = null;
        }

        // Then the block data stream.
        try {
            if (this.out != null) {
                long flushStartNanos = System.nanoTime();
                this.out.flush();
                long flushEndNanos = System.nanoTime();
                if (this.syncOnClose) {
                    this.streams.syncDataOut();
                    this.datanode.metrics.addFsyncNanos(System.nanoTime() - flushEndNanos);
                }
                flushTotalNanos += flushEndNanos - flushStartNanos;
                measuredFlushTime = true;
                this.out.close();
                this.out = null;
            }
        }
        catch (IOException e) {
            if (ioe != null) {
                // Keep the earlier checksum-stream failure visible instead of
                // silently overwriting it.
                e.addSuppressed(ioe);
            }
            ioe = e;
        }
        finally {
            IOUtils.closeStream(this.out);
            this.out = null;
        }

        if (measuredFlushTime) {
            this.datanode.metrics.addFlushNanos(flushTotalNanos);
        }
        if (ioe != null) {
            this.datanode.checkDiskError(ioe);
            throw ioe;
        }
    }

    /**
     * Flushes the checksum stream and then the data stream, optionally syncing
     * each one, and records the timings in the datanode metrics.
     *
     * @param isSync when {@code true}, each open stream is also synced through
     *               {@code streams} and the fsync counter is incremented once
     * @throws IOException if a flush or sync fails
     */
    void flushOrSync(boolean isSync) throws IOException {
        long totalFlushNanos = 0L;

        if (this.checksumOut != null) {
            final long begin = System.nanoTime();
            this.checksumOut.flush();
            final long flushed = System.nanoTime();
            if (isSync) {
                // Sync time is measured from the moment the flush finished.
                this.streams.syncChecksumOut();
                this.datanode.metrics.addFsyncNanos(System.nanoTime() - flushed);
            }
            totalFlushNanos += flushed - begin;
        }

        if (this.out != null) {
            final long begin = System.nanoTime();
            this.out.flush();
            final long flushed = System.nanoTime();
            if (isSync) {
                this.streams.syncDataOut();
                this.datanode.metrics.addFsyncNanos(System.nanoTime() - flushed);
            }
            totalFlushNanos += flushed - begin;
        }

        // Nothing was open, so there is nothing to report.
        if (this.checksumOut == null && this.out == null) {
            return;
        }
        this.datanode.metrics.addFlushNanos(totalFlushNanos);
        if (isSync) {
            this.datanode.metrics.incrFsyncCount();
        }
    }

    /**
     * Records a failure to write to the mirror.
     *
     * <p>The failure is logged. If the current thread was interrupted (the
     * interrupt flag is cleared by this check), the exception is rethrown.
     * Otherwise {@code mirrorError} is set so no further packets are forwarded.
     *
     * @param ioe the failure raised while writing to the mirror
     * @throws IOException {@code ioe} itself, when the thread had been interrupted
     */
    private void handleMirrorOutError(IOException ioe) throws IOException {
        LOG.info(this.datanode.getDNRegistrationForBP(this.block.getBlockPoolId())
                + ":Exception writing " + this.block + " to mirror " + this.mirrorAddr, ioe);
        if (!Thread.interrupted()) {
            this.mirrorError = true;
            return;
        }
        throw ioe;
    }

    /**
     * Verifies the packet's data against its checksums using the sender's checksum.
     *
     * <p>On a mismatch the error is logged. If the data came from another datanode
     * ({@code srcDataNode} is set and the sender is a datanode), the corrupt block is
     * reported to the namenode; a failure of that report is logged and otherwise
     * ignored.
     *
     * @param dataBuf     packet data
     * @param checksumBuf checksums covering {@code dataBuf}
     * @throws IOException on checksum mismatch; the underlying
     *                     {@link ChecksumException} is attached as the cause
     */
    private void verifyChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf) throws IOException {
        try {
            this.clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, this.clientname, 0L);
        }
        catch (ChecksumException ce) {
            LOG.warn("Checksum error in block " + this.block + " from " + this.inAddr, ce);
            if (this.srcDataNode != null && this.isDatanode) {
                try {
                    LOG.info("report corrupt " + this.block + " from datanode " + this.srcDataNode + " to namenode");
                    this.datanode.reportRemoteBadBlock(this.srcDataNode, this.block);
                }
                catch (IOException e) {
                    // Best effort: the write is about to fail anyway, but keep the
                    // reason the report failed in the log.
                    LOG.warn("Failed to report bad " + this.block + " from datanode " + this.srcDataNode + " to namenode", e);
                }
            }
            // Chain the ChecksumException so its details are not lost.
            throw new IOException("Unexpected checksum mismatch while writing " + this.block + " from " + this.inAddr, ce);
        }
    }

    /**
     * Recomputes the checksums for {@code dataBuf} with the on-disk checksum and
     * writes them into {@code checksumBuf}.
     *
     * @param dataBuf     packet data
     * @param checksumBuf buffer that receives the recomputed checksums
     */
    private void translateChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf) {
        diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
    }

    /**
     * Decides whether this node verifies packet checksums itself.
     *
     * @return {@code true} when there is no mirror to forward to, when the sender is
     *         a datanode, or when the checksums have to be translated for disk
     */
    private boolean shouldVerifyChecksum() {
        if (this.mirrorOut == null) {
            return true;
        }
        if (this.isDatanode) {
            return true;
        }
        return this.needsChecksumTranslation;
    }

    /**
     * Receives one packet, forwards it to the mirror (if any), verifies and writes
     * its data and checksums to disk, and queues the acknowledgement.
     *
     * <p>The ack is queued before the disk write when neither a sync was requested
     * nor local checksum verification is needed; otherwise it is queued after the
     * write (and sync) has completed.
     *
     * @return {@code -1} if this was the last packet in the block, otherwise the
     *         packet's data length
     * @throws IOException on an out-of-sequence or malformed packet, a checksum
     *                     failure (the verification failure is attached as the
     *                     cause), or a disk write failure (after the datanode has
     *                     been asked to check its disks)
     */
    private int receivePacket() throws IOException {
        this.packetReceiver.receiveNextPacket(this.in);
        PacketHeader header = this.packetReceiver.getHeader();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Receiving one packet for block " + this.block + ": " + header);
        }

        // Sanity-check the header: packets may not start beyond what has been received.
        if (header.getOffsetInBlock() > this.replicaInfo.getNumBytes()) {
            throw new IOException("Received an out-of-sequence packet for " + this.block + " from " + this.inAddr + " at offset " + header.getOffsetInBlock() + ". Expecting packet starting at " + this.replicaInfo.getNumBytes());
        }
        if (header.getDataLen() < 0) {
            throw new IOException("Got wrong length during writeBlock(" + this.block + ") from " + this.inAddr + " at offset " + header.getOffsetInBlock() + ": " + header.getDataLen());
        }

        long offsetInBlock = header.getOffsetInBlock();
        long seqno = header.getSeqno();
        boolean lastPacketInBlock = header.isLastPacketInBlock();
        int len = header.getDataLen();
        boolean syncBlock = header.getSyncBlock();

        // The last packet already asks for a sync, so close() need not sync again.
        if (syncBlock && lastPacketInBlock) {
            this.syncOnClose = false;
        }

        // From here on offsetInBlock is the offset of the END of this packet.
        long firstByteInBlock = offsetInBlock;
        offsetInBlock += len;
        if (this.replicaInfo.getNumBytes() < offsetInBlock) {
            this.replicaInfo.setNumBytesNoPersistance(offsetInBlock);
        }

        // Early ack: allowed only when no sync and no local verification is pending.
        if (this.responder != null && !syncBlock && !this.shouldVerifyChecksum()) {
            ((PacketResponder)this.responder.getRunnable()).enqueue(seqno, lastPacketInBlock, offsetInBlock, DataTransferProtos.Status.SUCCESS);
        }

        // Forward the packet downstream before touching the local disk.
        if (this.mirrorOut != null && !this.mirrorError) {
            try {
                this.packetReceiver.mirrorPacketTo(this.mirrorOut);
                this.mirrorOut.flush();
            }
            catch (IOException e) {
                this.handleMirrorOutError(e);
            }
        }

        ByteBuffer dataBuf = this.packetReceiver.getDataSlice();
        ByteBuffer checksumBuf = this.packetReceiver.getChecksumSlice();
        if (lastPacketInBlock || len == 0) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Receiving an empty packet or the end of the block " + this.block);
            }
            if (syncBlock) {
                this.flushOrSync(true);
            }
        } else {
            // One checksum per (possibly partial) chunk of data.
            int checksumLen = (len + this.bytesPerChecksum - 1) / this.bytesPerChecksum * this.checksumSize;
            if (checksumBuf.capacity() != checksumLen) {
                throw new IOException("Length of checksums in packet " + checksumBuf.capacity() + " does not match calculated checksum length " + checksumLen);
            }
            if (this.shouldVerifyChecksum()) {
                try {
                    this.verifyChunks(dataBuf, checksumBuf);
                }
                catch (IOException ioe) {
                    if (this.responder != null) {
                        try {
                            ((PacketResponder)this.responder.getRunnable()).enqueue(seqno, lastPacketInBlock, offsetInBlock, DataTransferProtos.Status.ERROR_CHECKSUM);
                            // Give the responder time to send the ERROR_CHECKSUM ack.
                            // NOTE(review): the interrupt that ends this wait early is
                            // presumably issued by the responder — confirm.
                            Thread.sleep(3000L);
                        }
                        catch (InterruptedException ignored) {
                            // Deliberately ignored: we terminate immediately below.
                        }
                    }
                    // Same message as before, but with the failure chained as the cause.
                    throw new IOException("Terminating due to a checksum error." + ioe, ioe);
                }
                if (this.needsChecksumTranslation) {
                    // Overwrite the packet's checksums with ones in the on-disk format.
                    this.translateChunks(dataBuf, checksumBuf);
                }
            }
            try {
                long onDiskLen = this.replicaInfo.getBytesOnDisk();
                // Skip the write entirely if the disk already holds this packet's bytes.
                if (onDiskLen < offsetInBlock) {
                    byte[] lastChunkChecksum;
                    if (onDiskLen % (long)this.bytesPerChecksum != 0L) {
                        // On-disk data ends mid-chunk. NOTE(review): presumably this
                        // rewinds the checksum file so the last checksum gets
                        // overwritten — helper not visible here, confirm.
                        this.adjustCrcFilePosition();
                    }
                    if (firstByteInBlock % (long)this.bytesPerChecksum != 0L) {
                        LOG.info("Packet starts at " + firstByteInBlock + " for " + this.block + " which is not a multiple of bytesPerChecksum " + this.bytesPerChecksum);
                        long offsetInChecksum = (long)BlockMetadataHeader.getHeaderSize() + onDiskLen / (long)this.bytesPerChecksum * (long)this.checksumSize;
                        // NOTE(review): expected to initialise partialCrc from the
                        // existing partial chunk — helper not visible here, confirm.
                        this.computePartialChunkCrc(onDiskLen, offsetInChecksum, this.bytesPerChecksum);
                    }

                    // Write only the bytes that are not on disk yet.
                    int startByteToDisk = (int)(onDiskLen - firstByteInBlock) + dataBuf.arrayOffset() + dataBuf.position();
                    int numBytesToDisk = (int)(offsetInBlock - onDiskLen);
                    this.out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);

                    if (this.partialCrc != null) {
                        // Continuing a partial chunk: the packet may hold only that chunk.
                        if (len > this.bytesPerChecksum) {
                            throw new IOException("Got wrong length during writeBlock(" + this.block + ") from " + this.inAddr + " A packet can have only one partial chunk. len = " + len + " bytesPerChecksum " + this.bytesPerChecksum);
                        }
                        this.partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
                        byte[] buf = FSOutputSummer.convertToByteStream(this.partialCrc, this.checksumSize);
                        lastChunkChecksum = Arrays.copyOfRange(buf, buf.length - this.checksumSize, buf.length);
                        this.checksumOut.write(buf);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Writing out partial crc for data len " + len);
                        }
                        this.partialCrc = null;
                    } else {
                        // Remember the final chunk's checksum, then write all of them.
                        int checksumStart = checksumBuf.arrayOffset() + checksumBuf.position();
                        lastChunkChecksum = Arrays.copyOfRange(checksumBuf.array(), checksumStart + checksumLen - this.checksumSize, checksumStart + checksumLen);
                        this.checksumOut.write(checksumBuf.array(), checksumStart, checksumLen);
                    }
                    this.flushOrSync(syncBlock);
                    this.replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastChunkChecksum);
                    this.datanode.metrics.incrBytesWritten(len);
                    this.manageWriterOsCache(offsetInBlock);
                }
            }
            catch (IOException iex) {
                this.datanode.checkDiskError(iex);
                throw iex;
            }
        }

        // Late ack: sent only now that the write (and any sync) has finished.
        if (this.responder != null && (syncBlock || this.shouldVerifyChecksum())) {
            ((PacketResponder)this.responder.getRunnable()).enqueue(seqno, lastPacketInBlock, offsetInBlock, DataTransferProtos.Status.SUCCESS);
        }
        if (this.throttler != null) {
            this.throttler.throttle(len);
        }
        return lastPacketInBlock ? -1 : len;
    }

    /**
     * Best-effort OS page-cache management behind the writer.
     *
     * <p>Does nothing unless a file descriptor is available and the write position
     * has advanced more than {@code CACHE_DROP_LAG_BYTES} past the last managed
     * offset. When it does run, it optionally starts write-back of the range written
     * since the last call, optionally advises the OS to drop cached pages from the
     * start of the file up to {@code CACHE_DROP_LAG_BYTES} before the last managed
     * offset, and then records the new offset. Any failure is logged and swallowed.
     *
     * @param offsetInBlock current end-of-data offset within the block
     */
    private void manageWriterOsCache(long offsetInBlock) {
        try {
            if (this.outFd == null || offsetInBlock <= this.lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
                return;
            }
            if (this.syncBehindWrites) {
                long syncLength = offsetInBlock - this.lastCacheManagementOffset;
                NativeIO.POSIX.syncFileRangeIfPossible(this.outFd, this.lastCacheManagementOffset, syncLength, NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
            }
            long dropUpTo = this.lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
            if (dropUpTo > 0L && this.dropCacheBehindWrites) {
                NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(this.block.getBlockName(), this.outFd, 0L, dropUpTo, NativeIO.POSIX.POSIX_FADV_DONTNEED);
            }
            this.lastCacheManagementOffset = offsetInBlock;
        }
        catch (Throwable t) {
            LOG.warn("Error managing cache for writer of block " + this.block, t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void receiveBlock(DataOutputStream mirrOut, DataInputStream mirrIn, DataOutputStream replyOut, String mirrAddr, DataTransferThrottler throttlerArg, DatanodeInfo[] downstreams) throws IOException {
        block51: {
            boolean responderClosed;
            block48: {
                this.syncOnClose = this.datanode.getDnConf().syncOnClose;
                responderClosed = false;
                this.mirrorOut = mirrOut;
                this.mirrorAddr = mirrAddr;
                this.throttler = throttlerArg;
                try {
                    if (this.isClient && !this.isTransfer) {
                        this.responder = new Daemon(this.datanode.threadGroup, (Runnable)new PacketResponder(replyOut, mirrIn, downstreams));
                        this.responder.start();
                    }
                    while (this.receivePacket() >= 0) {
                    }
                    if (this.responder != null) {
                        ((PacketResponder)this.responder.getRunnable()).close();
                        responderClosed = true;
                    }
                    if (!this.isDatanode && !this.isTransfer) break block48;
                    this.close();
                    this.block.setNumBytes(this.replicaInfo.getNumBytes());
                    if (this.stage == BlockConstructionStage.TRANSFER_RBW) {
                        this.datanode.data.convertTemporaryToRbw(this.block);
                    } else {
                        this.datanode.data.finalizeBlock(this.block);
                    }
                    this.datanode.metrics.incrBlocksWritten();
                }
                catch (IOException ioe) {
                    block52: {
                        try {
                            if (!this.datanode.isRestarting()) {
                                LOG.info((Object)("Exception for " + this.block), (Throwable)ioe);
                                throw ioe;
                            }
                            LOG.info((Object)("Shutting down for restart (" + this.block + ")."));
                        }
                        catch (Throwable throwable) {
                            Thread.interrupted();
                            if (!responderClosed) {
                                if (this.responder != null) {
                                    if (this.datanode.isRestarting() && this.isClient && !this.isTransfer) {
                                        File blockFile = ((ReplicaInPipeline)this.replicaInfo).getBlockFile();
                                        File restartMeta = new File(blockFile.getParent() + File.pathSeparator + "." + blockFile.getName() + ".restart");
                                        if (restartMeta.exists() && !restartMeta.delete()) {
                                            LOG.warn((Object)("Failed to delete restart meta file: " + restartMeta.getPath()));
                                        }
                                        try {
                                            FileWriter out = new FileWriter(restartMeta);
                                            out.write(Long.toString(Time.now() + this.restartBudget));
                                            out.flush();
                                            out.close();
                                        }
                                        catch (IOException out) {
                                            // empty catch block
                                        }
                                        try {
                                            ((PacketResponder)this.responder.getRunnable()).sendOOBResponse(PipelineAck.getRestartOOBStatus());
                                            Thread.sleep(1000L);
                                        }
                                        catch (InterruptedException out) {
                                        }
                                        catch (IOException ioe2) {
                                            LOG.info((Object)"Error sending OOB Ack.", (Throwable)ioe2);
                                        }
                                    }
                                    this.responder.interrupt();
                                }
                                IOUtils.closeStream((Closeable)this);
                                this.cleanupBlock();
                            }
                            if (this.responder != null) {
                                block54: {
                                    try {
                                        this.responder.interrupt();
                                        long joinTimeout = this.datanode.getDnConf().getXceiverStopTimeout();
                                        joinTimeout = joinTimeout > 1L ? joinTimeout * 8L / 10L : joinTimeout;
                                        this.responder.join(joinTimeout);
                                        if (this.responder.isAlive()) {
                                            String msg = "Join on responder thread " + this.responder + " timed out";
                                            LOG.warn((Object)(msg + "\n" + StringUtils.getStackTrace((Thread)this.responder)));
                                            throw new IOException(msg);
                                        }
                                    }
                                    catch (InterruptedException e) {
                                        this.responder.interrupt();
                                        if (this.datanode.isRestarting()) break block54;
                                        throw new IOException("Interrupted receiveBlock");
                                    }
                                }
                                this.responder = null;
                            }
                            throw throwable;
                        }
                        Thread.interrupted();
                        if (!responderClosed) {
                            if (this.responder != null) {
                                if (this.datanode.isRestarting() && this.isClient && !this.isTransfer) {
                                    File blockFile = ((ReplicaInPipeline)this.replicaInfo).getBlockFile();
                                    File restartMeta = new File(blockFile.getParent() + File.pathSeparator + "." + blockFile.getName() + ".restart");
                                    if (restartMeta.exists() && !restartMeta.delete()) {
                                        LOG.warn((Object)("Failed to delete restart meta file: " + restartMeta.getPath()));
                                    }
                                    try {
                                        FileWriter out = new FileWriter(restartMeta);
                                        out.write(Long.toString(Time.now() + this.restartBudget));
                                        out.flush();
                                        out.close();
                                    }
                                    catch (IOException out) {
                                        // empty catch block
                                    }
                                    try {
                                        ((PacketResponder)this.responder.getRunnable()).sendOOBResponse(PipelineAck.getRestartOOBStatus());
                                        Thread.sleep(1000L);
                                    }
                                    catch (InterruptedException out) {
                                    }
                                    catch (IOException ioe3) {
                                        LOG.info((Object)"Error sending OOB Ack.", (Throwable)ioe3);
                                    }
                                }
                                this.responder.interrupt();
                            }
                            IOUtils.closeStream((Closeable)this);
                            this.cleanupBlock();
                        }
                        if (this.responder == null) break block51;
                        try {
                            this.responder.interrupt();
                            long joinTimeout = this.datanode.getDnConf().getXceiverStopTimeout();
                            joinTimeout = joinTimeout > 1L ? joinTimeout * 8L / 10L : joinTimeout;
                            this.responder.join(joinTimeout);
                            if (this.responder.isAlive()) {
                                String msg = "Join on responder thread " + this.responder + " timed out";
                                LOG.warn((Object)(msg + "\n" + StringUtils.getStackTrace((Thread)this.responder)));
                                throw new IOException(msg);
                            }
                        }
                        catch (InterruptedException e) {
                            this.responder.interrupt();
                            if (this.datanode.isRestarting()) break block52;
                            throw new IOException("Interrupted receiveBlock");
                        }
                    }
                    this.responder = null;
                }
            }
            Thread.interrupted();
            if (!responderClosed) {
                if (this.responder != null) {
                    if (this.datanode.isRestarting() && this.isClient && !this.isTransfer) {
                        File blockFile = ((ReplicaInPipeline)this.replicaInfo).getBlockFile();
                        File restartMeta = new File(blockFile.getParent() + File.pathSeparator + "." + blockFile.getName() + ".restart");
                        if (restartMeta.exists() && !restartMeta.delete()) {
                            LOG.warn((Object)("Failed to delete restart meta file: " + restartMeta.getPath()));
                        }
                        try {
                            FileWriter out = new FileWriter(restartMeta);
                            out.write(Long.toString(Time.now() + this.restartBudget));
                            out.flush();
                            out.close();
                        }
                        catch (IOException out) {
                            // empty catch block
                        }
                        try {
                            ((PacketResponder)this.responder.getRunnable()).sendOOBResponse(PipelineAck.getRestartOOBStatus());
                            Thread.sleep(1000L);
                        }
                        catch (InterruptedException out) {
                        }
                        catch (IOException ioe) {
                            LOG.info((Object)"Error sending OOB Ack.", (Throwable)ioe);
                        }
                    }
                    this.responder.interrupt();
                }
                IOUtils.closeStream((Closeable)this);
                this.cleanupBlock();
            }
            if (this.responder != null) {
                block49: {
                    try {
                        this.responder.interrupt();
                        long joinTimeout = this.datanode.getDnConf().getXceiverStopTimeout();
                        joinTimeout = joinTimeout > 1L ? joinTimeout * 8L / 10L : joinTimeout;
                        this.responder.join(joinTimeout);
                        if (this.responder.isAlive()) {
                            String msg = "Join on responder thread " + this.responder + " timed out";
                            LOG.warn((Object)(msg + "\n" + StringUtils.getStackTrace((Thread)this.responder)));
                            throw new IOException(msg);
                        }
                    }
                    catch (InterruptedException e) {
                        this.responder.interrupt();
                        if (this.datanode.isRestarting()) break block49;
                        throw new IOException("Interrupted receiveBlock");
                    }
                }
                this.responder = null;
            }
        }
    }

    /**
     * Rolls back the block being received when this receiver was created for
     * a datanode-to-datanode operation.
     *
     * <p>Does nothing unless {@code isDatanode} is set; otherwise asks the
     * dataset to unfinalize {@code block}.
     *
     * @throws IOException if the dataset fails to unfinalize the block
     */
    private void cleanupBlock() throws IOException {
        if (!isDatanode) {
            return;
        }
        datanode.data.unfinalizeBlock(block);
    }

    /**
     * Flushes any buffered block data and checksum data, then asks the
     * dataset to reposition the checksum channel for this block.
     *
     * <p>Both streams are optional: a stream that is {@code null} is simply
     * skipped. The flushes happen before the reposition so that the dataset
     * sees everything written so far.
     *
     * @throws IOException if flushing or repositioning fails
     */
    private void adjustCrcFilePosition() throws IOException {
        // Data first, then checksums, mirroring the order they are written.
        if (null != out) {
            out.flush();
        }
        if (null != checksumOut) {
            checksumOut.flush();
        }
        datanode.data.adjustCrcChannelPosition(block, streams, checksumSize);
    }

    /**
     * Interprets a checksum byte array as an unsigned big-endian integer.
     *
     * <p>The last byte of the array is the least significant one. Shift
     * distances on a {@code long} are taken modulo 64, so for arrays longer
     * than eight bytes the excess leading bytes wrap around and are OR-ed
     * into the low-order positions.
     *
     * @param checksum raw checksum bytes, most significant byte first
     * @return the checksum value as a {@code long}; {@code 0} for an empty array
     */
    private static long checksum2long(byte[] checksum) {
        long value = 0L;
        int shift = 0;
        // Walk from the least significant byte upwards, widening the shift by
        // one byte per step.
        for (int pos = checksum.length - 1; pos >= 0; --pos) {
            value |= ((long)checksum[pos] & 0xFFL) << shift;
            shift += 8;
        }
        return value;
    }

    /**
     * Re-reads the trailing partial chunk of the block from disk, recomputes
     * its checksum into {@code partialCrc}, and verifies it against the
     * checksum stored in the meta file.
     *
     * @param blkoff           offset in the block file; rounded down to the
     *                         start of the chunk that contains it before reading
     * @param ckoff            offset in the meta file of the stored checksum
     * @param bytesPerChecksum chunk size used to locate the chunk start
     * @throws IOException if the streams cannot be read, or if the recomputed
     *                     checksum differs from the stored one
     */
    private void computePartialChunkCrc(long blkoff, long ckoff, int bytesPerChecksum) throws IOException {
        final int partialLen = (int)(blkoff % (long)bytesPerChecksum);
        final int crcLen = this.diskChecksum.getChecksumSize();
        // Rewind to the first byte of the partially filled chunk.
        blkoff -= (long)partialLen;
        LOG.info("computePartialChunkCrc sizePartialChunk " + partialLen + " " + this.block + " block offset " + blkoff + " metafile offset " + ckoff);

        final byte[] chunkBytes = new byte[partialLen];
        final byte[] storedCrc = new byte[crcLen];
        ReplicaInputStreams instr = null;
        try {
            instr = this.datanode.data.getTmpInputStreams(this.block, blkoff, ckoff);
            IOUtils.readFully(instr.getDataIn(), chunkBytes, 0, partialLen);
            IOUtils.readFully(instr.getChecksumIn(), storedCrc, 0, storedCrc.length);
        }
        finally {
            // Closed on both the success and the failure path.
            IOUtils.closeStream(instr);
        }

        // Seed the running checksum with the bytes already on disk.
        this.partialCrc = DataChecksum.newDataChecksum(this.diskChecksum.getChecksumType(), this.diskChecksum.getBytesPerChecksum());
        this.partialCrc.update(chunkBytes, 0, partialLen);
        LOG.info("Read in partial CRC chunk from disk for " + this.block);

        final long recomputed = this.partialCrc.getValue();
        final long onDisk = BlockReceiver.checksum2long(storedCrc);
        if (recomputed != onDisk) {
            throw new IOException("Partial CRC " + recomputed + " does not match value computed the  last time file was closed " + onDisk);
        }
    }

    /**
     * Immutable record of one packet that has been received and is waiting
     * for its acknowledgement to be sent upstream.
     */
    private static class Packet {
        /** Sequence number of the packet. */
        final long seqno;
        /** Whether this packet is the final one of the block. */
        final boolean lastPacketInBlock;
        /** Offset in the block associated with this packet. */
        final long offsetInBlock;
        /** {@code System.nanoTime()} reading taken when the packet was queued for ack. */
        final long ackEnqueueNanoTime;
        /** Status this node reports for the packet. */
        final DataTransferProtos.Status ackStatus;

        Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock, long ackEnqueueNanoTime, DataTransferProtos.Status ackStatus) {
            this.seqno = seqno;
            this.lastPacketInBlock = lastPacketInBlock;
            this.offsetInBlock = offsetInBlock;
            this.ackEnqueueNanoTime = ackEnqueueNanoTime;
            this.ackStatus = ackStatus;
        }

        /** Renders every field, prefixed with the simple class name, for log output. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder(getClass().getSimpleName());
            text.append("(seqno=").append(seqno);
            text.append(", lastPacketInBlock=").append(lastPacketInBlock);
            text.append(", offsetInBlock=").append(offsetInBlock);
            text.append(", ackEnqueueNanoTime=").append(ackEnqueueNanoTime);
            text.append(", ackStatus=").append(ackStatus);
            text.append(")");
            return text.toString();
        }
    }

    class PacketResponder
    implements Runnable,
    Closeable {
        private final LinkedList<Packet> ackQueue = new LinkedList();
        private final Thread receiverThread = Thread.currentThread();
        private volatile boolean running = true;
        private final DataInputStream downstreamIn;
        private final DataOutputStream upstreamOut;
        private final PacketResponderType type;
        private final String myString;
        private boolean sending = false;

        public String toString() {
            return this.myString;
        }

        PacketResponder(DataOutputStream upstreamOut, DataInputStream downstreamIn, DatanodeInfo[] downstreams) {
            this.downstreamIn = downstreamIn;
            this.upstreamOut = upstreamOut;
            this.type = downstreams == null ? PacketResponderType.NON_PIPELINE : (downstreams.length == 0 ? PacketResponderType.LAST_IN_PIPELINE : PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE);
            StringBuilder b = new StringBuilder(this.getClass().getSimpleName()).append(": ").append(BlockReceiver.this.block).append(", type=").append((Object)this.type);
            if (this.type != PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
                b.append(", downstreams=").append(downstreams.length).append(":").append(Arrays.asList(downstreams));
            }
            this.myString = b.toString();
        }

        private boolean isRunning() {
            return this.running && (((BlockReceiver)BlockReceiver.this).datanode.shouldRun || BlockReceiver.this.datanode.isRestarting());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void enqueue(long seqno, boolean lastPacketInBlock, long offsetInBlock, DataTransferProtos.Status ackStatus) {
            Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock, System.nanoTime(), ackStatus);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)(this.myString + ": enqueue " + p));
            }
            LinkedList<Packet> linkedList = this.ackQueue;
            synchronized (linkedList) {
                if (this.running) {
                    this.ackQueue.addLast(p);
                    this.ackQueue.notifyAll();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void sendOOBResponse(DataTransferProtos.Status ackStatus) throws IOException, InterruptedException {
            if (!this.running) {
                LOG.info((Object)("Cannot send OOB response " + (Object)((Object)ackStatus) + ". Responder not running."));
                return;
            }
            PacketResponder packetResponder = this;
            synchronized (packetResponder) {
                if (this.sending) {
                    this.wait(PipelineAck.getOOBTimeout(ackStatus));
                    if (this.sending) {
                        throw new IOException("Could not send OOB reponse in time: " + (Object)((Object)ackStatus));
                    }
                }
                this.sending = true;
            }
            LOG.info((Object)("Sending an out of band ack of type " + (Object)((Object)ackStatus)));
            try {
                this.sendAckUpstreamUnprotected(null, -2L, 0L, 0L, ackStatus);
            }
            finally {
                packetResponder = this;
                synchronized (packetResponder) {
                    this.sending = false;
                    this.notify();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        Packet waitForAckHead(long seqno) throws InterruptedException {
            LinkedList<Packet> linkedList = this.ackQueue;
            synchronized (linkedList) {
                while (this.isRunning() && this.ackQueue.size() == 0) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)(this.myString + ": seqno=" + seqno + " waiting for local datanode to finish write."));
                    }
                    this.ackQueue.wait();
                }
                return this.isRunning() ? this.ackQueue.getFirst() : null;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void close() {
            Object object = this.ackQueue;
            synchronized (object) {
                while (this.isRunning() && this.ackQueue.size() != 0) {
                    try {
                        this.ackQueue.wait();
                    }
                    catch (InterruptedException e) {
                        this.running = false;
                        Thread.currentThread().interrupt();
                    }
                }
            }
            object = this;
            synchronized (object) {
                this.running = false;
                this.notifyAll();
            }
        }

        @Override
        public void run() {
            long startTime;
            boolean lastPacketInBlock = false;
            long l = startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0L;
            while (this.isRunning() && !lastPacketInBlock) {
                long totalAckTimeNanos = 0L;
                boolean isInterrupted = false;
                try {
                    Packet pkt = null;
                    long expected = -2L;
                    PipelineAck ack = new PipelineAck();
                    long seqno = -2L;
                    long ackRecvNanoTime = 0L;
                    try {
                        if (this.type != PacketResponderType.LAST_IN_PIPELINE && !BlockReceiver.this.mirrorError) {
                            DataTransferProtos.Status oobStatus;
                            ack.readFields(this.downstreamIn);
                            ackRecvNanoTime = System.nanoTime();
                            if (LOG.isDebugEnabled()) {
                                LOG.debug((Object)(this.myString + " got " + ack));
                            }
                            if ((oobStatus = ack.getOOBStatus()) != null) {
                                LOG.info((Object)("Relaying an out of band ack of type " + (Object)((Object)oobStatus)));
                                this.sendAckUpstream(ack, -2L, 0L, 0L, DataTransferProtos.Status.SUCCESS);
                                continue;
                            }
                            seqno = ack.getSeqno();
                        }
                        if (seqno != -2L || this.type == PacketResponderType.LAST_IN_PIPELINE) {
                            pkt = this.waitForAckHead(seqno);
                            if (!this.isRunning()) break;
                            expected = pkt.seqno;
                            if (this.type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE && seqno != expected) {
                                throw new IOException(this.myString + "seqno: expected=" + expected + ", received=" + seqno);
                            }
                            if (this.type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
                                totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
                                long ackTimeNanos = totalAckTimeNanos - ack.getDownstreamAckTimeNanos();
                                if (ackTimeNanos < 0L) {
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug((Object)("Calculated invalid ack time: " + ackTimeNanos + "ns."));
                                    }
                                } else {
                                    ((BlockReceiver)BlockReceiver.this).datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
                                }
                            }
                            lastPacketInBlock = pkt.lastPacketInBlock;
                        }
                    }
                    catch (InterruptedException ine) {
                        isInterrupted = true;
                    }
                    catch (IOException ioe) {
                        if (Thread.interrupted()) {
                            isInterrupted = true;
                        }
                        BlockReceiver.this.mirrorError = true;
                        LOG.info((Object)this.myString, (Throwable)ioe);
                    }
                    if (Thread.interrupted() || isInterrupted) {
                        LOG.info((Object)(this.myString + ": Thread is interrupted."));
                        this.running = false;
                        continue;
                    }
                    if (lastPacketInBlock) {
                        this.finalizeBlock(startTime);
                    }
                    this.sendAckUpstream(ack, expected, totalAckTimeNanos, pkt != null ? pkt.offsetInBlock : 0L, pkt != null ? pkt.ackStatus : DataTransferProtos.Status.SUCCESS);
                    if (pkt == null) continue;
                    this.removeAckHead();
                }
                catch (IOException e) {
                    LOG.warn((Object)"IOException in BlockReceiver.run(): ", (Throwable)e);
                    if (!this.running) continue;
                    try {
                        BlockReceiver.this.datanode.checkDiskError(e);
                    }
                    catch (IOException ioe) {
                        LOG.warn((Object)"DataNode.checkDiskError failed in run() with: ", (Throwable)ioe);
                    }
                    LOG.info((Object)this.myString, (Throwable)e);
                    this.running = false;
                    if (Thread.interrupted()) continue;
                    this.receiverThread.interrupt();
                }
                catch (Throwable e) {
                    if (!this.running) continue;
                    LOG.info((Object)this.myString, e);
                    this.running = false;
                    this.receiverThread.interrupt();
                }
            }
            LOG.info((Object)(this.myString + " terminating"));
        }

        private void finalizeBlock(long startTime) throws IOException {
            BlockReceiver.this.close();
            long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0L;
            BlockReceiver.this.block.setNumBytes(BlockReceiver.this.replicaInfo.getNumBytes());
            ((BlockReceiver)BlockReceiver.this).datanode.data.finalizeBlock(BlockReceiver.this.block);
            BlockReceiver.this.datanode.closeBlock(BlockReceiver.this.block, "", BlockReceiver.this.replicaInfo.getStorageUuid());
            if (ClientTraceLog.isInfoEnabled() && BlockReceiver.this.isClient) {
                long offset = 0L;
                DatanodeRegistration dnR = BlockReceiver.this.datanode.getDNRegistrationForBP(BlockReceiver.this.block.getBlockPoolId());
                ClientTraceLog.info((Object)String.format("src: %s, dest: %s, bytes: %s, op: %s, cliID: %s, offset: %s, srvID: %s, blockid: %s, duration: %s", BlockReceiver.this.inAddr, BlockReceiver.this.myAddr, BlockReceiver.this.block.getNumBytes(), "HDFS_WRITE", BlockReceiver.this.clientname, offset, dnR.getDatanodeUuid(), BlockReceiver.this.block, endTime - startTime));
            } else {
                LOG.info((Object)("Received " + BlockReceiver.this.block + " size " + BlockReceiver.this.block.getNumBytes() + " from " + BlockReceiver.this.inAddr));
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void sendAckUpstream(PipelineAck ack, long seqno, long totalAckTimeNanos, long offsetInBlock, DataTransferProtos.Status myStatus) throws IOException {
            try {
                PacketResponder packetResponder = this;
                synchronized (packetResponder) {
                    while (this.sending) {
                        this.wait();
                    }
                    this.sending = true;
                }
                try {
                    if (!this.running) {
                        return;
                    }
                    this.sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos, offsetInBlock, myStatus);
                }
                finally {
                    packetResponder = this;
                    synchronized (packetResponder) {
                        this.sending = false;
                        this.notify();
                    }
                }
            }
            catch (InterruptedException ie) {
                this.running = false;
            }
        }

        private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno, long totalAckTimeNanos, long offsetInBlock, DataTransferProtos.Status myStatus) throws IOException {
            DataTransferProtos.Status[] replies = null;
            if (ack == null) {
                replies = new DataTransferProtos.Status[]{myStatus};
            } else if (BlockReceiver.this.mirrorError) {
                replies = MIRROR_ERROR_STATUS;
            } else {
                int ackLen = this.type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack.getNumOfReplies();
                replies = new DataTransferProtos.Status[1 + ackLen];
                replies[0] = myStatus;
                for (int i = 0; i < ackLen; ++i) {
                    replies[i + 1] = ack.getReply(i);
                }
                if (ackLen > 0 && replies[1] == DataTransferProtos.Status.ERROR_CHECKSUM) {
                    throw new IOException("Shutting down writer and responder since the down streams reported the data sent by this thread is corrupt");
                }
            }
            PipelineAck replyAck = new PipelineAck(seqno, replies, totalAckTimeNanos);
            if (replyAck.isSuccess() && offsetInBlock > BlockReceiver.this.replicaInfo.getBytesAcked()) {
                BlockReceiver.this.replicaInfo.setBytesAcked(offsetInBlock);
            }
            replyAck.write(this.upstreamOut);
            this.upstreamOut.flush();
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)(this.myString + ", replyAck=" + replyAck));
            }
            if (myStatus == DataTransferProtos.Status.ERROR_CHECKSUM) {
                throw new IOException("Shutting down writer and responder due to a checksum error in received data. The error response has been sent upstream.");
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void removeAckHead() {
            LinkedList<Packet> linkedList = this.ackQueue;
            synchronized (linkedList) {
                this.ackQueue.removeFirst();
                this.ackQueue.notifyAll();
            }
        }
    }

    private static enum PacketResponderType {
        NON_PIPELINE,
        LAST_IN_PIPELINE,
        HAS_DOWNSTREAM_IN_PIPELINE;

    }
}

