/*
 * Decompiled with CFR 0.152.
 */
package io.hops.erasure_coding;

import io.hops.erasure_coding.RaidUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.util.Progressable;

/**
 * Reads a fixed set of input streams in parallel, one buffer per stream per
 * round, and hands each completed round to the consumer through a bounded
 * queue.
 *
 * <p>A dedicated {@link MainThread} drives the rounds: for every round it
 * schedules one {@link ReadOperation} per stream on {@link #readPool}, waits
 * for all of them to finish, and then enqueues the resulting
 * {@link ReadResult}. Consumers obtain results with {@link #getReadResult()}.
 *
 * <p>Typical use: construct, call {@link #start()}, repeatedly call
 * {@link #getReadResult()}, and finally call {@link #shutdown()}.
 */
public class ParallelStreamReader {
    public static final Log LOG = LogFactory.getLog(ParallelStreamReader.class);

    /** How long a single semaphore wait may last before progress is reported again. */
    private static final long SLOT_WAIT_SECONDS = 10L;

    /** Progress callback, invoked after every semaphore wait in {@link #performReads}. */
    Progressable reporter;
    /** Streams being read; an entry is set to {@code null} once it failed or was closed. */
    InputStream[] streams;
    /** Per-stream end offset passed to {@code RaidUtils.readTillEnd}; {@code Long.MAX_VALUE} when unbounded. */
    long[] endOffsets;
    /** Pool executing the per-stream {@link ReadOperation}s. */
    ExecutorService readPool;
    /** One permit per pool thread; a permit is held while a read operation is in flight. */
    Semaphore slots;
    /** Number of pool threads, capped at the number of streams. */
    int numThreads;
    /** Bytes still to be read from each stream; decreased by {@link #bufSize} after every round. */
    long remainingBytesPerStream;
    /** Size in bytes of each per-stream buffer in a {@link ReadResult}. */
    int bufSize;
    /** Accumulated wall-clock milliseconds spent inside {@link #performReads}. */
    long readTime = 0L;
    /** Cleared by {@link #shutdown()} or on interruption to stop the main loop. */
    volatile boolean running = true;
    /** Completed rounds waiting to be consumed. */
    BlockingQueue<ReadResult> boundedBuffer;
    /** Thread that drives the read rounds. */
    Thread mainThread;

    /**
     * Creates a reader over the given streams. The reader takes ownership of
     * the streams: every entry of the caller's {@code streams} array is set to
     * {@code null} after it has been copied.
     *
     * @param reporter              progress callback invoked while waiting for reads
     * @param streams               streams to read; entries are nulled out in the caller's array
     * @param bufSize               size in bytes of each per-stream read buffer
     * @param numThreads            requested parallelism; capped at {@code streams.length}
     * @param boundedBufferCapacity capacity of the queue of completed rounds
     * @param maxBytesPerStream     total number of bytes to read from each stream
     * @throws IOException if querying a DFS stream for its blocks or position fails
     */
    public ParallelStreamReader(Progressable reporter, InputStream[] streams, int bufSize, int numThreads, int boundedBufferCapacity, long maxBytesPerStream) throws IOException {
        this.reporter = reporter;
        this.streams = new InputStream[streams.length];
        this.endOffsets = new long[streams.length];
        for (int i = 0; i < streams.length; ++i) {
            this.streams[i] = streams[i];
            this.endOffsets[i] = computeEndOffset(this.streams[i]);
            // Ownership has moved to this reader; drop the caller's reference.
            streams[i] = null;
        }
        this.bufSize = bufSize;
        this.boundedBuffer = new ArrayBlockingQueue<ReadResult>(boundedBufferCapacity);
        this.numThreads = Math.min(numThreads, streams.length);
        this.remainingBytesPerStream = maxBytesPerStream;
        this.slots = new Semaphore(this.numThreads);
        this.readPool = Executors.newFixedThreadPool(this.numThreads);
        this.mainThread = new MainThread();
    }

