package io.hops.erasure_coding;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.util.Progressable;

/* loaded from: input_file:io/hops/erasure_coding/ParallelStreamReader.class */
public class ParallelStreamReader {
    public static final Log LOG = LogFactory.getLog(ParallelStreamReader.class);
    /** Progress callback; performReads() calls progress() on it after every slot-acquisition attempt. */
    Progressable reporter;
    /** Streams being read; an entry is set to null after a failed read or when shutdown() closes it. */
    InputStream[] streams;
    /** Per-stream end offset handed to RaidUtils.readTillEnd; Long.MAX_VALUE when the constructor found no bound. */
    long[] endOffsets;
    /** Fixed-size pool that executes the per-stream ReadOperation tasks. */
    ExecutorService readPool;
    /** Permits for in-flight reads: one is acquired before each submit and released when the read finishes. */
    Semaphore slots;
    /** Size of readPool and number of permits in slots. */
    int numThreads;
    /** Bytes still to be read from each stream; MainThread subtracts bufSize after each round. */
    long remainingBytesPerStream;
    /** Length in bytes of each per-stream buffer allocated in a ReadResult. */
    int bufSize;
    /** Accumulated wall-clock milliseconds spent inside performReads(). */
    long readTime = 0;
    /** Loop condition of MainThread; set to false by shutdown() and by MainThread's InterruptedException handler. */
    volatile boolean running = true;
    /** Bounded queue of completed rounds; filled by MainThread, drained by getReadResult(). */
    BlockingQueue<ReadResult> boundedBuffer;
    /** The MainThread instance created in the constructor and launched by start(). */
    Thread mainThread;

    /* loaded from: input_file:io/hops/erasure_coding/ParallelStreamReader$MainThread.class */
    /**
     * Producer thread. Each loop iteration performs one parallel read round
     * across all streams and enqueues the resulting {@link ReadResult} in
     * {@code boundedBuffer}. The thread ends when the requested number of bytes
     * has been produced, when {@code running} is cleared, or when it is
     * interrupted.
     */
    class MainThread extends Thread {
        MainThread() {
        }

