/*
 * Decompiled with CFR 0.152.
 */
package io.hops.erasure_coding;

import io.hops.erasure_coding.Codec;
import io.hops.erasure_coding.ErasureCode;
import io.hops.erasure_coding.ParallelStreamReader;
import io.hops.erasure_coding.RaidUtils;
import io.hops.erasure_coding.StripeReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Random;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ErasureCodingFileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.util.Progressable;

public class Encoder {
    public static final Log LOG = LogFactory.getLog((String)"org.apache.hadoop.raid.Encoder");
    public static final int DEFAULT_PARALLELISM = 4;
    protected Configuration conf;
    protected int parallelism;
    protected Codec codec;
    protected ErasureCode code;
    protected Random rand;
    protected int bufSize;
    protected byte[][] writeBufs;

    Encoder(Configuration conf, Codec codec) {
        this.conf = conf;
        this.parallelism = conf.getInt("raid.encoder.parallelism", 4);
        this.codec = codec;
        this.code = codec.createErasureCode(conf);
        this.rand = new Random();
        this.bufSize = conf.getInt("raid.encoder.bufsize", 0x100000);
        this.writeBufs = new byte[codec.parityLength][];
        this.allocateBuffers();
    }

    /**
     * (Re)allocates every parity write buffer with the current {@code bufSize}.
     * Existing buffers are replaced, not resized in place.
     */
    private void allocateBuffers() {
        final int parityCount = this.codec.parityLength;
        int slot = 0;
        while (slot < parityCount) {
            this.writeBufs[slot] = new byte[this.bufSize];
            ++slot;
        }
    }