    /**
     * Determines the end offset for a stream. For a DFS stream that reports at
     * least one block this is the current position plus the size of the first
     * reported block; for every other stream it is {@code Long.MAX_VALUE}.
     */
    private static long computeEndOffset(InputStream in) throws IOException {
        if (!(in instanceof DFSClient.DFSDataInputStream)) {
            return Long.MAX_VALUE;
        }
        DFSClient.DFSDataInputStream dfsIn = (DFSClient.DFSDataInputStream) in;
        List<?> blocks = dfsIn.getAllBlocks();
        if (blocks.isEmpty()) {
            return Long.MAX_VALUE;
        }
        return dfsIn.getPos() + ((LocatedBlock) blocks.get(0)).getBlockSize();
    }

    /** Starts the thread that drives the read rounds. */
    public void start() {
        this.mainThread.start();
    }

    /**
     * Stops the reader: clears the running flag, shuts down the read pool,
     * interrupts the main thread and closes every stream that is still open.
     * Each step is best-effort; a failure in one does not prevent the others.
     */
    public void shutdown() {
        LOG.info("Shutting down parallel stream reader");
        this.running = false;
        try {
            this.readPool.shutdownNow();
        } catch (Exception ignored) {
            // Best-effort: keep going so the streams still get closed.
        }
        try {
            this.mainThread.interrupt();
        } catch (Exception ignored) {
            // Best-effort: keep going so the streams still get closed.
        }
        for (int i = 0; i < this.streams.length; ++i) {
            // Read the slot once: a worker may null it concurrently.
            InputStream in = this.streams[i];
            if (in == null) {
                continue;
            }
            try {
                in.close();
                this.streams[i] = null;
            } catch (IOException ignored) {
                // Best-effort close; nothing useful can be done on failure.
            }
        }
    }

    /**
     * Returns the next completed round, blocking until one is available.
     *
     * @return the oldest queued {@link ReadResult}
     * @throws InterruptedException if interrupted while waiting
     */
    public ReadResult getReadResult() throws InterruptedException {
        return this.boundedBuffer.take();
    }

    /**
     * Runs one round: schedules a read for every stream and waits until all of
     * them have completed. Progress is reported after every semaphore wait,
     * whether or not the wait succeeded.
     *
     * @param readResult destination for the buffers, byte counts and exceptions of this round
     * @throws InterruptedException if interrupted while waiting for a slot
     */
    private void performReads(ReadResult readResult) throws InterruptedException {
        long start = System.currentTimeMillis();
        int next = 0;
        while (next < this.streams.length) {
            boolean gotSlot = this.slots.tryAcquire(1, SLOT_WAIT_SECONDS, TimeUnit.SECONDS);
            this.reporter.progress();
            if (gotSlot) {
                this.readPool.execute(new ReadOperation(readResult, next));
                ++next;
            }
        }
        // All permits being available at once means every scheduled read has finished.
        boolean allDone;
        do {
            allDone = this.slots.tryAcquire(this.numThreads, SLOT_WAIT_SECONDS, TimeUnit.SECONDS);
            this.reporter.progress();
        } while (!allDone);
        this.slots.release(this.numThreads);
        this.readTime += System.currentTimeMillis() - start;
    }