        @Override
        public void run() {
            while (running) {
                try {
                    // Stop once the requested amount has been read. "<= 0" (rather
                    // than "== 0") also terminates when the total is not an exact
                    // multiple of bufSize and the counter steps past zero.
                    if (remainingBytesPerStream <= 0) {
                        return;
                    }
                    ReadResult readResult = new ReadResult(streams.length, bufSize);
                    performReads(readResult);
                    // Blocks while the bounded buffer is full, throttling the reads
                    // to the pace of the consumer.
                    boundedBuffer.put(readResult);
                    remainingBytesPerStream -= bufSize;
                } catch (InterruptedException e) {
                    // Both blocking calls above are interruptible; shutdown()
                    // interrupts this thread to make it stop.
                    running = false;
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/hops/erasure_coding/ParallelStreamReader$ReadOperation.class */
    /**
     * Task that fills one stream's buffer of a {@link ReadResult}. Exactly one
     * permit of {@code slots} is released when the task finishes, whether it
     * succeeds or fails.
     */
    public class ReadOperation implements Runnable {
        /** Round result whose buffer, byte count and exception slot at {@code idx} are written. */
        ReadResult readResult;
        /** Index of the stream (and of the matching result slots) this task handles. */
        int idx;

        ReadOperation(ReadResult readResult, int i) {
            this.readResult = readResult;
            this.idx = i;
        }

        @Override
        public void run() {
            // Clear any exception left in this slot before reading.
            readResult.setException(idx, null);
            try {
                byte[] buffer = readResult.readBufs[idx];
                InputStream stream = streams[idx];
                if (stream == null) {
                    // The stream failed earlier or was closed: deliver zeros instead.
                    Arrays.fill(buffer, (byte) 0);
                    return;
                }
                int toRead = (int) Math.min(remainingBytesPerStream, buffer.length);
                // NOTE(review): the boolean argument presumably means "EOF is acceptable";
                // confirm against RaidUtils.readTillEnd.
                readResult.numRead[idx] =
                        RaidUtils.readTillEnd(stream, buffer, true, endOffsets[idx], toRead);
            } catch (Exception e) {
                LOG.warn("Encountered exception in stream " + idx, e);
                readResult.setException(idx, e);
                // Retire the stream so that later rounds take the zero-fill path.
                // shutdown() may already have nulled the slot concurrently, so
                // read it once and null-check before closing.
                InputStream failed = streams[idx];
                streams[idx] = null;
                if (failed != null) {
                    try {
                        failed.close();
                    } catch (IOException ignored) {
                        // Best effort: the stream is being discarded anyway.
                    }
                }
            } finally {
                slots.release();
            }
        }
    }

    /* loaded from: input_file:io/hops/erasure_coding/ParallelStreamReader$ReadResult.class */
    public static class ReadResult {
        public byte[][] readBufs;
        public int[] numRead;
        public IOException[] ioExceptions;

        /* JADX WARN: Type inference failed for: r1v1, types: [byte[], byte[][]] */
        ReadResult(int i, int i2) {
            this.readBufs = new byte[i];
            this.numRead = new int[i];
            for (int i3 = 0; i3 < this.readBufs.length; i3++) {
                this.readBufs[i3] = new byte[i2];
                this.numRead[i3] = 0;
            }
            this.ioExceptions = new IOException[this.readBufs.length];
        }

        void setException(int i, Exception exc) {
            synchronized (this.ioExceptions) {
                if (exc == null) {
                    this.ioExceptions[i] = null;
                    return;
                }
                if (exc instanceof IOException) {
                    this.ioExceptions[i] = (IOException) exc;
                } else {
                    this.ioExceptions[i] = new IOException(exc);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public IOException getException() {
            synchronized (this.ioExceptions) {
                for (int i = 0; i < this.ioExceptions.length; i++) {
                    if (this.ioExceptions[i] != null) {
                        return this.ioExceptions[i];
                    }
                }
                return null;
            }
        }
    }

    /**
     * Creates a reader over the given streams. The reader takes over the
     * streams: every entry of {@code inputStreamArr} is copied into this
     * object and then set to null in the caller's array. No thread runs until
     * {@link #start()} is called.
     *
     * @param progressable  progress callback invoked while reads are in flight
     * @param inputStreamArr streams to read; entries are nulled out on return
     * @param i  size in bytes of each per-stream buffer
     * @param i2 requested number of reader threads; capped at the stream count
     * @param i3 capacity of the queue of completed read rounds
     * @param j  number of bytes to read from each stream
     * @throws IOException if querying a DFS stream for its blocks or position fails
     */
    public ParallelStreamReader(Progressable progressable, InputStream[] inputStreamArr, int i, int i2, int i3, long j) throws IOException {
        this.reporter = progressable;
        final int streamCount = inputStreamArr.length;
        this.streams = new InputStream[streamCount];
        this.endOffsets = new long[streamCount];
        for (int s = 0; s < streamCount; s++) {
            InputStream stream = inputStreamArr[s];
            this.streams[s] = stream;
            // Unbounded unless the stream is a DFS stream that reports blocks.
            this.endOffsets[s] = Long.MAX_VALUE;
            if (stream instanceof DFSClient.DFSDataInputStream) {
                DFSClient.DFSDataInputStream dfsStream = (DFSClient.DFSDataInputStream) stream;
                List<LocatedBlock> allBlocks = dfsStream.getAllBlocks();
                if (!allBlocks.isEmpty()) {
                    // End offset = current position plus the size of the first block.
                    this.endOffsets[s] = dfsStream.getPos() + allBlocks.get(0).getBlockSize();
                }
            }
            // Ownership of the stream has moved to this reader.
            inputStreamArr[s] = null;
        }
        this.bufSize = i;
        this.boundedBuffer = new ArrayBlockingQueue<ReadResult>(i3);
        // More threads than streams would never be used.
        this.numThreads = Math.min(i2, streamCount);
        this.remainingBytesPerStream = j;
        this.slots = new Semaphore(this.numThreads);
        this.readPool = Executors.newFixedThreadPool(this.numThreads);
        this.mainThread = new MainThread();
    }

    /** Launches the main thread, which begins producing read results. */
    public void start() {
        final Thread producer = mainThread;
        producer.start();
    }

    /**
     * Stops the reader: clears {@code running}, shuts the read pool down,
     * interrupts the main thread and closes every stream still held. Each step
     * is best effort; a failure in one step is logged at debug level and does
     * not prevent the remaining steps.
     */
    public void shutdown() {
        LOG.info("Shutting down parallel stream reader");
        this.running = false;
        try {
            this.readPool.shutdownNow();
        } catch (Exception e) {
            LOG.debug("Ignoring failure while shutting down the read pool", e);
        }
        try {
            this.mainThread.interrupt();
        } catch (Exception e) {
            LOG.debug("Ignoring failure while interrupting the main thread", e);
        }
        for (int i = 0; i < this.streams.length; i++) {
            InputStream stream = this.streams[i];
            if (stream == null) {
                continue;
            }
            // Drop the reference before closing so the slot is cleared even
            // when close() throws.
            this.streams[i] = null;
            try {
                stream.close();
            } catch (IOException e) {
                LOG.debug("Ignoring failure while closing stream " + i, e);
            }
        }
    }

    /**
     * Removes and returns the oldest completed read round, waiting until one
     * is available.
     *
     * @return the next queued result
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public ReadResult getReadResult() throws InterruptedException {
        final ReadResult next = boundedBuffer.take();
        return next;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs one read round: submits a {@link ReadOperation} for every stream,
     * then waits until all of them have finished. Progress is reported after
     * every slot-acquisition attempt (each waits at most 10 seconds), and the
     * elapsed wall-clock time is added to {@code readTime}.
     *
     * @param readResult result object the read operations write into
     * @throws InterruptedException if interrupted while waiting for a slot
     */
    public void performReads(ReadResult readResult) throws InterruptedException {
        final long startMillis = System.currentTimeMillis();

        // Phase 1: hand out one task per stream, each gated by a free slot.
        int nextStream = 0;
        while (nextStream < streams.length) {
            final boolean gotSlot = slots.tryAcquire(1, 10L, TimeUnit.SECONDS);
            reporter.progress();
            if (!gotSlot) {
                continue;
            }
            readPool.execute(new ReadOperation(readResult, nextStream));
            nextStream++;
        }

        // Phase 2: holding every permit at once means no read is still running.
        for (;;) {
            final boolean allIdle = slots.tryAcquire(numThreads, 10L, TimeUnit.SECONDS);
            reporter.progress();
            if (allIdle) {
                break;
            }
        }
        slots.release(numThreads);

        readTime += System.currentTimeMillis() - startMillis;
    }
}
