/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import io.hops.erasure_coding.Decoder;
import io.hops.erasure_coding.Helper;
import io.hops.erasure_coding.RaidUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Reconstructs lost blocks of an erasure-coded source file or of its parity file.
 *
 * <p>For every missing block the supplied {@link Decoder} recovers the block contents into a
 * local temporary file, which is then streamed to the receivers returned by
 * {@code DFSClient.getRepairedBlockLocations}. Sending is retried a configurable number of
 * times (see {@link #SEND_BLOCK_RETRY_COUNT_KEY}).
 */
public class BlockReconstructor extends Configured {
    public static final Log LOG = LogFactory.getLog(BlockReconstructor.class);

    /** Configuration key holding the number of retries after a failed block send. */
    public static final String SEND_BLOCK_RETRY_COUNT_KEY = "io.hops.erasure_coding.send_block_retry_count";

    /** Retry count used when {@link #SEND_BLOCK_RETRY_COUNT_KEY} is not configured. */
    public static final int DEFAULT_SEND_BLOCK_RETRY_COUNT = 3;

    /** Size in bytes of the buffer used to copy the local block file to the output stream. */
    private static final int COPY_BUFFER_SIZE = 8192;

    /** Number of retries after the first failed send attempt, as read from the configuration. */
    private final int retryCount;

    /**
     * Creates a reconstructor that reads its retry count from {@code conf}.
     *
     * @param conf configuration handed to {@link Configured} and used to look up
     *             {@link #SEND_BLOCK_RETRY_COUNT_KEY}
     */
    public BlockReconstructor(Configuration conf) {
        super(conf);
        this.retryCount = conf.getInt(SEND_BLOCK_RETRY_COUNT_KEY, DEFAULT_SEND_BLOCK_RETRY_COUNT);
    }

    /**
     * Looks up the missing blocks of {@code sourceFile} and reconstructs them without
     * reporting progress to a mapper context.
     *
     * @param sourceFile source file whose missing blocks are repaired
     * @param parityFile parity file used for decoding
     * @param decoder    decoder that recovers block contents
     * @return always {@code true}; failures surface as exceptions
     * @throws IOException          if lookup, decoding or sending fails
     * @throws InterruptedException declared for the decoder call
     */
    public boolean processFile(Path sourceFile, Path parityFile, Decoder decoder) throws IOException, InterruptedException {
        DFSClient dfsClient = Helper.getDFS(this.getConf(), sourceFile).getClient();
        LocatedBlocks missingBlocks = dfsClient.getMissingLocatedBlocks(sourceFile.toUri().getPath());
        return this.processFile(sourceFile, parityFile, missingBlocks, decoder, null);
    }

    /**
     * Reconstructs the given missing blocks of {@code sourceFile}.
     *
     * <p>Each block is recovered into a temporary local file, sent to its receivers, and the
     * temporary file is deleted again even when recovery or sending fails. The first failure
     * aborts the loop by propagating the exception.
     *
     * @param sourceFile    source file whose blocks are repaired
     * @param parityFile    parity file used for decoding
     * @param missingBlocks blocks of the source file to reconstruct
     * @param decoder       decoder that recovers block contents
     * @param context       mapper context used for progress reporting; may be {@code null}
     * @return always {@code true}; failures surface as exceptions
     * @throws IOException          if decoding or sending a block fails
     * @throws InterruptedException declared for the decoder call
     */
    public boolean processFile(Path sourceFile, Path parityFile, LocatedBlocks missingBlocks, Decoder decoder, Mapper.Context context) throws IOException, InterruptedException {
        LOG.info("Processing file " + sourceFile.toString());
        DistributedFileSystem srcFs = Helper.getDFS(this.getConf(), sourceFile);
        FileStatus sourceStatus = srcFs.getFileStatus(sourceFile);
        DistributedFileSystem parityFs = Helper.getDFS(this.getConf(), parityFile);
        long blockSize = sourceStatus.getBlockSize();
        long srcFileSize = sourceStatus.getLen();
        int numBlocksReconstructed = 0;
        for (LocatedBlock lb : missingBlocks.getLocatedBlocks()) {
            Block lostBlock = lb.getBlock().getLocalBlock();
            long lostBlockOffset = lb.getStartOffset();
            LOG.info("Found lost block " + lostBlock + ", offset " + lostBlockOffset);
            // The last block of a file may be shorter than the nominal block size.
            long blockContentsSize = Math.min(blockSize, srcFileSize - lostBlockOffset);
            File localBlockFile = File.createTempFile(lostBlock.getBlockName(), ".tmp");
            localBlockFile.deleteOnExit();
            try {
                decoder.recoverBlockToFile((FileSystem) srcFs, sourceFile, (FileSystem) parityFs, parityFile, blockSize, lostBlockOffset, localBlockFile, blockContentsSize, context);
                this.sendRepairedBlock(srcFs, sourceFile, parityFile, false, lb, localBlockFile);
                ++numBlocksReconstructed;
            } finally {
                deleteTempFile(localBlockFile);
            }
            reportProgress(context);
        }
        LOG.info("Reconstructed " + numBlocksReconstructed + " blocks in " + sourceFile);
        return true;
    }

    /**
     * Looks up the missing blocks of {@code parityFile} and reconstructs them without
     * reporting progress to a mapper context.
     *
     * @param sourceFile source file the parity file belongs to
     * @param parityFile parity file whose missing blocks are repaired
     * @param decoder    decoder that recovers block contents
     * @return always {@code true}; failures surface as exceptions
     * @throws IOException          if lookup, decoding or sending fails
     * @throws InterruptedException declared for the decoder call
     */
    public boolean processParityFile(Path sourceFile, Path parityFile, Decoder decoder) throws IOException, InterruptedException {
        return this.processParityFile(sourceFile, parityFile, decoder, null);
    }

    /**
     * Looks up the missing blocks of {@code parityFile} and reconstructs them.
     *
     * @param sourceFile source file the parity file belongs to
     * @param parityFile parity file whose missing blocks are repaired
     * @param decoder    decoder that recovers block contents
     * @param context    mapper context used for progress reporting; may be {@code null}
     * @return always {@code true}; failures surface as exceptions
     * @throws IOException          if lookup, decoding or sending fails
     * @throws InterruptedException declared for the decoder call
     */
    public boolean processParityFile(Path sourceFile, Path parityFile, Decoder decoder, Mapper.Context context) throws IOException, InterruptedException {
        DFSClient dfsClient = Helper.getDFS(this.getConf(), sourceFile).getClient();
        LocatedBlocks missingBlocks = dfsClient.getMissingLocatedBlocks(parityFile.toUri().getPath());
        return this.processParityFile(sourceFile, parityFile, missingBlocks, decoder, context);
    }

    /**
     * Reconstructs the given missing blocks of {@code parityFile}.
     *
     * <p>Each block is recovered into a temporary local file, sent to its receivers, and the
     * temporary file is deleted again even when recovery or sending fails. The first failure
     * aborts the loop by propagating the exception.
     *
     * @param sourceFile    source file the parity file belongs to
     * @param parityFile    parity file whose blocks are repaired
     * @param missingBlocks blocks of the parity file to reconstruct
     * @param decoder       decoder that recovers block contents
     * @param context       mapper context used for progress reporting; may be {@code null}
     * @return always {@code true}; failures surface as exceptions
     * @throws IOException          if decoding or sending a block fails
     * @throws InterruptedException declared for the decoder call
     */
    public boolean processParityFile(Path sourceFile, Path parityFile, LocatedBlocks missingBlocks, Decoder decoder, Mapper.Context context) throws IOException, InterruptedException {
        LOG.info("Processing parity file for " + sourceFile.toString());
        DistributedFileSystem srcFs = Helper.getDFS(this.getConf(), sourceFile);
        DistributedFileSystem parityFs = Helper.getDFS(this.getConf(), parityFile);
        long blockSize = parityFs.getFileStatus(parityFile).getBlockSize();
        // The returned status is not used; the call is kept so that any exception it raises
        // for the source file still surfaces before reconstruction starts.
        srcFs.getFileStatus(sourceFile);
        int numBlocksReconstructed = 0;
        for (LocatedBlock lb : missingBlocks.getLocatedBlocks()) {
            Block lostBlock = lb.getBlock().getLocalBlock();
            long lostBlockOffset = lb.getStartOffset();
            LOG.info("Found lost block " + lostBlock + ", offset " + lostBlockOffset);
            File localBlockFile = File.createTempFile(lostBlock.getBlockName(), ".tmp");
            localBlockFile.deleteOnExit();
            try {
                decoder.recoverParityBlockToFile((FileSystem) srcFs, sourceFile, (FileSystem) parityFs, parityFile, blockSize, lostBlockOffset, localBlockFile, context);
                this.sendRepairedBlock(parityFs, sourceFile, parityFile, true, lb, localBlockFile);
                ++numBlocksReconstructed;
            } finally {
                deleteTempFile(localBlockFile);
            }
            reportProgress(context);
        }
        LOG.info("Reconstructed " + numBlocksReconstructed + " blocks in " + parityFile);
        return true;
    }

    /**
     * Sends a locally recovered block to its receivers, retrying on {@link IOException}.
     *
     * <p>The receivers are looked up once; the transfer itself is attempted at most
     * {@code retryCount + 1} times. A negative configured retry count is treated as zero so
     * the loop always terminates.
     *
     * @param dfs           file system used to look up receivers and to send the block
     * @param sourceFile    source file of the erasure-coded pair
     * @param parityFile    parity file of the erasure-coded pair
     * @param isParityBlock {@code true} if the block belongs to the parity file
     * @param lb            the lost block being replaced
     * @param block         local file holding the recovered block contents
     * @throws IOException if every attempt fails; the last failure is attached as the cause
     */
    private void sendRepairedBlock(DistributedFileSystem dfs, Path sourceFile, Path parityFile, boolean isParityBlock, LocatedBlock lb, File block) throws IOException {
        LocatedBlock blockReceivers = dfs.getClient().getRepairedBlockLocations(sourceFile.toUri().getPath(), parityFile.toUri().getPath(), lb, isParityBlock);
        Path destination = isParityBlock ? parityFile : sourceFile;
        int attemptsLeft = Math.max(0, this.retryCount) + 1;
        while (true) {
            try {
                this.transferBlock(dfs, destination, blockReceivers, block);
                LOG.info("Send repaired block " + lb.toString());
                return;
            } catch (IOException e) {
                --attemptsLeft;
                LOG.info("Sending repaired block failed. Attempts left: " + attemptsLeft, e);
                if (attemptsLeft <= 0) {
                    throw new IOException("Sending repaired block failed", e);
                }
            }
        }
    }

    /**
     * Performs one send attempt: copies the whole local block file to the stream returned
     * by {@code dfs.sendBlock}. Both streams are closed on every path.
     */
    private void transferBlock(DistributedFileSystem dfs, Path destination, LocatedBlock blockReceivers, File block) throws IOException {
        // Open the local file first so an unreadable file does not leave an HDFS stream open.
        FileInputStream in = new FileInputStream(block);
        try {
            HdfsDataOutputStream out = dfs.sendBlock(destination, blockReceivers, null, null);
            try {
                byte[] buff = new byte[COPY_BUFFER_SIZE];
                int read;
                while ((read = in.read(buff)) > 0) {
                    out.write(buff, 0, read);
                }
            } finally {
                // A close failure is logged but, as before, does not fail the attempt.
                closeAndLog(out, "output stream for repaired block");
            }
        } finally {
            closeAndLog(in, "local block file " + block);
        }
    }

    /** Closes {@code resource}, logging instead of propagating any {@link IOException}. */
    private static void closeAndLog(java.io.Closeable resource, String description) {
        try {
            resource.close();
        } catch (IOException e) {
            LOG.warn("Failed to close " + description, e);
        }
    }

    /** Deletes the temporary block file and logs a warning if it is still present afterwards. */
    private static void deleteTempFile(File file) {
        if (!file.delete() && file.exists()) {
            LOG.warn("Could not delete temporary block file " + file);
        }
    }

    /**
     * Reports progress on {@code context}, or on {@code RaidUtils.NULL_PROGRESSABLE} when no
     * context was supplied.
     */
    private static void reportProgress(Mapper.Context context) {
        if (context != null) {
            context.progress();
        } else {
            RaidUtils.NULL_PROGRESSABLE.progress();
        }
    }
}