    /**
     * Reads one buffer from a single stream into a {@link ReadResult} and
     * releases one permit of {@link ParallelStreamReader#slots} when done.
     */
    class ReadOperation
    implements Runnable {
        ReadResult readResult;
        int idx;

        ReadOperation(ReadResult readResult, int idx) {
            this.readResult = readResult;
            this.idx = idx;
        }

        /**
         * Performs the read. A stream that is already {@code null} yields a
         * zero-filled buffer. Any exception is logged, recorded in the result,
         * and causes the stream to be closed and its slot set to {@code null}.
         */
        @Override
        public void run() {
            this.readResult.setException(this.idx, null);
            // Capture the stream once; shutdown() may null the array slot concurrently.
            InputStream in = null;
            try {
                in = ParallelStreamReader.this.streams[this.idx];
                byte[] buffer = this.readResult.readBufs[this.idx];
                if (in == null) {
                    Arrays.fill(buffer, (byte) 0);
                    return;
                }
                boolean eofOK = true;
                int toRead = (int) Math.min(ParallelStreamReader.this.remainingBytesPerStream, (long) buffer.length);
                this.readResult.numRead[this.idx] = RaidUtils.readTillEnd(in, buffer, eofOK, ParallelStreamReader.this.endOffsets[this.idx], toRead);
            } catch (Exception e) {
                LOG.warn("Encountered exception in stream " + this.idx, e);
                this.readResult.setException(this.idx, e);
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException ignored) {
                        // The stream already failed; a close failure adds nothing.
                    }
                }
                ParallelStreamReader.this.streams[this.idx] = null;
            } finally {
                ParallelStreamReader.this.slots.release();
            }
        }
    }

    /**
     * Drives read rounds until the requested number of bytes per stream has
     * been consumed, the reader is shut down, or the thread is interrupted.
     */
    class MainThread
    extends Thread {
        MainThread() {
        }

        @Override
        public void run() {
            while (ParallelStreamReader.this.running) {
                // "<= 0" rather than "== 0": the counter drops by bufSize each round and
                // steps past zero when the byte limit is not a multiple of bufSize.
                if (ParallelStreamReader.this.remainingBytesPerStream <= 0L) {
                    return;
                }
                ReadResult readResult = new ReadResult(ParallelStreamReader.this.streams.length, ParallelStreamReader.this.bufSize);
                try {
                    ParallelStreamReader.this.performReads(readResult);
                    ParallelStreamReader.this.boundedBuffer.put(readResult);
                    ParallelStreamReader.this.remainingBytesPerStream -= (long) ParallelStreamReader.this.bufSize;
                } catch (InterruptedException e) {
                    ParallelStreamReader.this.running = false;
                    // Preserve the interrupt status for anything inspecting this thread.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Outcome of one read round: one buffer, one byte count and at most one
     * exception per stream, all indexed by stream position.
     */
    public static class ReadResult {
        /** Data read from each stream. */
        public byte[][] readBufs;
        /** Byte count recorded for each stream; stays 0 if the read was skipped or failed. */
        public int[] numRead;
        /** Exception recorded for each stream, or {@code null} if none. */
        public IOException[] ioExceptions;

        ReadResult(int numStreams, int bufSize) {
            this.readBufs = new byte[numStreams][];
            this.numRead = new int[numStreams];
            for (int i = 0; i < this.readBufs.length; ++i) {
                this.readBufs[i] = new byte[bufSize];
            }
            this.ioExceptions = new IOException[this.readBufs.length];
        }

        /**
         * Records or clears the exception for one stream.
         *
         * @param idx stream index
         * @param e   exception to record, or {@code null} to clear; a non-{@link IOException}
         *            is wrapped in a new {@link IOException} with {@code e} as its cause
         */
        void setException(int idx, Exception e) {
            synchronized (this.ioExceptions) {
                if (e == null) {
                    this.ioExceptions[idx] = null;
                } else if (e instanceof IOException) {
                    this.ioExceptions[idx] = (IOException) e;
                } else {
                    this.ioExceptions[idx] = new IOException(e);
                }
            }
        }

        /**
         * Returns the recorded exception with the lowest stream index.
         *
         * @return the first non-null recorded exception, or {@code null} if no stream failed
         */
        IOException getException() {
            synchronized (this.ioExceptions) {
                for (int i = 0; i < this.ioExceptions.length; ++i) {
                    if (this.ioExceptions[i] != null) {
                        return this.ioExceptions[i];
                    }
                }
                return null;
            }
        }
    }
}

