/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.com.metamx.http.client.response;

import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Enumeration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.io.ByteSource;
import org.apache.hive.druid.com.metamx.common.logger.Logger;
import org.apache.hive.druid.com.metamx.http.client.response.ClientResponse;
import org.apache.hive.druid.com.metamx.http.client.response.HttpResponseHandler;
import org.apache.hive.druid.org.jboss.netty.buffer.ChannelBuffer;
import org.apache.hive.druid.org.jboss.netty.buffer.ChannelBufferInputStream;
import org.apache.hive.druid.org.jboss.netty.handler.codec.http.HttpChunk;
import org.apache.hive.druid.org.jboss.netty.handler.codec.http.HttpResponse;

public class SequenceInputStreamResponseHandler
implements HttpResponseHandler<InputStream, InputStream> {
    private static final Logger log = new Logger(SequenceInputStreamResponseHandler.class);
    private final AtomicLong byteCount = new AtomicLong(0L);
    private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<InputStream>();
    private final AtomicBoolean done = new AtomicBoolean(false);

    @Override
    public ClientResponse<InputStream> handleResponse(HttpResponse response) {
        try {
            this.queue.put(new ChannelBufferInputStream(response.getContent()));
        }
        catch (InterruptedException e) {
            log.error(e, "Queue appending interrupted", new Object[0]);
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        }
        this.byteCount.addAndGet(response.getContent().readableBytes());
        return ClientResponse.finished(new SequenceInputStream((Enumeration<? extends InputStream>)new Enumeration<InputStream>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public boolean hasMoreElements() {
                AtomicBoolean atomicBoolean = SequenceInputStreamResponseHandler.this.done;
                synchronized (atomicBoolean) {
                    return !SequenceInputStreamResponseHandler.this.done.get() || !SequenceInputStreamResponseHandler.this.queue.isEmpty();
                }
            }

            @Override
            public InputStream nextElement() {
                try {
                    return (InputStream)SequenceInputStreamResponseHandler.this.queue.take();
                }
                catch (InterruptedException e) {
                    log.warn(e, "Thread interrupted while taking from queue", new Object[0]);
                    Thread.currentThread().interrupt();
                    throw Throwables.propagate(e);
                }
            }
        }));
    }

    @Override
    public ClientResponse<InputStream> handleChunk(ClientResponse<InputStream> clientResponse, HttpChunk chunk) {
        ChannelBuffer channelBuffer = chunk.getContent();
        int bytes = channelBuffer.readableBytes();
        if (bytes > 0) {
            try {
                this.queue.put(new ChannelBufferInputStream(channelBuffer));
                log.debug("Added stream. Queue length %d", this.queue.size());
            }
            catch (InterruptedException e) {
                log.warn(e, "Thread interrupted while adding to queue", new Object[0]);
                Thread.currentThread().interrupt();
                throw Throwables.propagate(e);
            }
            this.byteCount.addAndGet(bytes);
        } else {
            log.debug("Skipping zero length chunk", new Object[0]);
        }
        return clientResponse;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public ClientResponse<InputStream> done(ClientResponse<InputStream> clientResponse) {
        AtomicBoolean atomicBoolean = this.done;
        synchronized (atomicBoolean) {
            try {
                this.queue.put(ByteSource.empty().openStream());
                log.debug("Added terminal empty stream", new Object[0]);
            }
            catch (InterruptedException e) {
                try {
                    log.warn(e, "Thread interrupted while adding to queue", new Object[0]);
                    Thread.currentThread().interrupt();
                    throw Throwables.propagate(e);
                    catch (IOException e2) {
                        log.wtf(e2, "The empty stream threw an IOException", new Object[0]);
                        throw Throwables.propagate(e2);
                    }
                }
                catch (Throwable throwable) {
                    log.debug("Done after adding %d bytes of streams", this.byteCount.get());
                    this.done.set(true);
                    throw throwable;
                }
            }
            log.debug("Done after adding %d bytes of streams", this.byteCount.get());
            this.done.set(true);
            return ClientResponse.finished(clientResponse.getObj());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void exceptionCaught(ClientResponse<InputStream> clientResponse, final Throwable e) {
        AtomicBoolean atomicBoolean = this.done;
        synchronized (atomicBoolean) {
            this.done.set(true);
            boolean accepted = this.queue.offer(new InputStream(){

                @Override
                public int read() throws IOException {
                    throw new IOException(e);
                }
            });
            if (!accepted) {
                log.warn("Unable to place final IOException offer in queue", new Object[0]);
            } else {
                log.debug("Placed IOException in queue", new Object[0]);
            }
            log.debug(e, "Exception with queue length of %d and %d bytes available", this.queue.size(), this.byteCount.get());
        }
    }

    public final long getByteCount() {
        return this.byteCount.get();
    }
}

