/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableSet;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClientFaultInjector;
import org.apache.hadoop.hdfs.DFSPacket;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.StandardSocketFactory;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.SpanId;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;

@InterfaceAudience.Private
class DataStreamer
extends Daemon {
    static final Log LOG = LogFactory.getLog(DataStreamer.class);
    // Set by close()/closeInternal() and on unrecoverable errors; run() exits its loop once this is true.
    private volatile boolean streamerClosed = false;
    // Block currently being written and the token presented to datanodes for it.
    private ExtendedBlock block;
    private Token<BlockTokenIdentifier> accessToken;
    // Streams and responder thread of the current pipeline; torn down by closeStream()/closeResponder().
    private DataOutputStream blockStream;
    private DataInputStream blockReplyStream;
    private ResponseProcessor response = null;
    // Current pipeline description; the three arrays are always replaced together via setPipeline(...).
    private volatile DatanodeInfo[] nodes = null;
    private volatile StorageType[] storageTypes = null;
    private volatile String[] storageIDs = null;
    // Error state consulted by run(): hasError triggers recovery when errorIndex or restartingNodeIndex is >= 0.
    volatile boolean hasError = false;
    volatile int errorIndex = -1;
    AtomicInteger restartingNodeIndex = new AtomicInteger(-1);
    private long restartDeadline = 0L;
    private BlockConstructionStage stage;
    private long bytesSent = 0L;
    private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
    // processDatanodeError() counts consecutive recoveries for the same lastAckedSeqno and gives up after 5.
    private long lastAckedSeqnoBeforeFailure = -1L;
    private int pipelineRecoveryCount = 0;
    private boolean isHflushed = false;
    private final boolean isAppend;
    private long currentSeqno = 0L;
    private long lastQueuedSeqno = -1L;
    private long lastAckedSeqno = -1L;
    private long bytesCurBlock = 0L;
    private final LastExceptionInStreamer lastException = new LastExceptionInStreamer();
    private Socket s;
    private final DFSClient dfsClient;
    private final String src;
    private final DataChecksum checksum;
    private final Progressable progress;
    private final HdfsFileStatus stat;
    private volatile boolean appendChunk = false;
    // dataQueue holds packets not yet sent; run() moves each packet to ackQueue just before writing it.
    // dataQueue also serves as the monitor for every wait()/notifyAll() in this class.
    private final LinkedList<DFSPacket> dataQueue = new LinkedList();
    private final LinkedList<DFSPacket> ackQueue = new LinkedList();
    private final AtomicReference<CachingStrategy> cachingStrategy;
    private final ByteArrayManager byteArrayManager;
    // NOTE(review): this suite and policySuite (below) are both built by createDefaultSuite();
    // only policySuite is referenced in the visible part of the file - confirm whether both are needed.
    private static final BlockStoragePolicySuite blockStoragePolicySuite = BlockStoragePolicySuite.createDefaultSuite();
    private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
    private boolean failPacket = false;
    // Threshold (ms) above which waitForAckedSeqno() logs a "Slow waitForAckedSeqno" warning.
    private final long dfsclientSlowLogThresholdMs;
    // When non-zero, run() sleeps this many milliseconds after each packet.
    private long artificialSlowdown = 0L;
    private final List<DatanodeInfo> congestedNodes = new ArrayList<DatanodeInfo>();
    private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
    private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS = 50000;
    private int lastCongestionBackoffTime;
    private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
    private final String[] favoredNodes;
    private final LinkedList<DFSPacket> smallFileDataQueue = new LinkedList();
    // Pre-located block used by the PIPELINE_SETUP_SINGLE_BLOCK stage in run().
    private LocatedBlock lb;
    private boolean erasureCodingSourceStream = false;
    private int currentBlockIndex = 0;
    private int stripeLength;
    private HashSet<DatanodeInfo> usedNodes = new HashSet();
    private int parityLength;
    private boolean erasureCodingParityStream = false;
    private List<DatanodeInfo> stripeNodes = new LinkedList<DatanodeInfo>();
    private List<LocatedBlock> sourceBlocks = Collections.emptyList();
    private List<DatanodeInfo> parityStripeNodes = new LinkedList<DatanodeInfo>();
    private final int dbFileMaxSize;
    private final boolean forceClientToWriteSFToDisk;
    // Initialised in the private constructor: true when not forced to disk and the file's
    // storage policy lists StorageType.DB as its first storage type.
    private boolean isThisFileStoredInDB = false;
    private boolean syncOrFlushCalled = false;
    private static BlockStoragePolicySuite policySuite = BlockStoragePolicySuite.createDefaultSuite();

    /**
     * Creates a socket connected to the given datanode and configures it for
     * pipeline traffic.
     *
     * <p>The connect timeout comes from the client configuration; the read
     * timeout comes from {@code client.getDatanodeReadTimeout(length)}. The send
     * buffer is requested at 128 KiB.
     *
     * <p>If connecting or configuring the socket fails, the socket is closed
     * before the exception propagates, so a failed attempt does not leak the
     * underlying descriptor.
     *
     * @param first  datanode to connect to
     * @param length value passed to {@code getDatanodeReadTimeout} to derive the read timeout
     * @param client owning DFS client, source of configuration and the local bind address
     * @return a connected, configured socket; the caller owns it and must close it
     * @throws IOException if the socket cannot be created, connected or configured
     */
    static Socket createSocketForPipeline(DatanodeInfo first, int length, DFSClient client) throws IOException {
        DfsClientConf conf = client.getConf();
        String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname());
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Connecting to datanode " + dnAddr));
        }
        InetSocketAddress isa = NetUtils.createSocketAddr((String)dnAddr);
        StandardSocketFactory socketFactory = new StandardSocketFactory();
        Socket sock = socketFactory.createSocket();
        boolean ready = false;
        try {
            int timeout = client.getDatanodeReadTimeout(length);
            NetUtils.connect((Socket)sock, (SocketAddress)isa, (SocketAddress)client.getRandomLocalInterfaceAddr(), (int)conf.getSocketTimeout());
            sock.setSoTimeout(timeout);
            sock.setSendBufferSize(131072);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Send buf size " + sock.getSendBufferSize()));
            }
            ready = true;
            return sock;
        }
        finally {
            if (!ready) {
                // Nobody else holds a reference to the socket yet, so it must be released here.
                try {
                    sock.close();
                }
                catch (IOException ignored) {
                    // Best effort: the original failure is the one the caller needs to see.
                }
            }
        }
    }

    /**
     * Returns the buffer of every packet in {@code packets} to {@code bam}
     * and then empties the list.
     *
     * @param packets packets whose buffers are released; cleared on return
     * @param bam     manager the buffers are handed back to
     */
    private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam) {
        java.util.Iterator<DFSPacket> cursor = packets.iterator();
        while (cursor.hasNext()) {
            DFSPacket packet = cursor.next();
            packet.releaseBuffer(bam);
        }
        packets.clear();
    }

    /**
     * Shared initialisation used by the other constructors.
     *
     * <p>Stores the collaborators, reads the slow-I/O warning threshold from the
     * client configuration, builds the excluded-node cache, and decides whether
     * the file is to be kept in the database: never when
     * {@code forceClientToWriteSFToDisk} is set, otherwise only when the first
     * storage type of the file's storage policy is {@code StorageType.DB}.
     */
    private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference<CachingStrategy> cachingStrategy, ByteArrayManager byteArrayManage, int dbFileMaxSize, boolean forceClientToWriteSFToDisk, boolean isAppend, String[] favoredNodes) {
        // Collaborators first: initExcludedNodes() runs on a partially built object.
        this.dfsClient = dfsClient;
        this.stat = stat;
        this.src = src;
        this.progress = progress;
        this.byteArrayManager = byteArrayManage;
        this.cachingStrategy = cachingStrategy;
        this.dfsclientSlowLogThresholdMs = dfsClient.getConf().getSlowIoWarningThresholdMs();
        this.excludedNodes = this.initExcludedNodes();

        this.checksum = checksum;
        this.favoredNodes = favoredNodes;
        this.isAppend = isAppend;
        this.forceClientToWriteSFToDisk = forceClientToWriteSFToDisk;
        this.dbFileMaxSize = dbFileMaxSize;

        if (this.forceClientToWriteSFToDisk) {
            this.isThisFileStoredInDB = false;
        } else {
            StorageType primaryType = policySuite.getPolicy(stat.getStoragePolicy()).getStorageTypes()[0];
            this.isThisFileStoredInDB = primaryType == StorageType.DB;
        }
    }

    /**
     * Builds a streamer for a newly created file (non-append). The streamer
     * starts in the {@code PIPELINE_SETUP_CREATE} stage with the given block.
     */
    DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference<CachingStrategy> cachingStrategy, ByteArrayManager byteArrayManage, int dbFileMaxSize, boolean forceClientToWriteSFToDisk, String[] favoredNodes) {
        this(stat, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManage, dbFileMaxSize, forceClientToWriteSFToDisk, false, favoredNodes);
        this.stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
        this.block = block;
    }

    DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference<CachingStrategy> cachingStrategy, ByteArrayManager byteArrayManage, int dbFileMaxSize, boolean forceClientToWriteSFToDisk) throws IOException {
        this(stat, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManage, dbFileMaxSize, forceClientToWriteSFToDisk, true, null);
        this.stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
        this.block = lastBlock.getBlock();
        this.bytesSent = this.block.getNumBytes();
        this.accessToken = lastBlock.getBlockToken();
    }

    /**
     * Builds a streamer that sends a single, already located block. The
     * streamer starts in the {@code PIPELINE_SETUP_SINGLE_BLOCK} stage and
     * remembers {@code lb} for pipeline setup in {@code run()}.
     */
    DataStreamer(HdfsFileStatus stat, LocatedBlock lb, boolean sigleBlock, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference<CachingStrategy> cachingStrategy, ByteArrayManager byteArrayManage, int dbFileMaxSize, boolean saveSmallFilesInDB) {
        this(stat, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManage, dbFileMaxSize, saveSmallFilesInDB, false, null);
        this.lb = lb;
        this.stage = BlockConstructionStage.PIPELINE_SETUP_SINGLE_BLOCK;
    }

    /**
     * Installs the pipeline described by {@code lastBlock} and clears the error
     * index.
     *
     * @param lastBlock located block supplying nodes, storage types and storage IDs
     * @throws IOException if the block has no locations
     */
    void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException {
        this.setPipeline(lastBlock);
        this.errorIndex = -1;
        if (this.nodes.length >= 1) {
            return;
        }
        throw new IOException("Unable to retrieve blocks locations  for last block " + this.block + "of file " + this.src);
    }

    /**
     * Installs the pipeline (nodes, storage types, storage IDs) carried by the
     * given located block.
     */
    private void setPipeline(LocatedBlock lb) {
        DatanodeInfo[] locations = lb.getLocations();
        StorageType[] types = lb.getStorageTypes();
        String[] ids = lb.getStorageIDs();
        this.setPipeline(locations, types, ids);
    }

    /**
     * Replaces the current pipeline description. All three arrays may be
     * {@code null}; {@code endBlock()} passes nulls to clear the pipeline.
     *
     * @param nodes        datanodes of the pipeline
     * @param storageTypes storage types supplied with {@code nodes}
     * @param storageIDs   storage IDs supplied with {@code nodes}
     */
    private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes, String[] storageIDs) {
        // The three fields are volatile; they are written in this fixed order.
        this.nodes = nodes;
        this.storageTypes = storageTypes;
        this.storageIDs = storageIDs;
    }

    /**
     * Switches the streamer into the {@code DATA_STREAMING} stage: renames the
     * thread after the file and block, then creates and starts a responder for
     * the current pipeline nodes.
     */
    private void initDataStreaming() {
        String threadName = "DataStreamer for file " + this.src + " block " + this.block;
        this.setName(threadName);
        ResponseProcessor responder = new ResponseProcessor(this.nodes);
        this.response = responder;
        responder.start();
        this.stage = BlockConstructionStage.DATA_STREAMING;
    }

    /**
     * Finishes the current block: stops the responder, closes the pipeline
     * streams, clears the pipeline description and returns to the
     * {@code PIPELINE_SETUP_CREATE} stage so the next packet allocates a new block.
     */
    private void endBlock() {
        boolean debug = LOG.isDebugEnabled();
        if (debug) {
            LOG.debug("Closing old block " + this.block);
        }
        String threadName = "DataStreamer for file " + this.src;
        this.setName(threadName);
        this.closeResponder();
        this.closeStream();
        this.setPipeline(null, null, null);
        this.stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Main loop of the streamer thread.
     *
     * <p>Each iteration: (1) shuts down the responder if an error is pending,
     * (2) runs datanode error recovery when an error or restarting node index
     * is set, (3) waits on {@code dataQueue} for a packet, or creates a
     * heartbeat packet when the queue is still empty after the wait,
     * (4) sets up the pipeline if the current stage requires it, (5) moves the
     * packet to {@code ackQueue} and writes it to {@code blockStream}, and
     * (6) for the last packet of a block, waits for all acks and ends the block.
     *
     * <p>Any {@code Throwable} is recorded in {@code lastException} and flagged
     * via {@code hasError}; if no datanode is marked failed or restarting, the
     * streamer is closed. The loop ends when the streamer is closed or the
     * client stops running, after which {@code closeInternal()} runs.
     *
     * <p>NOTE(review): this body is decompiler output (labeled blocks,
     * {@code continue} inside {@code finally}); keep statement order intact
     * when editing.
     */
    public void run() {
        long lastPacket = Time.monotonicNow();
        TraceScope scope = null;
        while (!this.streamerClosed && this.dfsClient.clientRunning) {
            // An error is pending: stop the responder and wait for it to exit before recovering.
            if (this.hasError && this.response != null) {
                try {
                    this.response.close();
                    this.response.join();
                    this.response = null;
                }
                catch (InterruptedException e) {
                    LOG.warn((Object)"Caught exception", (Throwable)e);
                }
            }
            try {
                DFSPacket one;
                boolean doSleep = false;
                // Recover the pipeline when a failed or restarting datanode has been identified.
                if (this.hasError && (this.errorIndex >= 0 || this.restartingNodeIndex.get() >= 0)) {
                    doSleep = this.processDatanodeError();
                }
                int halfSocketTimeout = this.dfsClient.getConf().getSocketTimeout() / 2;
                LinkedList<DFSPacket> linkedList = this.dataQueue;
                synchronized (linkedList) {
                    block73: {
                        long now = Time.monotonicNow();
                        // Wait while there is nothing to send. While DATA_STREAMING the wait ends after
                        // half the socket timeout without a packet; doSleep forces at least one wait.
                        while (!this.streamerClosed && !this.hasError && this.dfsClient.clientRunning && this.dataQueue.size() == 0 && (this.stage != BlockConstructionStage.DATA_STREAMING || this.stage == BlockConstructionStage.DATA_STREAMING && now - lastPacket < (long)halfSocketTimeout) || doSleep) {
                            long timeout = (long)halfSocketTimeout - (now - lastPacket);
                            timeout = timeout <= 0L ? 1000L : timeout;
                            timeout = this.stage == BlockConstructionStage.DATA_STREAMING ? timeout : 1000L;
                            try {
                                this.dataQueue.wait(timeout);
                            }
                            catch (InterruptedException e) {
                                LOG.warn((Object)"Caught exception", (Throwable)e);
                            }
                            doSleep = false;
                            now = Time.monotonicNow();
                        }
                        // Proceed only if still healthy; otherwise restart the outer loop.
                        if (!this.streamerClosed && !this.hasError && this.dfsClient.clientRunning) break block73;
                        continue;
                    }
                    if (this.dataQueue.isEmpty()) {
                        // Nothing queued after the wait: send a heartbeat packet instead.
                        one = this.createHeartbeatPacket();
                        assert (one != null);
                    } else {
                        try {
                            this.backOffIfNecessary();
                        }
                        catch (InterruptedException e) {
                            LOG.warn((Object)"Caught exception", (Throwable)e);
                        }
                        // Peek only; the packet is removed from dataQueue further down.
                        one = this.dataQueue.getFirst();
                        SpanId[] parents = one.getTraceParents();
                        if (parents.length > 0) {
                            scope = this.dfsClient.getTracer().newScope("dataStreamer", parents[0]);
                            scope.getSpan().setParents(parents);
                        }
                    }
                }
                // Establish the pipeline if the current stage is one of the setup stages.
                if (this.stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)"Allocating new block");
                    }
                    this.setPipeline(this.nextBlockOutputStream());
                    this.initDataStreaming();
                } else if (this.stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Append to block " + this.block));
                    }
                    this.setupPipelineForAppendOrRecovery();
                    this.initDataStreaming();
                } else if (this.stage == BlockConstructionStage.PIPELINE_SETUP_SINGLE_BLOCK) {
                    this.stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Send single block " + this.block));
                    }
                    this.setPipeline(this.lb);
                    this.nodes = this.setupPipelineForSingleBlock(this.lb);
                    this.initDataStreaming();
                }
                // A packet must not end beyond the file's block size.
                long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
                if (lastByteOffsetInBlock > this.stat.getBlockSize()) {
                    throw new IOException("BlockSize " + this.stat.getBlockSize() + " is smaller than data size.  Offset of packet in block " + lastByteOffsetInBlock + " Aborting file " + this.src);
                }
                if (one.isLastPacketInBlock()) {
                    // Before sending the final packet, wait until every earlier packet has been acked.
                    LinkedList<DFSPacket> linkedList2 = this.dataQueue;
                    synchronized (linkedList2) {
                        while (!this.streamerClosed && !this.hasError && this.ackQueue.size() != 0 && this.dfsClient.clientRunning) {
                            try {
                                this.dataQueue.wait(1000L);
                            }
                            catch (InterruptedException e) {
                                LOG.warn((Object)"Caught exception", (Throwable)e);
                            }
                        }
                    }
                    if (this.streamerClosed || this.hasError || !this.dfsClient.clientRunning) continue;
                    this.stage = BlockConstructionStage.PIPELINE_CLOSE;
                }
                SpanId spanId = SpanId.INVALID;
                LinkedList<DFSPacket> e = this.dataQueue;
                synchronized (e) {
                    // Heartbeats are never queued; real packets move from dataQueue to ackQueue,
                    // taking the detached trace scope with them.
                    if (!one.isHeartbeatPacket()) {
                        if (scope != null) {
                            spanId = scope.getSpanId();
                            scope.detach();
                            one.setTraceScope(scope);
                        }
                        scope = null;
                        this.dataQueue.removeFirst();
                        this.ackQueue.addLast(one);
                        this.dataQueue.notifyAll();
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("DataStreamer block " + this.block + " sending packet " + one));
                }
                // Write the packet to the pipeline; on I/O failure mark the first datanode as failed
                // (unless another node is already marked) and rethrow for the outer handler.
                try (TraceScope ignored = this.dfsClient.getTracer().newScope("DataStreamer#writeTo", spanId);){
                    one.writeTo(this.blockStream);
                    this.blockStream.flush();
                }
                catch (IOException e2) {
                    this.tryMarkPrimaryDatanodeFailed();
                    throw e2;
                }
                lastPacket = Time.monotonicNow();
                // bytesSent only ever grows.
                long tmpBytesSent = one.getLastByteOffsetBlock();
                if (this.bytesSent < tmpBytesSent) {
                    this.bytesSent = tmpBytesSent;
                }
                if (this.streamerClosed || this.hasError || !this.dfsClient.clientRunning) continue;
                if (one.isLastPacketInBlock()) {
                    // Wait for the final packet to be acked, then close out the block.
                    // An InterruptedException here is handled by the catch (Throwable) below.
                    LinkedList<DFSPacket> linkedList3 = this.dataQueue;
                    synchronized (linkedList3) {
                        while (!this.streamerClosed && !this.hasError && this.ackQueue.size() != 0 && this.dfsClient.clientRunning) {
                            this.dataQueue.wait(1000L);
                        }
                    }
                    if (this.streamerClosed || this.hasError || !this.dfsClient.clientRunning) continue;
                    this.endBlock();
                }
                if (this.progress != null) {
                    this.progress.progress();
                }
                if (this.artificialSlowdown == 0L || !this.dfsClient.clientRunning) continue;
                Thread.sleep(this.artificialSlowdown);
            }
            catch (Throwable e) {
                // Log unless a datanode restart is in progress; quota errors are logged at debug level.
                if (this.restartingNodeIndex.get() == -1) {
                    if (e instanceof QuotaExceededException) {
                        LOG.debug((Object)"DataStreamer Quota Exception", e);
                    } else {
                        LOG.warn((Object)"DataStreamer Exception", e);
                    }
                }
                this.lastException.set(e);
                this.hasError = true;
                // With no failed or restarting datanode identified there is nothing to recover: close.
                if (this.errorIndex != -1 || this.restartingNodeIndex.get() != -1) continue;
                this.streamerClosed = true;
            }
            finally {
                // Close any trace scope that was not handed over to a packet.
                if (scope == null) continue;
                scope.close();
                scope = null;
            }
        }
        this.closeInternal();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Final cleanup of the streamer thread: stops the responder, closes the
     * pipeline streams, marks the streamer closed, releases all packet buffers
     * and wakes every thread waiting on {@code dataQueue}.
     */
    private void closeInternal() {
        this.closeResponder();
        this.closeStream();
        this.streamerClosed = true;
        this.release();
        synchronized (this.dataQueue) {
            this.dataQueue.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Releases the buffers of all packets in {@code dataQueue} and
     * {@code ackQueue} and empties both queues, while holding the
     * {@code dataQueue} monitor.
     */
    void release() {
        synchronized (this.dataQueue) {
            releaseBuffer(this.dataQueue, this.byteArrayManager);
            releaseBuffer(this.ackQueue, this.byteArrayManager);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Blocks until the packet with sequence number {@code seqno} has been
     * acknowledged, or the streamer is closed.
     *
     * <p>Returns immediately when {@code canStoreFileInDB()} is true. A
     * {@code ClosedChannelException} raised by the closed-check is swallowed.
     * A warning is logged when the wait exceeds the slow-I/O threshold.
     *
     * @param seqno sequence number to wait for
     * @throws InterruptedIOException if the calling thread is interrupted while waiting
     * @throws IOException            if the streamer was closed with a recorded failure
     */
    void waitForAckedSeqno(long seqno) throws IOException {
        try (TraceScope ignored = this.dfsClient.getTracer().newScope("waitForAckedSeqno")) {
            if (this.canStoreFileInDB()) {
                LOG.debug("Stuffed Inode:  Closing File. Datanode ack skipped. All the data will be stored in the database");
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Waiting for ack for: " + seqno);
            }
            final long startedAt = Time.monotonicNow();
            try {
                synchronized (this.dataQueue) {
                    while (!this.streamerClosed) {
                        this.checkClosed();
                        if (this.lastAckedSeqno >= seqno) {
                            break;
                        }
                        try {
                            // Woken by notifyAll() on dataQueue, or re-checked after one second.
                            this.dataQueue.wait(1000L);
                        }
                        catch (InterruptedException interrupted) {
                            throw new InterruptedIOException("Interrupted while waiting for data to be acknowledged by pipeline");
                        }
                    }
                }
                this.checkClosed();
            }
            catch (ClosedChannelException alreadyClosed) {
                // Deliberately ignored, as in the original code.
            }
            final long elapsed = Time.monotonicNow() - startedAt;
            if (elapsed > this.dfsclientSlowLogThresholdMs) {
                LOG.warn("Slow waitForAckedSeqno took " + elapsed + "ms (threshold=" + this.dfsclientSlowLogThresholdMs + "ms)");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues {@code packet} for sending, first waiting while the combined size
     * of {@code dataQueue} and {@code ackQueue} exceeds the configured maximum
     * number of outstanding packets.
     *
     * <p>If the wait is interrupted, the interrupt flag is restored and waiting
     * stops. The first wait and its end are recorded as timeline annotations on
     * the current trace span, if any. A {@code ClosedChannelException} from the
     * closed-check is swallowed and the packet is then not queued.
     *
     * @param packet packet to enqueue
     * @throws IOException if the streamer was closed with a recorded failure
     */
    void waitAndQueuePacket(DFSPacket packet) throws IOException {
        synchronized (this.dataQueue) {
            try {
                boolean waited = false;
                try {
                    while (!this.streamerClosed && this.dataQueue.size() + this.ackQueue.size() > this.dfsClient.getConf().getWriteMaxPackets()) {
                        if (!waited) {
                            Span current = Tracer.getCurrentSpan();
                            if (current != null) {
                                current.addTimelineAnnotation("dataQueue.wait");
                            }
                            waited = true;
                        }
                        try {
                            this.dataQueue.wait();
                        }
                        catch (InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
                finally {
                    Span current = Tracer.getCurrentSpan();
                    if (waited && current != null) {
                        current.addTimelineAnnotation("end.wait");
                    }
                }
                this.checkClosed();
                this.queuePacket(packet);
            }
            catch (ClosedChannelException alreadyClosed) {
                // Deliberately ignored, as in the original code.
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Asks the streamer to stop: marks it closed and wakes all threads waiting
     * on {@code dataQueue}.
     *
     * @param force when true, the streamer thread is also interrupted
     */
    void close(boolean force) {
        this.streamerClosed = true;
        synchronized (this.dataQueue) {
            this.dataQueue.notifyAll();
        }
        if (!force) {
            return;
        }
        this.interrupt();
    }

    /**
     * Does nothing while the streamer is open; once it is closed, delegates to
     * {@code lastException.throwException4Close()}.
     *
     * @throws IOException as raised by {@code throwException4Close()}
     */
    private void checkClosed() throws IOException {
        if (!this.streamerClosed) {
            return;
        }
        this.lastException.throwException4Close();
    }

    /**
     * Stops the responder, if one exists, and waits for it to terminate. The
     * {@code response} reference is cleared even if the wait is interrupted.
     */
    private void closeResponder() {
        if (this.response == null) {
            return;
        }
        try {
            this.response.close();
            this.response.join();
        }
        catch (InterruptedException interrupted) {
            LOG.warn("Caught exception", interrupted);
        }
        finally {
            this.response = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes the pipeline output stream, reply stream and socket, in that
     * order, nulling each reference whether or not its close succeeds. Every
     * {@code IOException} raised on the way is collected; if any occurred, the
     * combined exception is stored in {@code lastException}.
     */
    private void closeStream() {
        MultipleIOException.Builder failures = new MultipleIOException.Builder();
        if (this.blockStream != null) {
            try {
                this.blockStream.close();
            }
            catch (IOException closeFailure) {
                failures.add(closeFailure);
            }
            finally {
                this.blockStream = null;
            }
        }
        if (this.blockReplyStream != null) {
            try {
                this.blockReplyStream.close();
            }
            catch (IOException closeFailure) {
                failures.add(closeFailure);
            }
            finally {
                this.blockReplyStream = null;
            }
        }
        if (this.s != null) {
            try {
                this.s.close();
            }
            catch (IOException closeFailure) {
                failures.add(closeFailure);
            }
            finally {
                this.s = null;
            }
        }
        IOException combined = failures.build();
        if (combined != null) {
            this.lastException.set(combined);
        }
    }

    /**
     * Records the pipeline index of the datanode considered failed.
     * {@code run()} starts error recovery when {@code hasError} is set and this
     * index is non-negative.
     *
     * @param idx index into the current pipeline; -1 means no failed node
     */
    synchronized void setErrorIndex(int idx) {
        this.errorIndex = idx;
    }

    /**
     * Records the pipeline index of a datanode that is restarting and clears
     * any previously recorded failed-node index.
     *
     * @param idx index into the current pipeline; -1 means no restarting node
     */
    synchronized void setRestartingNodeIndex(int idx) {
        this.restartingNodeIndex.set(idx);
        // A restarting node replaces any failed-node marking.
        this.errorIndex = -1;
    }

    /**
     * Marks the first datanode of the pipeline (index 0) as failed, but only
     * when no node is currently marked as failed or restarting.
     */
    synchronized void tryMarkPrimaryDatanodeFailed() {
        if (this.errorIndex != -1) {
            return;
        }
        if (this.restartingNodeIndex.get() != -1) {
            return;
        }
        this.errorIndex = 0;
    }

    /**
     * Decides whether the client should wait for the datanode at
     * {@code index} to restart.
     *
     * <p>Waiting is chosen when the pipeline has exactly one node, or when the
     * node's address resolves to a local address of this host.
     *
     * @param index position of the restarting datanode in the current pipeline
     * @return {@code true} if the streamer should wait for the node to come back
     */
    boolean shouldWaitForRestart(int index) {
        // With a single node in the pipeline, waiting is always chosen.
        if (this.nodes.length == 1) {
            return true;
        }
        InetAddress addr = null;
        try {
            addr = InetAddress.getByName(this.nodes[index].getIpAddr());
        }
        catch (UnknownHostException e) {
            // Presumably getIpAddr() yields an IP literal, so resolution is not expected to fail.
            // With assertions enabled this trips; otherwise addr stays null and false is returned.
            assert false : e;
        }
        return addr != null && NetUtils.isLocalAddress(addr);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Recovers the write pipeline after a datanode error.
     *
     * <p>If the responder is still running, nothing is done and {@code true} is
     * returned. Otherwise the streams are closed, all unacknowledged packets go
     * back to the front of {@code dataQueue}, and the pipeline is rebuilt. After
     * more than five consecutive recoveries with no progress in
     * {@code lastAckedSeqno}, the streamer is closed with an error. If the
     * failure hit while closing a block, the end-of-block packet is consumed
     * locally and the block is ended; otherwise data streaming restarts.
     *
     * @return whether the caller should wait before its next attempt
     * @throws IOException if rebuilding the pipeline fails
     */
    private boolean processDatanodeError() throws IOException {
        if (this.response != null) {
            LOG.info("Error Recovery for " + this.block + " waiting for responder to exit. ");
            return true;
        }
        this.closeStream();

        // Everything sent but not yet acknowledged must be sent again.
        synchronized (this.dataQueue) {
            this.dataQueue.addAll(0, this.ackQueue);
            this.ackQueue.clear();
        }

        if (this.lastAckedSeqnoBeforeFailure == this.lastAckedSeqno) {
            // No progress since the previous failure: count another attempt.
            this.pipelineRecoveryCount++;
            if (this.pipelineRecoveryCount > 5) {
                LOG.warn("Error recovering pipeline for writing " + this.block + ". Already retried 5 times for the same packet.");
                this.lastException.set(new IOException("Failing write. Tried pipeline recovery 5 times without success."));
                this.streamerClosed = true;
                return false;
            }
        } else {
            this.lastAckedSeqnoBeforeFailure = this.lastAckedSeqno;
            this.pipelineRecoveryCount = 1;
        }

        boolean doSleep = this.setupPipelineForAppendOrRecovery();
        if (this.streamerClosed || !this.dfsClient.clientRunning) {
            return doSleep;
        }
        if (this.stage != BlockConstructionStage.PIPELINE_CLOSE) {
            this.initDataStreaming();
            return doSleep;
        }

        // The block was being closed: consume the end-of-block packet here and treat it as acked.
        synchronized (this.dataQueue) {
            DFSPacket endOfBlockPacket = this.dataQueue.remove();
            TraceScope packetScope = endOfBlockPacket.getTraceScope();
            if (packetScope != null) {
                packetScope.reattach();
                packetScope.close();
                endOfBlockPacket.setTraceScope(null);
            }
            assert (endOfBlockPacket.isLastPacketInBlock());
            assert (this.lastAckedSeqno == endOfBlockPacket.getSeqno() - 1L);
            this.lastAckedSeqno = endOfBlockPacket.getSeqno();
            this.dataQueue.notifyAll();
        }
        this.endBlock();
        return doSleep;
    }

    /**
     * Sets the {@code isHflushed} flag. Nothing in this method or the visible
     * part of the class clears it again.
     */
    void setHflush() {
        this.isHflushed = true;
    }

    /**
     * Finds the index in the current pipeline of the one datanode that is not
     * present in {@code original}.
     *
     * @param original pipeline nodes before a datanode was added
     * @return index into {@code nodes} of the newly added datanode
     * @throws IOException if the current pipeline is not exactly one node longer
     *                     than {@code original}, or if every current node is
     *                     already in {@code original}
     */
    private int findNewDatanode(DatanodeInfo[] original) throws IOException {
        if (this.nodes.length != original.length + 1) {
            throw new IOException("Failed to replace a bad datanode on the existing pipeline " + "due to no more good datanodes being available to try. " + "(Nodes: current=" + Arrays.asList(this.nodes) + ", original=" + Arrays.asList(original) + "). " + "The current failed datanode replacement policy is " + this.dfsClient.dtpReplaceDatanodeOnFailure + ", and " + "a client may configure this via '" + "dfs.client.block.write.replace-datanode-on-failure.policy" + "' in its configuration.");
        }
        for (int candidate = 0; candidate < this.nodes.length; candidate++) {
            boolean alreadyKnown = false;
            for (DatanodeInfo previous : original) {
                if (this.nodes[candidate].equals((Object) previous)) {
                    alreadyKnown = true;
                    break;
                }
            }
            if (!alreadyKnown) {
                return candidate;
            }
        }
        throw new IOException("Failed: new datanode not found: nodes=" + Arrays.asList(this.nodes) + ", original=" + Arrays.asList(original));
    }

    /**
     * Asks the namenode for one additional datanode, installs the returned pipeline, and has a
     * neighbouring pipeline node transfer the block to the newly added node.
     *
     * <p>Returns without doing anything when the stream is not an append, nothing has been acked
     * yet and the stage is {@code PIPELINE_SETUP_CREATE}, or when the stage is
     * {@code PIPELINE_CLOSE} / {@code PIPELINE_CLOSE_RECOVERY}.
     *
     * @throws IOException if the namenode call, the search for the new node, or the transfer fails
     */
    private void addDatanode2ExistingPipeline() throws IOException {
        if (DataTransferProtocol.LOG.isDebugEnabled()) {
            DataTransferProtocol.LOG.debug((Object)("lastAckedSeqno = " + this.lastAckedSeqno));
        }
        boolean freshCreateWithoutAcks = !this.isAppend
                && this.lastAckedSeqno < 0L
                && this.stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
        boolean closing = this.stage == BlockConstructionStage.PIPELINE_CLOSE
                || this.stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY;
        if (freshCreateWithoutAcks || closing) {
            return;
        }

        DatanodeInfo[] previousPipeline = this.nodes;
        DatanodeInfo[] failedNodes = this.failed.toArray(new DatanodeInfo[this.failed.size()]);
        LocatedBlock extended = this.dfsClient.namenode.getAdditionalDatanode(this.src, this.stat.getFileId(), this.block, this.nodes, this.storageIDs, failedNodes, 1, this.dfsClient.clientName);
        this.setPipeline(extended);

        int added = this.findNewDatanode(previousPipeline);
        // The transfer source is the node just before the new one, or the node after it when
        // the new node sits at the head of the pipeline.
        DatanodeInfo transferSource;
        if (added == 0) {
            transferSource = this.nodes[1];
        } else {
            transferSource = this.nodes[added - 1];
        }
        DatanodeInfo[] transferTargets = new DatanodeInfo[]{this.nodes[added]};
        StorageType[] transferStorageTypes = new StorageType[]{this.storageTypes[added]};
        this.transfer(transferSource, transferTargets, transferStorageTypes, (Token<BlockTokenIdentifier>)extended.getBlockToken());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Connects to {@code src} and sends it a transfer-block request so that it copies the
     * current block to {@code targets}.
     *
     * <p>The socket and both streams are always closed before this method returns or throws.
     *
     * @param src datanode that is asked to perform the transfer
     * @param targets datanodes that should receive the block
     * @param targetStorageTypes storage types passed along with {@code targets}
     * @param blockToken token used for the SASL handshake and sent with the request
     * @throws IOException if the connection or handshake fails, or the response status is not
     *         {@code SUCCESS}
     */
    private void transfer(DatanodeInfo src, DatanodeInfo[] targets, StorageType[] targetStorageTypes, Token<BlockTokenIdentifier> blockToken) throws IOException {
        Socket sock = null;
        DataOutputStream out = null;
        DataInputStream in = null;
        try {
            sock = DataStreamer.createSocketForPipeline(src, 2, this.dfsClient);
            final long writeTimeout = this.dfsClient.getDatanodeWriteTimeout(2);

            OutputStream rawOut = NetUtils.getOutputStream((Socket)sock, (long)writeTimeout);
            InputStream rawIn = NetUtils.getInputStream((Socket)sock);
            IOStreamPair saslStreams = this.dfsClient.saslClient.socketSend(sock, rawOut, rawIn, this.dfsClient, blockToken, (DatanodeID)src);
            rawOut = saslStreams.out;
            rawIn = saslStreams.in;

            out = new DataOutputStream(new BufferedOutputStream(rawOut, HdfsConstants.SMALL_BUFFER_SIZE));
            in = new DataInputStream(rawIn);

            new Sender(out).transferBlock(this.block, blockToken, this.dfsClient.clientName, targets, targetStorageTypes);
            out.flush();

            DataTransferProtos.BlockOpResponseProto response = DataTransferProtos.BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
            if (response.getStatus() != DataTransferProtos.Status.SUCCESS) {
                throw new IOException("Failed to add a datanode");
            }
        }
        finally {
            // Same cleanup on the normal and the exceptional path.
            IOUtils.closeStream((Closeable)in);
            IOUtils.closeStream((Closeable)out);
            IOUtils.closeSocket((Socket)sock);
        }
    }

    /**
     * Rebuilds the write pipeline for the current block, dropping a failed datanode and
     * optionally adding a replacement, until {@code createBlockOutputStream} succeeds, the
     * streamer is closed, or the client stops running.
     *
     * <p>On success the namenode is told about the new generation stamp and pipeline, and
     * {@code this.block} is replaced by a block carrying that generation stamp.
     *
     * @return always {@code false} in the code visible here
     * @throws IOException from the namenode calls, or from a failed datanode replacement when
     *         the replacement policy is not best-effort
     */
    private boolean setupPipelineForAppendOrRecovery() throws IOException {
        // Without any datanodes there is nothing to recover: record the error and close.
        if (this.nodes == null || this.nodes.length == 0) {
            String msg = "Could not get block locations. Source file \"" + this.src + "\" - Aborting...";
            LOG.warn((Object)msg);
            this.lastException.set(new IOException(msg));
            this.streamerClosed = true;
            return false;
        }
        boolean success = false;
        long newGS = 0L;
        while (!success && !this.streamerClosed && this.dfsClient.clientRunning) {
            // A node is marked as restarting: wait (at most 4 seconds per iteration) before retrying.
            if (this.restartingNodeIndex.get() >= 0) {
                long delay = Math.min(this.dfsClient.getConf().getDatanodeRestartTimeout(), 4000L);
                try {
                    Thread.sleep(delay);
                }
                catch (InterruptedException ie) {
                    // Interruption during the wait closes the streamer with an error.
                    this.lastException.set(new IOException("Interrupted while waiting for datanode to restart. " + this.nodes[this.restartingNodeIndex.get()]));
                    this.streamerClosed = true;
                    return false;
                }
            }
            // Captured before hasError may be cleared below; passed on as the recovery flag.
            boolean isRecovery = this.hasError;
            if (this.errorIndex >= 0) {
                // Build a comma-separated description of the pipeline for the messages below.
                StringBuilder pipelineMsg = new StringBuilder();
                for (int j = 0; j < this.nodes.length; ++j) {
                    pipelineMsg.append(this.nodes[j]);
                    if (j >= this.nodes.length - 1) continue;
                    pipelineMsg.append(", ");
                }
                // Removing the bad node would leave an empty pipeline: give up.
                if (this.nodes.length <= 1) {
                    this.lastException.set(new IOException("All datanodes " + pipelineMsg + " are bad. Aborting..."));
                    this.streamerClosed = true;
                    return false;
                }
                LOG.warn((Object)("Error Recovery for block " + this.block + " in pipeline " + pipelineMsg + ": bad datanode " + this.nodes[this.errorIndex]));
                this.failed.add(this.nodes[this.errorIndex]);
                // Copy nodes, storage types and storage IDs without the entry at errorIndex.
                DatanodeInfo[] newnodes = new DatanodeInfo[this.nodes.length - 1];
                DataStreamer.arraycopy(this.nodes, newnodes, this.errorIndex);
                StorageType[] newStorageTypes = new StorageType[newnodes.length];
                DataStreamer.arraycopy(this.storageTypes, newStorageTypes, this.errorIndex);
                String[] newStorageIDs = new String[newnodes.length];
                DataStreamer.arraycopy(this.storageIDs, newStorageIDs, this.errorIndex);
                this.setPipeline(newnodes, newStorageTypes, newStorageIDs);
                // Keep restartingNodeIndex consistent with the shortened pipeline: a bad node
                // after the restarting one clears the marker, one before it shifts the index down.
                if (this.restartingNodeIndex.get() >= 0) {
                    if (this.errorIndex > this.restartingNodeIndex.get()) {
                        this.restartingNodeIndex.set(-1);
                    } else if (this.errorIndex < this.restartingNodeIndex.get()) {
                        this.restartingNodeIndex.decrementAndGet();
                    } else assert (false);
                }
                // The error state is only cleared when no node is marked as restarting.
                if (this.restartingNodeIndex.get() == -1) {
                    this.hasError = false;
                }
                this.lastException.clear();
                this.errorIndex = -1;
            }
            // Ask the replacement policy whether a new datanode should be added to the pipeline.
            if (this.dfsClient.dtpReplaceDatanodeOnFailure.satisfy(this.stat.getReplication(), this.nodes, this.isAppend, this.isHflushed)) {
                try {
                    this.addDatanode2ExistingPipeline();
                }
                catch (IOException ioe) {
                    // Under a best-effort policy a failed replacement is logged and ignored.
                    if (!this.dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
                        throw ioe;
                    }
                    LOG.warn((Object)"Failed to replace datanode. Continue with the remaining datanodes since dfs.client.block.write.replace-datanode-on-failure.best-effort is set to true.", (Throwable)ioe);
                }
            }
            // Obtain a new generation stamp and access token for the rebuilt pipeline.
            LocatedBlock lb = this.dfsClient.namenode.updateBlockForPipeline(this.block, this.dfsClient.clientName);
            newGS = lb.getBlock().getGenerationStamp();
            this.accessToken = lb.getBlockToken();
            if (this.failPacket) {
                // NOTE(review): failPacket looks like a fault-injection flag for tests - confirm.
                // Same pipeline setup as the else branch, followed by a 2 second pause; an
                // interruption of that pause is deliberately ignored.
                success = this.createBlockOutputStream(this.nodes, this.storageTypes, newGS, isRecovery);
                this.failPacket = false;
                try {
                    Thread.sleep(2000L);
                }
                catch (InterruptedException newnodes) {}
            } else {
                success = this.createBlockOutputStream(this.nodes, this.storageTypes, newGS, isRecovery);
            }
            // The rest of the loop body only applies while a node is marked as restarting.
            if (this.restartingNodeIndex.get() < 0) continue;
            assert (this.hasError);
            // An error attributed to the restarting node itself is ignored.
            if (this.errorIndex == this.restartingNodeIndex.get()) {
                this.errorIndex = -1;
            }
            // Still before the restart deadline: go around again.
            if (Time.monotonicNow() < this.restartDeadline) continue;
            // Deadline passed: stop waiting for the restarting node.
            this.restartDeadline = 0L;
            int expiredNodeIndex = this.restartingNodeIndex.get();
            this.restartingNodeIndex.set(-1);
            LOG.warn((Object)("Datanode did not restart in time: " + this.nodes[expiredNodeIndex]));
            // Mark the expired node as the bad one unless another error index is already set.
            if (this.errorIndex != -1) continue;
            this.errorIndex = expiredNodeIndex;
        }
        if (success) {
            // Publish the new generation stamp and pipeline to the namenode, then adopt the new block.
            ExtendedBlock newBlock = new ExtendedBlock(this.block.getBlockPoolId(), this.block.getBlockId(), this.block.getNumBytes(), newGS);
            this.dfsClient.namenode.updatePipeline(this.dfsClient.clientName, this.block, newBlock, (DatanodeID[])this.nodes, this.storageIDs);
            this.block = newBlock;
        }
        return false;
    }

    /**
     * Requests a new block from the namenode and opens a write pipeline to its datanodes,
     * retrying up to the configured number of block-write retries.
     *
     * <p>For erasure-coding source or parity streams, the nodes handed to the namenode as
     * excluded are the currently excluded nodes plus the used, stripe and parity-stripe nodes.
     * Otherwise only the currently excluded nodes are passed.
     *
     * <p>When an attempt fails, the block is abandoned and the datanode identified by
     * {@code errorIndex} is added to the excluded-node cache. {@code errorIndex} can still be
     * {@code -1} after a failure (for example when the namenode returned no locations), so the
     * exclusion step is skipped in that case instead of indexing the array with {@code -1}.
     *
     * @return the located block whose pipeline was opened successfully
     * @throws IOException if no pipeline could be created within the retry budget, or a
     *         namenode call fails
     */
    private LocatedBlock nextBlockOutputStream() throws IOException {
        LocatedBlock lb = null;
        DatanodeInfo[] nodes = null;
        StorageType[] storageTypes = null;
        int count = this.dfsClient.getConf().getNumBlockWriteRetry();
        boolean success = false;
        ExtendedBlock oldBlock = this.block;
        do {
            DatanodeInfo[] excluded;
            // Reset the error state for this attempt.
            this.hasError = false;
            this.lastException.clear();
            this.errorIndex = -1;
            success = false;
            // Start of a new stripe on a source stream: forget the nodes used so far.
            if (this.erasureCodingSourceStream && this.currentBlockIndex % this.stripeLength == 0) {
                this.usedNodes.clear();
                LOG.info((Object)("Stripe length " + this.stripeLength + " parity length " + this.parityLength));
                LOG.info((Object)("Source write block index " + this.currentBlockIndex));
            }
            // Start of a new parity group: collect the locations of the matching source stripe.
            if (this.erasureCodingParityStream && this.currentBlockIndex % this.parityLength == 0) {
                this.usedNodes.clear();
                this.stripeNodes.clear();
                int stripe = (int)Math.ceil((float)this.currentBlockIndex / (float)this.parityLength);
                int index = stripe * this.stripeLength;
                LOG.info((Object)("Stripe length " + this.stripeLength + " parity length " + this.parityLength));
                LOG.info((Object)("Parity write block index " + this.currentBlockIndex + " found index " + index + " end " + (index + this.stripeLength)));
                for (int j = index; j < this.sourceBlocks.size() && j < index + this.stripeLength; ++j) {
                    DatanodeInfo[] nodeInfos = this.sourceBlocks.get(j).getLocations();
                    Collections.addAll(this.stripeNodes, nodeInfos);
                }
            }
            if (this.erasureCodingSourceStream || this.erasureCodingParityStream) {
                ImmutableSet excludedSet = this.excludedNodes.getAllPresent(this.excludedNodes.asMap().keySet()).keySet();
                excluded = new DatanodeInfo[excludedSet.size() + this.usedNodes.size() + this.stripeNodes.size() + this.parityStripeNodes.size()];
                String streamLabel = this.erasureCodingSourceStream ? "Source stream: " : " Parity stream: ";
                int i = 0;
                for (Object entry : excludedSet) {
                    DatanodeInfo node = (DatanodeInfo)entry;
                    excluded[i++] = node;
                    LOG.info((Object)("Excluding node " + node));
                }
                for (Object entry : this.usedNodes) {
                    DatanodeInfo node = (DatanodeInfo)entry;
                    excluded[i++] = node;
                    LOG.info((Object)(streamLabel + "Block " + this.currentBlockIndex + " excluding used node " + node));
                }
                for (Object entry : this.stripeNodes) {
                    DatanodeInfo node = (DatanodeInfo)entry;
                    excluded[i++] = node;
                    LOG.info((Object)(streamLabel + "Block " + this.currentBlockIndex + " excluding stripe node " + node));
                }
                for (Object entry : this.parityStripeNodes) {
                    DatanodeInfo node = (DatanodeInfo)entry;
                    excluded[i++] = node;
                    LOG.info((Object)(streamLabel + "Block " + this.currentBlockIndex + " excluding parity node " + node));
                }
                ++this.currentBlockIndex;
            } else {
                excluded = (DatanodeInfo[])this.excludedNodes.getAllPresent(this.excludedNodes.asMap().keySet()).keySet().toArray((Object[])new DatanodeInfo[0]);
            }
            // Every attempt asks for the block following the one that was current on entry.
            this.block = oldBlock;
            lb = this.locateFollowingBlock((DatanodeInfo[])(excluded.length > 0 ? excluded : null));
            this.block = lb.getBlock();
            this.block.setNumBytes(0L);
            this.bytesSent = 0L;
            this.accessToken = lb.getBlockToken();
            nodes = lb.getLocations();
            storageTypes = lb.getStorageTypes();
            success = this.createBlockOutputStream(nodes, storageTypes, 0L, false);
            if (!success) {
                LOG.info((Object)("Abandoning " + this.block));
                this.dfsClient.namenode.abandonBlock(this.block, this.stat.getFileId(), this.src, this.dfsClient.clientName);
                this.block = null;
                // errorIndex is only meaningful when createBlockOutputStream identified a node.
                if (this.errorIndex >= 0 && this.errorIndex < nodes.length) {
                    DatanodeInfo badNode = nodes[this.errorIndex];
                    LOG.info((Object)("Excluding datanode " + badNode));
                    this.excludedNodes.put(badNode, badNode);
                }
            }
        } while (!success && --count >= 0);
        if (!success) {
            throw new IOException("Unable to create new block.");
        }
        if (this.erasureCodingSourceStream || this.erasureCodingParityStream) {
            Collections.addAll(this.usedNodes, nodes);
        }
        return lb;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Connects to the first datanode of {@code nodes}, sends the write-block request for the
     * whole pipeline and waits for the pipeline's response.
     *
     * <p>On success the buffered output stream is stored in {@code blockStream}, the restarting
     * node marker is cleared and {@code hasError} is reset. On failure the socket and both
     * streams are closed and nulled, {@code hasError} and {@code lastException} are set, and
     * {@code errorIndex} is set to the node matching the reported first bad link (or to 0 when
     * no bad link was reported). An invalid encryption key is retried once after clearing the
     * client's cached key.
     *
     * <p>The decompiled form of this method ended its {@code finally} block with
     * {@code continue} on every path, which discards the pending {@code break} and makes the
     * loop impossible to leave. The {@code finally} block now only releases resources on
     * failure, and the result is returned after it.
     *
     * @param nodes datanodes of the pipeline, in order; an empty array yields {@code false}
     * @param nodeStorageTypes storage types sent with the request, parallel to {@code nodes}
     * @param newGS generation stamp sent with the request
     * @param recoveryFlag when {@code true} the recovery variant of the current stage is used
     * @return {@code true} if the pipeline was established, {@code false} otherwise
     */
    private boolean createBlockOutputStream(DatanodeInfo[] nodes, StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
        if (nodes.length == 0) {
            LOG.info((Object)("nodes are empty for write pipeline of " + this.block));
            return false;
        }
        DataTransferProtos.Status pipelineStatus = DataTransferProtos.Status.SUCCESS;
        String firstBadLink = "";
        boolean checkRestart = false;
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("pipeline = " + Arrays.asList(nodes)));
        }
        this.persistBlocks.set(true);
        // Number of times an invalid encryption key may be refetched before giving up.
        int refetchEncryptionKey = 1;
        while (true) {
            boolean result = false;
            DataOutputStream out = null;
            try {
                assert (null == this.s) : "Previous socket unclosed";
                assert (null == this.blockReplyStream) : "Previous blockReplyStream unclosed";
                this.s = DataStreamer.createSocketForPipeline(nodes[0], nodes.length, this.dfsClient);
                long writeTimeout = this.dfsClient.getDatanodeWriteTimeout(nodes.length);
                OutputStream unbufOut = NetUtils.getOutputStream((Socket)this.s, (long)writeTimeout);
                InputStream unbufIn = NetUtils.getInputStream((Socket)this.s);
                IOStreamPair saslStreams = this.dfsClient.saslClient.socketSend(this.s, unbufOut, unbufIn, this.dfsClient, this.accessToken, (DatanodeID)nodes[0]);
                unbufOut = saslStreams.out;
                unbufIn = saslStreams.in;
                out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE));
                this.blockReplyStream = new DataInputStream(unbufIn);
                BlockConstructionStage bcs = recoveryFlag ? this.stage.getRecoveryStage() : this.stage;
                // The request carries a copy of the block sized to the file's block size.
                ExtendedBlock blockCopy = new ExtendedBlock(this.block);
                blockCopy.setNumBytes(this.stat.getBlockSize());
                boolean[] targetPinnings = this.getPinnings(nodes, true);
                new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], this.accessToken, this.dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, nodes.length, this.block.getNumBytes(), this.bytesSent, newGS, this.checksum, this.cachingStrategy.get(), targetPinnings == null ? false : targetPinnings[0], targetPinnings);
                DataTransferProtos.BlockOpResponseProto resp = DataTransferProtos.BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(this.blockReplyStream));
                pipelineStatus = resp.getStatus();
                firstBadLink = resp.getFirstBadLink();
                // A restart status is only acted upon when no node is already marked as restarting.
                if (PipelineAck.isRestartOOBStatus(pipelineStatus) && this.restartingNodeIndex.get() == -1) {
                    checkRestart = true;
                    throw new IOException("A datanode is restarting.");
                }
                String logInfo = "ack with firstBadLink as " + firstBadLink;
                DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);
                assert (null == this.blockStream) : "Previous blockStream unclosed";
                this.blockStream = out;
                result = true;
                this.restartingNodeIndex.set(-1);
                this.hasError = false;
            }
            catch (IOException ie) {
                if (this.restartingNodeIndex.get() == -1) {
                    LOG.info((Object)"Exception in createBlockOutputStream", (Throwable)ie);
                }
                if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
                    LOG.info((Object)("Will fetch a new encryption key and retry, encryption key was invalid when connecting to " + nodes[0] + " : " + ie));
                    --refetchEncryptionKey;
                    this.dfsClient.clearDataEncryptionKey();
                    // The finally block below releases the failed connection before the retry.
                    continue;
                }
                if (firstBadLink.length() != 0) {
                    // Find the pipeline node whose transfer address matches the reported bad link.
                    for (int i = 0; i < nodes.length; ++i) {
                        if (!firstBadLink.equals(nodes[i].getXferAddr())) continue;
                        this.errorIndex = i;
                        break;
                    }
                } else {
                    assert (!checkRestart);
                    // No bad link reported: the first datanode is treated as the failed one.
                    this.errorIndex = 0;
                }
                if (checkRestart && this.shouldWaitForRestart(this.errorIndex)) {
                    this.restartDeadline = this.dfsClient.getConf().getDatanodeRestartTimeout() + Time.monotonicNow();
                    this.restartingNodeIndex.set(this.errorIndex);
                    this.errorIndex = -1;
                    LOG.info((Object)("Waiting for the datanode to be restarted: " + nodes[this.restartingNodeIndex.get()]));
                }
                this.hasError = true;
                this.lastException.set(ie);
                result = false;
            }
            finally {
                // Release the connection only when the pipeline was not established.
                if (!result) {
                    IOUtils.closeSocket((Socket)this.s);
                    this.s = null;
                    IOUtils.closeStream((Closeable)out);
                    IOUtils.closeStream((Closeable)this.blockReplyStream);
                    this.blockReplyStream = null;
                }
            }
            return result;
        }
    }

    /**
     * Computes, for each chosen datanode, whether it is one of the favored nodes.
     *
     * @param nodes datanodes chosen for the pipeline
     * @param shouldLog when {@code true}, a warning is logged for favored nodes that were not chosen
     * @return one flag per entry of {@code nodes}, or {@code null} when no favored nodes are set
     */
    private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) {
        if (this.favoredNodes == null) {
            return null;
        }
        boolean[] pinned = new boolean[nodes.length];
        // Each match is removed from the set, so what remains are the favored nodes not chosen.
        HashSet<String> unmatchedFavored = new HashSet<String>(Arrays.asList(this.favoredNodes));
        int position = 0;
        for (DatanodeInfo node : nodes) {
            pinned[position] = unmatchedFavored.remove(node.getXferAddrWithHostname());
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)(node.getXferAddrWithHostname() + " was chosen by name node (favored=" + pinned[position] + ")."));
            }
            ++position;
        }
        boolean someFavoredUnused = !unmatchedFavored.isEmpty();
        if (shouldLog && someFavoredUnused) {
            LOG.warn((Object)("These favored nodes were specified but not chosen: " + unmatchedFavored + " Specified favored nodes: " + Arrays.toString(this.favoredNodes)));
        }
        return pinned;
    }

    /**
     * Asks the namenode to add a block to the file, retrying with a doubling delay while the
     * namenode answers with {@code NotReplicatedYetException}.
     *
     * @param excludedNodes datanodes the namenode should not choose; may be {@code null}
     * @return the block allocated by the namenode
     * @throws IOException the unwrapped remote exception for the listed exception types, or the
     *         remote exception itself when it is not retriable or the retries are used up
     */
    protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException {
        DfsClientConf conf = this.dfsClient.getConf();
        int retriesLeft = conf.getNumBlockWriteLocateFollowingRetry();
        long backoffMs = conf.getBlockWriteLocateFollowingInitialDelayMs();
        final long startedAt = Time.monotonicNow();
        for (;;) {
            try {
                return this.dfsClient.namenode.addBlock(this.src, this.dfsClient.clientName, this.block, excludedNodes, this.stat.getFileId(), this.favoredNodes);
            }
            catch (RemoteException e) {
                // These exception types are rethrown to the caller in their unwrapped form.
                IOException unwrapped = e.unwrapRemoteException(new Class[]{FileNotFoundException.class, AccessControlException.class, NSQuotaExceededException.class, DSQuotaExceededException.class, QuotaByStorageTypeExceededException.class, UnresolvedPathException.class});
                if (unwrapped != e) {
                    throw unwrapped;
                }
                boolean notReplicatedYet = NotReplicatedYetException.class.getName().equals(e.getClassName());
                if (!notReplicatedYet || retriesLeft == 0) {
                    throw e;
                }
                --retriesLeft;
                LOG.info((Object)"Exception while adding a block", (Throwable)e);
                long waitedMs = Time.monotonicNow() - startedAt;
                if (waitedMs > 5000L) {
                    LOG.info((Object)("Waiting for replication for " + waitedMs / 1000L + " seconds"));
                }
                try {
                    LOG.warn((Object)("NotReplicatedYetException sleeping " + this.src + " retries left " + retriesLeft));
                    Thread.sleep(backoffMs);
                    backoffMs *= 2L;
                }
                catch (InterruptedException ie) {
                    // An interrupted sleep is only logged; the next attempt follows immediately.
                    LOG.warn((Object)"Caught exception", (Throwable)ie);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sleeps for a randomised back-off period when datanodes have been reported as congested.
     *
     * <p>The period is derived from three times the previous back-off, centred around 5000 ms,
     * and capped at 50000 ms. The congested-node list is cleared once the period has been
     * chosen, and the sleep happens outside the lock on that list.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void backOffIfNecessary() throws InterruptedException {
        int backoffMs = 0;
        synchronized (this.congestedNodes) {
            if (!this.congestedNodes.isEmpty()) {
                StringBuilder message = new StringBuilder("DataNode");
                for (DatanodeInfo congested : this.congestedNodes) {
                    message.append(' ');
                    message.append(congested);
                }
                int tripled = this.lastCongestionBackoffTime * 3;
                int base = Math.min(tripled, 5000);
                int range = Math.abs(tripled - 5000);
                backoffMs = Math.min(50000, (int)((double)base + Math.random() * (double)range));
                this.lastCongestionBackoffTime = backoffMs;
                message.append(" are congested. Backing off for ").append(backoffMs).append(" ms");
                LOG.info((Object)message.toString());
                this.congestedNodes.clear();
            }
        }
        if (backoffMs == 0) {
            return;
        }
        Thread.sleep(backoffMs);
    }

    /**
     * @return the block this streamer currently holds; may be {@code null}
     */
    ExtendedBlock getBlock() {
        return block;
    }

    /**
     * @return the datanodes of the current pipeline (the internal array, not a copy)
     */
    DatanodeInfo[] getNodes() {
        return nodes;
    }

    /**
     * @return the access token currently held for the block
     */
    Token<BlockTokenIdentifier> getBlockToken() {
        return accessToken;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Enqueues a packet for sending, or withholds it in the small-file buffer while the file
     * may still be stored in the database.
     *
     * <p>A {@code null} packet is ignored. Otherwise the last queued sequence number is updated
     * and threads waiting on the data queue are notified, whichever queue received the packet.
     *
     * @param packet the packet to queue; may be {@code null}
     */
    void queuePacket(DFSPacket packet) {
        synchronized (this.dataQueue) {
            if (packet == null) {
                return;
            }
            packet.addTraceParent(Tracer.getCurrentSpanId());
            boolean withhold = this.canStoreFileInDB()
                    && packet.getLastByteOffsetBlock() <= (long)this.dbFileMaxSize;
            if (!withhold) {
                // Flush any withheld small-file packets first so that ordering is preserved.
                this.forwardSmallFilesPacketsToDataNodes();
                this.dataQueue.addLast(packet);
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Queued packet " + packet.getSeqno()));
                }
            } else {
                LOG.debug((Object)"Stuffed Inode:  Temporarily withholding the packet in a buffer for small files");
                this.smallFileDataQueue.addLast(packet);
            }
            this.lastQueuedSeqno = packet.getSeqno();
            this.dataQueue.notifyAll();
        }
    }

    /**
     * Builds a packet backed by a buffer of {@code PacketHeader.PKT_MAX_HEADER_LEN} bytes.
     * NOTE(review): the {@code -1L} argument is presumably the heartbeat sequence number -
     * confirm against the {@code DFSPacket} constructor.
     *
     * @return the new packet
     * @throws InterruptedIOException declared by the signature; not thrown by the visible code
     */
    private DFSPacket createHeartbeatPacket() throws InterruptedIOException {
        return new DFSPacket(new byte[PacketHeader.PKT_MAX_HEADER_LEN], 0, 0L, -1L, 0, false);
    }

    /**
     * Creates the cache of excluded datanodes.
     *
     * <p>Entries expire a configured number of milliseconds after being written, each removal
     * is logged, and loading a missing key yields the key itself.
     *
     * @return the newly built cache
     */
    private LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes() {
        long expiryMs = this.dfsClient.getConf().getExcludedNodesCacheExpiry();
        RemovalListener<DatanodeInfo, DatanodeInfo> logRemoval = new RemovalListener<DatanodeInfo, DatanodeInfo>(){

            public void onRemoval(RemovalNotification<DatanodeInfo, DatanodeInfo> notification) {
                LOG.info((Object)("Removing node " + notification.getKey() + " from the excluded nodes list"));
            }
        };
        CacheLoader<DatanodeInfo, DatanodeInfo> identityLoader = new CacheLoader<DatanodeInfo, DatanodeInfo>(){

            public DatanodeInfo load(DatanodeInfo key) throws Exception {
                return key;
            }
        };
        return CacheBuilder.newBuilder()
                .expireAfterWrite(expiryMs, TimeUnit.MILLISECONDS)
                .removalListener((RemovalListener)logRemoval)
                .build((CacheLoader)identityLoader);
    }

    /**
     * Copies {@code srcs} into {@code dsts} while leaving out the element at {@code skipIndex}.
     *
     * @param srcs source array
     * @param dsts destination array; expected to be one element shorter than {@code srcs}
     * @param skipIndex index in {@code srcs} of the element to omit
     */
    private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
        final int headLength = skipIndex;
        final int tailLength = dsts.length - skipIndex;
        // Elements before the skipped one keep their position.
        System.arraycopy(srcs, 0, dsts, 0, headLength);
        // Elements after the skipped one shift down by one slot.
        System.arraycopy(srcs, skipIndex + 1, dsts, skipIndex, tailLength);
    }

    /**
     * @return the shared {@code persistBlocks} flag (the live object, not a snapshot)
     */
    AtomicBoolean getPersistBlocks() {
        return persistBlocks;
    }

    /**
     * Stores the append-chunk flag.
     *
     * @param appendChunk new value of the flag
     */
    void setAppendChunk(boolean appendChunk) {
        this.appendChunk = appendChunk;
    }

    /**
     * @return the current value of the append-chunk flag
     */
    boolean getAppendChunk() {
        return appendChunk;
    }

    /**
     * @return the holder in which this streamer records its last exception
     */
    LastExceptionInStreamer getLastException() {
        return lastException;
    }

    /**
     * Drops the reference to the pipeline socket without closing it.
     */
    void setSocketToNull() {
        s = null;
    }

    /**
     * Returns the current sequence number and then advances it by one.
     *
     * @return the sequence number as it was before the increment
     */
    long getAndIncCurrentSeqno() {
        return currentSeqno++;
    }

    /**
     * @return the sequence number of the packet most recently passed to {@code queuePacket}
     */
    long getLastQueuedSeqno() {
        return lastQueuedSeqno;
    }

    /**
     * @return the current value of the {@code bytesCurBlock} counter
     */
    long getBytesCurBlock() {
        return bytesCurBlock;
    }

    /**
     * Overwrites the {@code bytesCurBlock} counter.
     *
     * @param bytesCurBlock new counter value
     */
    void setBytesCurBlock(long bytesCurBlock) {
        this.bytesCurBlock = bytesCurBlock;
    }

    /**
     * Adds {@code len} to the {@code bytesCurBlock} counter.
     *
     * @param len amount to add
     */
    void incBytesCurBlock(long len) {
        bytesCurBlock = bytesCurBlock + len;
    }

    /**
     * Stores the artificial slowdown period.
     *
     * @param period new value of {@code artificialSlowdown}
     */
    void setArtificialSlowdown(long period) {
        artificialSlowdown = period;
    }

    /**
     * @return {@code true} once this streamer has been marked as closed
     */
    boolean streamerClosed() {
        return streamerClosed;
    }

    /**
     * Closes the pipeline socket if one is present; the field itself is left untouched.
     *
     * @throws IOException if closing the socket fails
     */
    void closeSocket() throws IOException {
        if (this.s == null) {
            return;
        }
        this.s.close();
    }

    /**
     * Opens a write pipeline for an already located block, retrying up to the configured number
     * of block-write retries.
     *
     * <p>Each attempt resets the error state, adopts the block and token of {@code lb}, and
     * zeroes the block length and the sent-bytes counter. The storage types passed to the
     * pipeline are the streamer's own {@code storageTypes} field.
     *
     * @param lb the block to write and its locations
     * @return the locations of {@code lb} once the pipeline is established
     * @throws IOException if every attempt fails
     */
    private DatanodeInfo[] setupPipelineForSingleBlock(LocatedBlock lb) throws IOException {
        int retriesLeft = this.dfsClient.getConf().getNumBlockWriteRetry();
        while (true) {
            this.hasError = false;
            this.lastException.set(null);
            this.errorIndex = -1;

            this.block = lb.getBlock();
            this.block.setNumBytes(0L);
            this.bytesSent = 0L;
            this.accessToken = lb.getBlockToken();

            DatanodeInfo[] targets = lb.getLocations();
            if (this.createBlockOutputStream(targets, this.storageTypes, 0L, false)) {
                return targets;
            }
            if (--retriesLeft < 0) {
                throw new IOException("Unable to initiate single block send.");
            }
        }
    }

    /**
     * Switches this streamer into erasure-coding source-stream mode.
     *
     * @param stripeLength stripe length to use
     */
    public void enableSourceStream(int stripeLength) {
        this.stripeLength = stripeLength;
        this.erasureCodingSourceStream = true;
    }

    /**
     * Switches this streamer into erasure-coding parity-stream mode.
     *
     * <p>When a source file is given, its located blocks are fetched and kept sorted with
     * {@code LocatedBlock.blockIdComparator}.
     *
     * @param stripeLength stripe length to use
     * @param parityLength parity length to use
     * @param sourceFile path of the source file, or {@code null} to skip loading its blocks
     * @throws IOException if the located blocks of the source file cannot be fetched
     */
    public void enableParityStream(int stripeLength, int parityLength, String sourceFile) throws IOException {
        this.erasureCodingParityStream = true;
        this.stripeLength = stripeLength;
        this.parityLength = parityLength;
        if (sourceFile == null) {
            return;
        }
        List<LocatedBlock> located = this.dfsClient.getLocatedBlocks(sourceFile, 0L, Long.MAX_VALUE).getLocatedBlocks();
        this.sourceBlocks = new ArrayList<LocatedBlock>(located);
        Collections.sort(this.sourceBlocks, LocatedBlock.blockIdComparator);
    }

    /**
     * Replaces the recorded parity-stripe nodes with the given locations.
     *
     * @param locations datanodes to record
     */
    public void setParityStripeNodesForNextStripe(Collection<DatanodeInfo> locations) {
        parityStripeNodes.clear();
        parityStripeNodes.addAll(locations);
    }

    /**
     * @return the collection of used datanodes (the live collection, not a copy)
     */
    public Collection<DatanodeInfo> getUsedNodes() {
        return usedNodes;
    }

    /**
     * @return {@code true} while the file is flagged as stored in the database and no sync or
     *         flush has been recorded
     */
    public boolean canStoreFileInDB() {
        if (!isThisFileStoredInDB) {
            return false;
        }
        return !syncOrFlushCalled;
    }

    /**
     * Gives up on storing the file in the database and moves every withheld small-file packet
     * to the data queue, in order.
     *
     * <p>Does nothing when the file is not flagged as stored in the database.
     */
    public void forwardSmallFilesPacketsToDataNodes() {
        if (!this.isThisFileStoredInDB) {
            return;
        }
        LOG.debug((Object)"Stuffed Inode:  The file can not be stored  in the database");
        this.isThisFileStoredInDB = false;
        if (this.smallFileDataQueue.isEmpty()) {
            return;
        }
        for (DFSPacket withheld : this.smallFileDataQueue) {
            withheld.addTraceParent(Tracer.getCurrentSpanId());
            this.dataQueue.addLast(withheld);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Queued packet " + withheld.getSeqno()));
            }
        }
        this.smallFileDataQueue.clear();
    }

    /**
     * Records that a sync or flush has been requested on this stream.
     */
    public void syncOrFlushCalled() {
        syncOrFlushCalled = true;
    }

    /**
     * @return the queue of withheld small-file packets (the live queue, not a copy)
     */
    public List<DFSPacket> getSmallFileDataQueue() {
        return smallFileDataQueue;
    }

    /**
     * Sets whether the file is flagged as stored in the database.
     *
     * @param isThisFileStoredInDB new value of the flag
     */
    public void setFileStoredInDB(boolean isThisFileStoredInDB) {
        this.isThisFileStoredInDB = isThisFileStoredInDB;
    }

    private class ResponseProcessor
    extends Daemon {
        private volatile boolean responderClosed = false;
        private DatanodeInfo[] targets = null;
        private boolean isLastPacketInBlock = false;

        ResponseProcessor(DatanodeInfo[] targets) {
            this.targets = targets;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            this.setName("ResponseProcessor for block " + DataStreamer.this.block);
            PipelineAck ack = new PipelineAck();
            TraceScope scope = null;
            while (!this.responderClosed && ((DataStreamer)DataStreamer.this).dfsClient.clientRunning && !this.isLastPacketInBlock) {
                try {
                    DFSPacket one;
                    List i2;
                    long begin = Time.monotonicNow();
                    ack.readFields(DataStreamer.this.blockReplyStream);
                    long duration = Time.monotonicNow() - begin;
                    if (duration > DataStreamer.this.dfsclientSlowLogThresholdMs && ack.getSeqno() != -1L) {
                        LOG.warn((Object)("Slow ReadProcessor read fields took " + duration + "ms (threshold=" + DataStreamer.this.dfsclientSlowLogThresholdMs + "ms); ack: " + ack + ", targets: " + Arrays.asList(this.targets)));
                    } else if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("DFSClient " + ack));
                    }
                    long seqno = ack.getSeqno();
                    ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<DatanodeInfo>();
                    for (int i2 = ack.getNumOfReplies() - 1; i2 >= 0 && ((DataStreamer)DataStreamer.this).dfsClient.clientRunning; --i2) {
                        DataTransferProtos.Status reply = PipelineAck.getStatusFromHeader(ack.getHeaderFlag(i2));
                        if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i2)) == PipelineAck.ECN.CONGESTED) {
                            congestedNodesFromAck.add(this.targets[i2]);
                        }
                        if (PipelineAck.isRestartOOBStatus(reply) && DataStreamer.this.shouldWaitForRestart(i2)) {
                            DataStreamer.this.restartDeadline = DataStreamer.this.dfsClient.getConf().getDatanodeRestartTimeout() + Time.monotonicNow();
                            DataStreamer.this.setRestartingNodeIndex(i2);
                            String message = "A datanode is restarting: " + this.targets[i2];
                            LOG.info((Object)message);
                            throw new IOException(message);
                        }
                        if (reply == DataTransferProtos.Status.SUCCESS) continue;
                        DataStreamer.this.setErrorIndex(i2);
                        throw new IOException("Bad response " + (Object)((Object)reply) + " for block " + DataStreamer.this.block + " from datanode " + this.targets[i2]);
                    }
                    if (!congestedNodesFromAck.isEmpty()) {
                        i2 = DataStreamer.this.congestedNodes;
                        synchronized (i2) {
                            DataStreamer.this.congestedNodes.clear();
                            DataStreamer.this.congestedNodes.addAll(congestedNodesFromAck);
                        }
                    }
                    i2 = DataStreamer.this.congestedNodes;
                    synchronized (i2) {
                        DataStreamer.this.congestedNodes.clear();
                        DataStreamer.this.lastCongestionBackoffTime = 0;
                    }
                    assert (seqno != -2L) : "Ack for unknown seqno should be a failed ack: " + ack;
                    if (seqno == -1L) continue;
                    LinkedList linkedList = DataStreamer.this.dataQueue;
                    synchronized (linkedList) {
                        one = (DFSPacket)DataStreamer.this.ackQueue.getFirst();
                    }
                    if (one.getSeqno() != seqno) {
                        throw new IOException("ResponseProcessor: Expecting seqno  for block " + DataStreamer.this.block + one.getSeqno() + " but received " + seqno);
                    }
                    this.isLastPacketInBlock = one.isLastPacketInBlock();
                    if (DFSClientFaultInjector.get().failPacket() && this.isLastPacketInBlock) {
                        DataStreamer.this.failPacket = true;
                        throw new IOException("Failing the last packet for testing.");
                    }
                    DataStreamer.this.block.setNumBytes(one.getLastByteOffsetBlock());
                    linkedList = DataStreamer.this.dataQueue;
                    synchronized (linkedList) {
                        scope = one.getTraceScope();
                        if (scope != null) {
                            scope.reattach();
                            one.setTraceScope(null);
                        }
                        DataStreamer.this.lastAckedSeqno = seqno;
                        DataStreamer.this.ackQueue.removeFirst();
                        DataStreamer.this.dataQueue.notifyAll();
                        one.releaseBuffer(DataStreamer.this.byteArrayManager);
                    }
                }
                catch (Exception e) {
                    if (this.responderClosed) continue;
                    DataStreamer.this.lastException.set(e);
                    DataStreamer.this.hasError = true;
                    DataStreamer.this.tryMarkPrimaryDatanodeFailed();
                    LinkedList linkedList = DataStreamer.this.dataQueue;
                    synchronized (linkedList) {
                        DataStreamer.this.dataQueue.notifyAll();
                    }
                    if (DataStreamer.this.restartingNodeIndex.get() == -1) {
                        LOG.warn((Object)("Exception for " + DataStreamer.this.block), (Throwable)e);
                    }
                    this.responderClosed = true;
                }
                finally {
                    if (scope != null) {
                        scope.close();
                    }
                    scope = null;
                }
            }
        }

        void close() {
            this.responderClosed = true;
            this.interrupt();
        }
    }

    /**
     * Holder for the most recent failure seen by the streamer. All accessors
     * are {@code synchronized} on this holder.
     */
    static class LastExceptionInStreamer {
        /** The recorded failure, or {@code null} when none is pending. */
        private IOException thrown;

        /**
         * Records a failure, replacing any previous one.
         *
         * @param t the failure; stored as-is when it is an {@link IOException},
         *          otherwise wrapped in a new {@link IOException}
         */
        synchronized void set(Throwable t) {
            assert (t != null);
            if (t instanceof IOException) {
                this.thrown = (IOException)t;
            } else {
                this.thrown = new IOException(t);
            }
        }

        /** Forgets the recorded failure, if any. */
        synchronized void clear() {
            this.thrown = null;
        }

        /**
         * Throws the recorded failure if there is one; otherwise does nothing.
         *
         * @param resetToNull when {@code true}, the recorded failure is cleared
         *                    before it is thrown
         * @throws IOException the recorded failure
         */
        synchronized void check(boolean resetToNull) throws IOException {
            final IOException pending = this.thrown;
            if (pending == null) {
                return;
            }
            if (LOG.isTraceEnabled()) {
                // Wrapped in a fresh Throwable so the trace shows where check() was called.
                LOG.trace("Got Exception while checking", new Throwable(pending));
            }
            if (resetToNull) {
                this.thrown = null;
            }
            throw pending;
        }

        /**
         * Always throws: the recorded failure when one exists (it is left in
         * place), otherwise a {@link ClosedChannelException}.
         *
         * @throws IOException the recorded failure or a {@link ClosedChannelException}
         */
        synchronized void throwException4Close() throws IOException {
            this.check(false);
            throw new ClosedChannelException();
        }
    }
}