    /**
     * Adapts {@code bufSize} (and the write buffers) to the given block size.
     *
     * <ul>
     *   <li>If the buffer is larger than a block, it is shrunk to the block size.</li>
     *   <li>If the buffer already divides the block size evenly, nothing changes.</li>
     *   <li>Otherwise the buffer becomes {@code blockSize / 256} (or 1024 when that
     *       quotient is zero), capped at 1 MiB.</li>
     * </ul>
     *
     * NOTE(review): the third case does not guarantee that the new buffer size
     * divides {@code blockSize}; callers that write whole buffers should confirm
     * this is acceptable.
     *
     * @param blockSize size in bytes of the blocks about to be encoded
     */
    private void configureBuffers(long blockSize) {
        if (blockSize < (long)this.bufSize) {
            // Buffer larger than a whole block: one buffer holds the entire block.
            this.bufSize = (int)blockSize;
            this.allocateBuffers();
            return;
        }
        if (blockSize % (long)this.bufSize == 0L) {
            // Already an even divisor, keep the current buffers.
            return;
        }
        int candidate = (int)(blockSize / 256L);
        if (candidate == 0) {
            candidate = 1024;
        }
        this.bufSize = Math.min(candidate, 1 << 20);
        this.allocateBuffers();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Encodes {@code srcFile} and writes the resulting parity data to {@code parityFile}.
     *
     * <p>Steps performed:
     * <ol>
     *   <li>Creates the parent directory of the parity file and deletes any existing
     *       parity file at that path.</li>
     *   <li>Creates the parity file. If the requested replication is 1 and the expected
     *       number of parity blocks is at least {@code raid.encoder.largeparity.blocks}
     *       (default 20), the file is created with replication 2 and lowered again
     *       after a successful write.</li>
     *   <li>When {@code parityFs} is a {@link DistributedFileSystem}, applies the storage
     *       policy of {@code srcFile} to the parity file.</li>
     *   <li>Switches the underlying {@link DFSOutputStream} into parity-stream mode and
     *       encodes all stripes into it.</li>
     *   <li>Closes the stream and verifies that the parity file length equals
     *       {@code numStripes * blockSize * parityLength}.</li>
     * </ol>
     *
     * <p>The output stream is closed on every exit path once it has been created,
     * including failures while configuring the stream before encoding starts.
     *
     * @param jobConf     not used by this method
     * @param fs          file system of the source file, passed through to the stripe encoder
     * @param srcFile     file to encode
     * @param parityFs    file system the parity file is written to
     * @param parityFile  destination path of the parity file
     * @param parityRepl  final replication factor of the parity file
     * @param numStripes  number of stripes expected from {@code sReader}
     * @param blockSize   block size in bytes used for the parity file
     * @param reporter    progress callback
     * @param sReader     supplier of the stripe input streams
     * @param copyPath    path of the source copy, passed through to the stripe encoder
     * @param copy        stream receiving a copy of the source data, or {@code null}
     * @throws IOException if the parent directory cannot be created, an I/O operation
     *                     fails, or the written parity file has an unexpected length
     */
    public void encodeFile(Configuration jobConf, FileSystem fs, Path srcFile, FileSystem parityFs, Path parityFile, short parityRepl, long numStripes, long blockSize, Progressable reporter, StripeReader sReader, Path copyPath, FSDataOutputStream copy) throws IOException {
        long expectedParityBlocks = numStripes * (long)this.codec.parityLength;
        long expectedParityFileSize = numStripes * blockSize * (long)this.codec.parityLength;
        if (!parityFs.mkdirs(parityFile.getParent())) {
            throw new IOException("Could not create parent dir " + parityFile.getParent());
        }
        if (parityFs.exists(parityFile)) {
            parityFs.delete(parityFile, false);
        }
        // Large single-replica parity files are written with replication 2 and
        // reduced to the requested replication once the write has been verified.
        short tmpRepl = parityRepl;
        if (expectedParityBlocks >= (long)this.conf.getInt("raid.encoder.largeparity.blocks", 20) && parityRepl == 1) {
            tmpRepl = 2;
        }
        FSDataOutputStream out = parityFs.create(parityFile, true, this.conf.getInt("io.file.buffer.size", 65536), tmpRepl, blockSize);
        try {
            // Stream configuration lives inside the try block so that a failure here
            // (storage policy lookup, unexpected stream type) still closes the stream.
            if (parityFs instanceof DistributedFileSystem) {
                DistributedFileSystem parityDfs = (DistributedFileSystem)parityFs;
                BlockStoragePolicy policy = parityDfs.getStoragePolicy(srcFile);
                parityDfs.setStoragePolicy(parityFile, policy.getName());
            }
            DFSOutputStream dfsOut = (DFSOutputStream)out.getWrappedStream();
            // The source path is only handed over when no copy stream is in use.
            dfsOut.enableParityStream(this.codec.getStripeLength(), this.codec.getParityLength(), copy == null ? srcFile.toUri().getPath() : null);
            this.encodeFileToStream(fs, srcFile, parityFile, sReader, blockSize, out, reporter, copyPath, copy);
            out.close();
            // Mark as closed so the finally block does not close it a second time.
            out = null;
            FileStatus tmpStat = parityFs.getFileStatus(parityFile);
            if (tmpStat.getLen() != expectedParityFileSize) {
                throw new IOException("Expected parity size " + expectedParityFileSize + " does not match actual " + tmpStat.getLen());
            }
            if (tmpRepl > parityRepl) {
                parityFs.setReplication(parityFile, parityRepl);
            }
            LOG.info("Wrote parity file " + parityFile);
        }
        finally {
            if (out != null) {
                out.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Recomputes one parity block and stores it in a local file.
     *
     * <p>Opens {@code localBlockFile} for writing, delegates to
     * {@link #recoverParityBlockToStream}, and closes the file afterwards.
     *
     * @param fs             file system holding the source file
     * @param srcStat        status of the source file
     * @param blockSize      block size in bytes
     * @param parityFile     parity file the corrupt block belongs to
     * @param corruptOffset  offset inside the parity file that falls into the corrupt block
     * @param localBlockFile local file that receives the recovered block
     * @param progress       progress callback
     * @throws IOException if the local file cannot be opened or recovery fails
     */
    public void recoverParityBlockToFile(FileSystem fs, FileStatus srcStat, long blockSize, Path parityFile, long corruptOffset, File localBlockFile, Progressable progress) throws IOException {
        try (OutputStream localOut = new FileOutputStream(localBlockFile)) {
            this.recoverParityBlockToStream(fs, srcStat, blockSize, parityFile, corruptOffset, localOut, progress);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Recomputes the parity block containing {@code corruptOffset} and writes it to {@code out}.
     *
     * <p>The offset is rounded down to a block boundary. The stripe containing that
     * block is re-encoded from the source file; the output of the requested parity
     * block goes to {@code out}, while all other parity outputs of the stripe are
     * discarded through {@link NullOutputStream}s.
     *
     * @param fs            file system holding the source file
     * @param srcStat       status of the source file
     * @param blockSize     block size in bytes
     * @param parityFile    parity file the corrupt block belongs to
     * @param corruptOffset offset inside the parity file that falls into the corrupt block
     * @param out           destination of the recovered block; not closed by this method
     * @param progress      progress callback
     * @throws IOException if reading the source stripe or encoding fails
     */
    public void recoverParityBlockToStream(FileSystem fs, FileStatus srcStat, long blockSize, Path parityFile, long corruptOffset, OutputStream out, Progressable progress) throws IOException {
        LOG.info((Object)("Recovering parity block" + parityFile + ":" + corruptOffset));
        Path srcFile = srcStat.getPath();
        // Index of the corrupt block within the parity file, and its aligned start offset.
        final long parityBlockIdx = corruptOffset / blockSize;
        final long alignedOffset = parityBlockIdx * blockSize;
        final int parityCount = this.codec.parityLength;
        OutputStream[] outs = new OutputStream[parityCount];
        long indexOfCorruptBlockInParityStripe = parityBlockIdx % (long)parityCount;
        LOG.info((Object)("Index of corrupt block in parity stripe: " + indexOfCorruptBlockInParityStripe));
        for (int slot = 0; slot < parityCount; ++slot) {
            if ((long)slot == indexOfCorruptBlockInParityStripe) {
                outs[slot] = out;
            } else {
                // Parity blocks that are not being recovered are thrown away.
                outs[slot] = new NullOutputStream();
            }
        }
        long stripeIdx = alignedOffset / ((long)parityCount * blockSize);
        StripeReader sReader = StripeReader.getStripeReader(this.codec, this.conf, blockSize, fs, stripeIdx, srcStat);
        assert (sReader.hasNext());
        InputStream[] blocks = sReader.getNextStripeInputs();
        LOG.info((Object)("Starting recovery by using source stripe " + srcFile + ": stripe " + stripeIdx));
        try {
            this.encodeStripe(fs, srcFile, parityFile, blocks, blockSize, outs, progress);
        }
        finally {
            RaidUtils.closeStreams(blocks);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Encodes every stripe supplied by {@code sReader} and appends the parity blocks to {@code out}.
     *
     * <p>For each stripe the first parity block is written directly to {@code out};
     * the remaining {@code parityLength - 1} parity blocks are staged in local temp
     * files and appended to {@code out} once the stripe has been encoded. When
     * {@code copy} is non-null, the source blocks of the stripe are staged in local
     * temp files as well and appended to {@code copy} after the parity stream has
     * been flushed and its used nodes have been handed to the copy stream.
     *
     * <p>Temp-file input streams are closed as soon as their content has been
     * copied; all temp files are deleted when the method returns.
     *
     * @param fs         file system of the source file, passed to the stripe encoder
     * @param sourceFile source file being encoded
     * @param parityFile parity file being written
     * @param sReader    supplier of the stripe input streams
     * @param blockSize  block size in bytes
     * @param out        parity output stream; not closed by this method
     * @param reporter   progress callback
     * @param copyPath   path of the source copy, passed to the stripe encoder
     * @param copy       stream receiving a copy of the source data, or {@code null}
     * @throws IOException if any read, write or temp-file operation fails
     */
    private void encodeFileToStream(FileSystem fs, Path sourceFile, Path parityFile, StripeReader sReader, long blockSize, FSDataOutputStream out, Progressable reporter, Path copyPath, FSDataOutputStream copy) throws IOException {
        final int parityLength = this.codec.parityLength;
        final int stripeLength = this.codec.stripeLength;
        // Slot 0 streams straight into the parity file; slots 1..n-1 go via temp files.
        OutputStream[] tmpOuts = new OutputStream[parityLength];
        tmpOuts[0] = out;
        File[] tmpFiles = new File[parityLength - 1];
        for (int i = 0; i < tmpFiles.length; ++i) {
            tmpFiles[i] = File.createTempFile("parity", "_" + i);
            LOG.info("Created tmp file " + tmpFiles[i]);
            tmpFiles[i].deleteOnExit();
        }
        OutputStream[] copyOuts = null;
        File[] tmpCopyFiles = null;
        if (copy != null) {
            tmpCopyFiles = new File[stripeLength];
            copyOuts = new OutputStream[stripeLength];
            for (int i = 0; i < stripeLength; ++i) {
                tmpCopyFiles[i] = File.createTempFile("copy", "_" + i);
                LOG.info("Created copy file " + tmpCopyFiles[i]);
                tmpCopyFiles[i].deleteOnExit();
            }
        }
        try {
            int stripe = 0;
            while (sReader.hasNext()) {
                reporter.progress();
                InputStream[] blocks = sReader.getNextStripeInputs();
                try {
                    // Temp files are reopened (and thereby truncated) for every stripe.
                    for (int i = 0; i < tmpFiles.length; ++i) {
                        tmpOuts[i + 1] = new FileOutputStream(tmpFiles[i]);
                    }
                    if (copy != null) {
                        for (int i = 0; i < stripeLength; ++i) {
                            copyOuts[i] = new FileOutputStream(tmpCopyFiles[i]);
                        }
                    }
                    this.encodeStripe(fs, sourceFile, parityFile, blocks, blockSize, tmpOuts, reporter, true, stripe, copyPath, copyOuts);
                    ++stripe;
                }
                finally {
                    RaidUtils.closeStreams(blocks);
                }
                // Append the staged parity blocks behind the first one.
                for (int i = 0; i < tmpFiles.length; ++i) {
                    tmpOuts[i + 1].close();
                    tmpOuts[i + 1] = null;
                    // Close the temp-file reader right away; otherwise one file
                    // descriptor per parity block and stripe stays open.
                    try (InputStream in = new FileInputStream(tmpFiles[i])) {
                        RaidUtils.copyBytes(in, (OutputStream)out, this.writeBufs[i], blockSize);
                    }
                    reporter.progress();
                }
                if (copy == null) {
                    continue;
                }
                out.hflush();
                DFSOutputStream sourceOut = (DFSOutputStream)copy.getWrappedStream();
                DFSOutputStream parityOut = (DFSOutputStream)out.getWrappedStream();
                sourceOut.setParityStripeNodesForNextStripe(parityOut.getUsedNodes());
                for (int i = 0; i < stripeLength; ++i) {
                    copyOuts[i].close();
                    copyOuts[i] = null;
                    try (InputStream in = new FileInputStream(tmpCopyFiles[i])) {
                        RaidUtils.copyBytes(in, (OutputStream)copy, this.writeBufs[0], blockSize);
                    }
                    reporter.progress();
                }
                copy.hflush();
            }
        }
        finally {
            for (int i = 0; i < tmpFiles.length; ++i) {
                if (tmpOuts[i + 1] != null) {
                    tmpOuts[i + 1].close();
                }
                tmpFiles[i].delete();
                LOG.info("Deleted tmp file " + tmpFiles[i]);
            }
            if (copy != null) {
                for (int i = 0; i < stripeLength; ++i) {
                    if (copyOuts[i] != null) {
                        copyOuts[i].close();
                    }
                    tmpCopyFiles[i].delete();
                    LOG.info("Deleted tmp file " + tmpCopyFiles[i]);
                }
            }
        }
    }

    /**
     * Encodes a single stripe without computing block checksums and without
     * copying the source data.
     *
     * <p>Convenience overload that forwards to the full variant with checksum
     * computation disabled, stripe index 0, and no copy path or copy streams.
     *
     * @param fs         file system of the source file
     * @param sourceFile source file the stripe belongs to
     * @param parityFile parity file the stripe belongs to
     * @param blocks     input streams of the stripe's source blocks
     * @param blockSize  block size in bytes
     * @param outs       one output stream per parity block
     * @param reporter   progress callback
     * @throws IOException if reading, encoding or writing fails
     */
    void encodeStripe(FileSystem fs, Path sourceFile, Path parityFile, InputStream[] blocks, long blockSize, OutputStream[] outs, Progressable reporter) throws IOException {
        final boolean computeBlockChecksum = false;
        final int stripe = 0;
        final Path noCopyPath = null;
        final OutputStream[] noCopyOuts = null;
        this.encodeStripe(fs, sourceFile, parityFile, blocks, blockSize, outs, reporter, computeBlockChecksum, stripe, noCopyPath, noCopyOuts);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Encodes one stripe: reads the source blocks, computes parity and writes it to {@code outs}.
     *
     * <p>The buffer size is first adapted to {@code blockSize}. The source blocks
     * are then consumed through a {@link ParallelStreamReader} in rounds; each round
     * the read buffers are encoded into the write buffers and exactly
     * {@code bufSize} bytes are written to every parity output. When
     * {@code copyOuts} is non-null, the bytes actually read from each source block
     * are also written to the matching copy stream.
     *
     * <p>When {@code computeBlockChecksum} is set, CRC32 checksums are accumulated
     * for every source and parity block and, after the stripe has been encoded,
     * sent to the {@link DFSClient} of {@code fs}. The file system is only required
     * to be a {@link DistributedFileSystem} (directly or wrapped in an
     * {@link ErasureCodingFileSystem}) in that case.
     *
     * @param fs                   file system of the source file
     * @param sourceFile           source file the stripe belongs to
     * @param parityFile           parity file the stripe belongs to
     * @param blocks               input streams of the stripe's source blocks
     * @param blockSize            block size in bytes
     * @param outs                 one output stream per parity block
     * @param reporter             progress callback
     * @param computeBlockChecksum whether block checksums are computed and sent
     * @param stripe               index of the stripe, used to derive block indices for checksums
     * @param copyPath             path the source checksums are registered under, or
     *                             {@code null} to use {@code sourceFile}
     * @param copyOuts             per-source-block streams receiving a copy of the data, or {@code null}
     * @throws IOException if reading, encoding or writing fails, or if the thread is
     *                     interrupted while waiting for data (the interrupt flag is restored)
     */
    void encodeStripe(FileSystem fs, Path sourceFile, Path parityFile, InputStream[] blocks, long blockSize, OutputStream[] outs, Progressable reporter, boolean computeBlockChecksum, int stripe, Path copyPath, OutputStream[] copyOuts) throws IOException {
        this.configureBuffers(blockSize);
        int boundedBufferCapacity = 1;
        ParallelStreamReader parallelReader = new ParallelStreamReader(reporter, blocks, this.bufSize, this.parallelism, boundedBufferCapacity, blockSize);
        Checksum[] sourceChecksums = null;
        Checksum[] parityChecksums = null;
        if (computeBlockChecksum) {
            sourceChecksums = new Checksum[this.codec.stripeLength];
            for (int i = 0; i < sourceChecksums.length; ++i) {
                sourceChecksums[i] = new CRC32();
            }
            parityChecksums = new Checksum[this.codec.parityLength];
            for (int i = 0; i < parityChecksums.length; ++i) {
                parityChecksums[i] = new CRC32();
            }
        }
        // Start the reader right before the try block so that every started
        // reader is guaranteed to be shut down.
        parallelReader.start();
        try {
            for (long encoded = 0L; encoded < blockSize; encoded += (long)this.bufSize) {
                ParallelStreamReader.ReadResult readResult;
                try {
                    readResult = parallelReader.getReadResult();
                }
                catch (InterruptedException e) {
                    // Restore the interrupt status for callers further up the stack
                    // and keep the original exception as the cause.
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting for read result", e);
                }
                IOException readEx = readResult.getException();
                if (readEx != null) {
                    throw readEx;
                }
                if (computeBlockChecksum) {
                    this.updateChecksums(sourceChecksums, readResult.readBufs);
                }
                if (copyOuts != null) {
                    // Only the bytes actually read are copied.
                    for (int i = 0; i < readResult.readBufs.length; ++i) {
                        copyOuts[i].write(readResult.readBufs[i], 0, readResult.numRead[i]);
                    }
                }
                this.code.encodeBulk(readResult.readBufs, this.writeBufs);
                reporter.progress();
                for (int i = 0; i < this.codec.parityLength; ++i) {
                    outs[i].write(this.writeBufs[i], 0, this.bufSize);
                    if (computeBlockChecksum) {
                        parityChecksums[i].update(this.writeBufs[i], 0, this.bufSize);
                    }
                    reporter.progress();
                }
            }
            if (computeBlockChecksum) {
                // The DFS-specific cast is only needed when checksums are sent; doing it
                // unconditionally would fail for non-DFS file systems in the recovery path.
                DistributedFileSystem dfs = (DistributedFileSystem)(fs instanceof ErasureCodingFileSystem ? ((ErasureCodingFileSystem)fs).getFileSystem() : fs);
                this.sendChecksums(dfs, copyPath == null ? sourceFile : copyPath, sourceChecksums, stripe, this.codec.stripeLength);
                this.sendChecksums(dfs, parityFile, parityChecksums, stripe, this.codec.parityLength);
            }
        }
        finally {
            parallelReader.shutdown();
        }
    }

    /**
     * Feeds one buffer into each checksum.
     *
     * <p>Checksum {@code k} is updated with {@code buffs[k]}; the number of bytes
     * taken from every buffer is the length of the first buffer.
     *
     * @param checksums checksums to update, one per buffer
     * @param buffs     data buffers, at least as many as there are checksums
     */
    private void updateChecksums(Checksum[] checksums, byte[][] buffs) {
        int position = 0;
        for (Checksum checksum : checksums) {
            // The length is read inside the loop so an empty checksum array never touches buffs.
            checksum.update(buffs[position], 0, buffs[0].length);
            ++position;
        }
    }

    /**
     * Registers the checksums of one stripe's blocks with the DFS client.
     *
     * <p>Does nothing when {@code checksums} is {@code null}. Otherwise block
     * {@code k} of the stripe is registered under block index
     * {@code stripe * length + k} for the path of {@code file}.
     *
     * @param dfs       file system whose client receives the checksums
     * @param file      file the blocks belong to
     * @param checksums per-block checksums of the stripe, or {@code null}
     * @param stripe    index of the stripe within the file
     * @param length    number of blocks per stripe for this file
     * @throws IOException if the client call fails
     */
    private void sendChecksums(DistributedFileSystem dfs, Path file, Checksum[] checksums, int stripe, int length) throws IOException {
        if (checksums == null) {
            return;
        }
        DFSClient dfsClient = dfs.getClient();
        int blockIndex = stripe * length;
        for (int offset = 0; offset < length; ++offset, ++blockIndex) {
            String filePath = file.toUri().getPath();
            long checksumValue = checksums[offset].getValue();
            dfsClient.addBlockChecksum(filePath, blockIndex, checksumValue);
        }
    }

    /**
     * Output stream that silently discards everything written to it.
     *
     * <p>All three {@code write} variants are overridden with empty bodies, so no
     * bytes are stored or forwarded.
     */
    static class NullOutputStream extends OutputStream {
        NullOutputStream() {
            // Nothing to initialise.
        }

        /** Discards a single byte. */
        @Override
        public void write(int b) throws IOException {
            // Intentionally empty.
        }

        /** Discards the whole array. */
        @Override
        public void write(byte[] b) throws IOException {
            // Intentionally empty.
        }

        /** Discards the given range of the array. */
        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            // Intentionally empty.
        }
    }
}

