/*
 * Decompiled with CFR 0.152.
 */
package io.hops.erasure_coding;

import io.hops.erasure_coding.BlockRepairManager;
import io.hops.erasure_coding.Codec;
import io.hops.erasure_coding.Decoder;
import io.hops.erasure_coding.Report;
import io.hops.metadata.HdfsStorageFactory;
import io.hops.metadata.hdfs.dal.RepairJobsDataAccess;
import io.hops.metadata.hdfs.entity.RepairJob;
import io.hops.transaction.handler.HDFSOperationType;
import io.hops.transaction.handler.LightWeightRequestHandler;
import io.hops.transaction.handler.RequestHandler;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.datanode.BlockReconstructor;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class MapReduceBlockRepairManager
extends BlockRepairManager {
    protected static final Log LOG = LogFactory.getLog(MapReduceBlockRepairManager.class);
    public static final String JOBUSER = "erasure_coding";
    private static final String IN_FILE_SUFFIX = ".in";
    private static final String MAX_FIX_TIME_FOR_FILE = "io.hops.erasure_coding.blockfix.max.fix.time.for.file";
    private static final long DEFAULT_MAX_FIX_TIME_FOR_FILE = 14400000L;
    private Worker corruptionWorker = new CorruptionWorker();
    private static final String REPAIR_TYPE = "repair_type";
    private static final String SOURCE_PATH = "source_path";
    private static final String PARITY_PATH = "parity_path";
    private static final String CODEC_ID = "codec_id";
    private Map<String, ActiveRepair> currentRepairs = new HashMap<String, ActiveRepair>();
    private Collection<ActiveRepair> completedJobs = new ArrayList<ActiveRepair>();
    private boolean initialized = false;

    public MapReduceBlockRepairManager(Configuration conf) {
        super(conf);
    }

    public void repairSourceBlocks(String codecId, Path sourceFile, Path parityFile) {
        this.initialize();
        String uniqueName = sourceFile.getName() + "_" + UUID.randomUUID().toString();
        try {
            this.corruptionWorker.startJob(uniqueName, sourceFile, parityFile, codecId, RepairType.SOURCE_FILE);
        }
        catch (IOException e) {
            LOG.error((Object)"Exception", (Throwable)e);
            LOG.error((Object)"Exception", (Throwable)e);
        }
        catch (InterruptedException e) {
            LOG.error((Object)"Exception", (Throwable)e);
        }
        catch (ClassNotFoundException e) {
            LOG.error((Object)"Exception", (Throwable)e);
        }
    }

    public void repairParityBlocks(String codecId, Path sourceFile, Path parityFile) {
        this.initialize();
        String uniqueName = sourceFile.getName() + "_" + UUID.randomUUID().toString();
        try {
            this.corruptionWorker.startJob(uniqueName, sourceFile, parityFile, codecId, RepairType.PARITY_FILE);
        }
        catch (IOException e) {
            LOG.error((Object)"Exception", (Throwable)e);
        }
        catch (InterruptedException e) {
            LOG.error((Object)"Exception", (Throwable)e);
        }
        catch (ClassNotFoundException e) {
            LOG.error((Object)"Exception", (Throwable)e);
        }
    }

    protected long getMaxFixTimeForFile() {
        return this.getConf().getLong(MAX_FIX_TIME_FOR_FILE, 14400000L);
    }

    public void configureJob(Job job, Class<? extends BlockReconstructor> reconstructorClass) {
        ((JobConf)job.getConfiguration()).setUser(JOBUSER);
        ((JobConf)job.getConfiguration()).setClass("hdfs.blockintegrity.reconstructor", reconstructorClass, BlockReconstructor.class);
    }

    private void submitJob(Job job) throws IOException, InterruptedException, ClassNotFoundException {
        job.submit();
        LOG.info((Object)("Job " + job.getJobID() + "(" + job.getJobName() + ") started"));
    }

    public List<Report> computeReports() throws IOException {
        this.initialize();
        this.cleanRecovery();
        ArrayList<Report> reports = new ArrayList<Report>();
        for (Map.Entry<String, ActiveRepair> entry : this.currentRepairs.entrySet()) {
            String fileName = entry.getKey();
            ActiveRepair job = entry.getValue();
            try {
                if (job.isComplete() && job.isSuccessful()) {
                    LOG.info((Object)"REPAIR COMPLETE");
                    reports.add(new Report(fileName, Report.Status.FINISHED));
                    this.cleanup(job);
                    continue;
                }
                if (job.isComplete() && !job.isSuccessful()) {
                    LOG.info((Object)"REPAIR FAILED");
                    reports.add(new Report(fileName, Report.Status.FAILED));
                    this.cleanup(job);
                    continue;
                }
                if (job.getStartTime() > 0L && System.currentTimeMillis() - job.getStartTime() > this.getMaxFixTimeForFile()) {
                    LOG.info((Object)("Timeout: " + (System.currentTimeMillis() - job.getStartTime()) + " " + job.getStartTime()));
                    job.killJob();
                    reports.add(new Report(fileName, Report.Status.CANCELED));
                    this.cleanup(job);
                    continue;
                }
                LOG.info((Object)"REPAIR RUNNING");
                reports.add(new Report(fileName, Report.Status.ACTIVE));
            }
            catch (Exception e) {
                LOG.info((Object)"Exception during completeness check", (Throwable)e);
                try {
                    job.killJob();
                }
                catch (Exception exception) {
                    // empty catch block
                }
                reports.add(new Report(fileName, Report.Status.FAILED));
                this.cleanup(job);
            }
        }
        for (Report report : reports) {
            Report.Status status = report.getStatus();
            if (status != Report.Status.FINISHED && status != Report.Status.FAILED && status != Report.Status.CANCELED) continue;
            ActiveRepair activeRepair = this.currentRepairs.remove(report.getFilePath());
            this.completedJobs.add(activeRepair);
        }
        return reports;
    }

    public void cancelAll() {
        this.initialize();
        for (ActiveRepair job : this.currentRepairs.values()) {
            try {
                job.killJob();
            }
            catch (Exception e) {
                LOG.error((Object)"Exception", (Throwable)e);
            }
            this.cleanup(job);
        }
        this.currentRepairs.clear();
    }

    public void cancel(String toCancel) {
        this.initialize();
        ActiveRepair job = this.currentRepairs.get(toCancel);
        try {
            job.killJob();
        }
        catch (Exception e) {
            LOG.error((Object)"Exception", (Throwable)e);
        }
        this.currentRepairs.remove(toCancel);
        this.cleanup(job);
    }

    private void cleanup(ActiveRepair job) {
        Path outDir = null;
        try {
            outDir = job.getOutDir();
            outDir.getFileSystem(this.getConf()).delete(outDir, true);
        }
        catch (IOException e) {
            LOG.warn((Object)("Could not delete output dir " + outDir), (Throwable)e);
        }
        Path inDir = null;
        try {
            inDir = job.getInDir();
            inDir.getFileSystem(this.getConf()).delete(inDir, true);
        }
        catch (IOException e) {
            LOG.warn((Object)("Could not delete input dir " + inDir), (Throwable)e);
        }
    }

    private void persistActiveJob(final String path, final org.apache.hadoop.mapreduce.JobID jobId, final String inDir, final String outDir) throws IOException {
        new LightWeightRequestHandler((RequestHandler.OperationType)HDFSOperationType.PERSIST_ENCODING_JOB){

            public Object performTask() throws IOException {
                RepairJobsDataAccess da = (RepairJobsDataAccess)HdfsStorageFactory.getDataAccess(RepairJobsDataAccess.class);
                da.add((Object)new RepairJob(jobId.getJtIdentifier(), jobId.getId(), path, inDir, outDir));
                return null;
            }
        }.handle();
    }

    private void cleanRecovery() throws IOException {
        new LightWeightRequestHandler((RequestHandler.OperationType)HDFSOperationType.DELETE_ENCODING_JOBS){

            public Object performTask() throws IOException {
                RepairJobsDataAccess da = (RepairJobsDataAccess)HdfsStorageFactory.getDataAccess(RepairJobsDataAccess.class);
                Iterator it = MapReduceBlockRepairManager.this.completedJobs.iterator();
                while (it.hasNext()) {
                    ActiveRepair job = (ActiveRepair)it.next();
                    org.apache.hadoop.mapreduce.JobID jobId = job.getJobId();
                    da.delete((Object)new RepairJob(jobId.getJtIdentifier(), jobId.getId()));
                    it.remove();
                }
                return null;
            }
        }.handle();
    }

    private void initialize() {
        if (this.initialized) {
            return;
        }
        try {
            JobClient jobClient = new JobClient(this.getConf());
            for (RepairJob job : this.recoverActiveJobs()) {
                JobID jobId = new JobID(job.getJtIdentifier(), job.getJobId());
                RunningJob runningJob = jobClient.getJob(jobId);
                if (runningJob == null) {
                    throw new IOException("Failed to recover");
                }
                ActiveRepair recovered = new ActiveRepair((org.apache.hadoop.mapreduce.JobID)jobId, runningJob, new Path(job.getInDir()), new Path(job.getOutDir()));
                this.currentRepairs.put(job.getPath(), recovered);
            }
        }
        catch (IOException e) {
            LOG.error((Object)"Encoding job recovery failed", (Throwable)e);
            throw new RuntimeException(e);
        }
        this.initialized = true;
    }

    private Collection<RepairJob> recoverActiveJobs() throws IOException {
        LightWeightRequestHandler handler = new LightWeightRequestHandler((RequestHandler.OperationType)HDFSOperationType.RECOVER_ENCODING_JOBS){

            public Object performTask() throws IOException {
                RepairJobsDataAccess da = (RepairJobsDataAccess)HdfsStorageFactory.getDataAccess(RepairJobsDataAccess.class);
                return da.findAll();
            }
        };
        return (Collection)handler.handle();
    }

    static class ReconstructionMapper
    extends Mapper<LongWritable, Text, Text, Text> {
        protected static final Log LOG = LogFactory.getLog(ReconstructionMapper.class);
        public static final String RECONSTRUCTOR_CLASS_TAG = "hdfs.blockintegrity.reconstructor";
        private BlockReconstructor reconstructor;
        private RepairType repairType;
        private Path sourceFile;
        private Path parityFile;
        private Decoder decoder;

        ReconstructionMapper() {
        }

        protected void setup(Mapper.Context context) throws IOException, InterruptedException {
            super.setup(context);
            Configuration conf = context.getConfiguration();
            Codec.initializeCodecs((Configuration)conf);
            this.repairType = RepairType.valueOf(conf.get(MapReduceBlockRepairManager.REPAIR_TYPE));
            this.sourceFile = new Path(conf.get(MapReduceBlockRepairManager.SOURCE_PATH));
            this.parityFile = new Path(conf.get(MapReduceBlockRepairManager.PARITY_PATH));
            this.decoder = new Decoder(conf, Codec.getCodec((String)conf.get(MapReduceBlockRepairManager.CODEC_ID)));
            Class reconstructorClass = context.getConfiguration().getClass(RECONSTRUCTOR_CLASS_TAG, null, BlockReconstructor.class);
            if (reconstructorClass == null) {
                LOG.error((Object)"No class supplied for reconstructor (prop hdfs.blockintegrity.reconstructor)");
                context.progress();
                return;
            }
            try {
                Constructor ctor = reconstructorClass.getConstructor(Configuration.class);
                this.reconstructor = (BlockReconstructor)((Object)ctor.newInstance(conf));
            }
            catch (Exception ex) {
                throw new IOException("Could not instantiate a block reconstructor based on class " + reconstructorClass, ex);
            }
        }

        public void map(LongWritable key, Text fileText, Mapper.Context context) throws IOException, InterruptedException {
            String fileStr = fileText.toString();
            LOG.info((Object)("reconstructing " + fileStr));
            Path file = new Path(fileStr);
            try {
                switch (this.repairType) {
                    case SOURCE_FILE: {
                        this.reconstructor.processFile(this.sourceFile, this.parityFile, this.decoder);
                        break;
                    }
                    case PARITY_FILE: {
                        this.reconstructor.processParityFile(this.sourceFile, this.parityFile, this.decoder);
                    }
                }
            }
            catch (Exception e) {
                LOG.error((Object)("Reconstructing file " + file + " failed"), (Throwable)e);
                context.getCounter((Enum)Counter.FILES_FAILED).increment(1L);
                String outkey = fileStr;
                String outval = "failed";
                context.write((Object)new Text(outkey), (Object)new Text(outval));
                throw new RuntimeException(e);
            }
            context.progress();
        }
    }

    static class ReconstructionInputFormat
    extends SequenceFileInputFormat<LongWritable, Text> {
        protected static final Log LOG = LogFactory.getLog(ReconstructionMapper.class);

        ReconstructionInputFormat() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public List<InputSplit> getSplits(JobContext job) throws IOException {
            long filesPerTask = 1L;
            Path[] inPaths = ReconstructionInputFormat.getInputPaths((JobContext)job);
            ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
            long fileCounter = 0L;
            for (Path inPath : inPaths) {
                FileStatus[] inFiles;
                FileSystem fs = inPath.getFileSystem(job.getConfiguration());
                if (!fs.getFileStatus(inPath).isDir()) {
                    throw new IOException(inPath.toString() + " is not a directory");
                }
                for (FileStatus inFileStatus : inFiles = fs.listStatus(inPath)) {
                    Path inFile = inFileStatus.getPath();
                    if (inFileStatus.isDir() || !inFile.getName().equals(job.getJobName() + MapReduceBlockRepairManager.IN_FILE_SUFFIX)) continue;
                    ++fileCounter;
                    SequenceFile.Reader inFileReader = new SequenceFile.Reader(fs, inFile, job.getConfiguration());
                    long startPos = inFileReader.getPosition();
                    long counter = 0L;
                    LongWritable key = new LongWritable();
                    Text value = new Text();
                    try {
                        while (inFileReader.next((Writable)key, (Writable)value)) {
                            if (counter % filesPerTask == filesPerTask - 1L) {
                                splits.add((InputSplit)new FileSplit(inFile, startPos, inFileReader.getPosition() - startPos, null));
                                startPos = inFileReader.getPosition();
                            }
                            ++counter;
                        }
                        if (startPos == inFileReader.getPosition()) continue;
                        splits.add((InputSplit)new FileSplit(inFile, startPos, inFileReader.getPosition() - startPos, null));
                    }
                    finally {
                        inFileReader.close();
                    }
                }
            }
            LOG.info((Object)("created " + splits.size() + " input splits from " + fileCounter + " files"));
            return splits;
        }

        public boolean isSplitable(JobContext job, Path file) {
            return true;
        }
    }

    public class CorruptionWorker
    extends Worker {
        public CorruptionWorker() {
            super(LogFactory.getLog(CorruptionWorker.class), BlockReconstructor.class, "blockfixer");
        }
    }

    public abstract class Worker {
        protected final Log LOG;
        protected final Class<? extends BlockReconstructor> RECONSTRUCTOR_CLASS;
        protected final String JOB_NAME_PREFIX;

        protected Worker(Log log, Class<? extends BlockReconstructor> rClass, String prefix) {
            this.LOG = log;
            this.RECONSTRUCTOR_CLASS = rClass;
            this.JOB_NAME_PREFIX = prefix;
        }

        private void startJob(String jobName, Path sourceFile, Path parityFile, String codecId, RepairType type) throws IOException, InterruptedException, ClassNotFoundException {
            Path inDir = new Path(this.JOB_NAME_PREFIX + "/in/" + jobName);
            Path outDir = new Path(this.JOB_NAME_PREFIX + "/out/" + jobName);
            this.createInputFile(jobName, inDir, sourceFile.toUri().getPath());
            Configuration jobConf = new Configuration(MapReduceBlockRepairManager.this.getConf());
            jobConf.set(MapReduceBlockRepairManager.REPAIR_TYPE, type.toString());
            jobConf.set(MapReduceBlockRepairManager.SOURCE_PATH, sourceFile.toUri().getPath());
            jobConf.set(MapReduceBlockRepairManager.PARITY_PATH, parityFile.toUri().getPath());
            jobConf.set(MapReduceBlockRepairManager.CODEC_ID, codecId);
            Job job = new Job(jobConf, jobName);
            MapReduceBlockRepairManager.this.configureJob(job, this.RECONSTRUCTOR_CLASS);
            job.setJarByClass(this.getClass());
            job.setJobName(jobName);
            job.setMapperClass(ReconstructionMapper.class);
            job.setNumReduceTasks(0);
            job.setInputFormatClass(ReconstructionInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            ReconstructionInputFormat.setInputPaths((Job)job, (Path[])new Path[]{inDir});
            SequenceFileOutputFormat.setOutputPath((Job)job, (Path)outDir);
            this.submitJob(job);
            String path = sourceFile.toUri().getPath();
            JobClient jobClient = new JobClient(MapReduceBlockRepairManager.this.getConf());
            org.apache.hadoop.mapreduce.JobID jobId = job.getJobID();
            ActiveRepair activeRepair = new ActiveRepair(jobId, job, inDir, outDir);
            MapReduceBlockRepairManager.this.persistActiveJob(path, jobId, inDir.toUri().getPath(), outDir.toUri().getPath());
            MapReduceBlockRepairManager.this.currentRepairs.put(type == RepairType.SOURCE_FILE ? sourceFile.toUri().getPath() : parityFile.toUri().getPath(), activeRepair);
        }

        private void submitJob(Job job) throws IOException, InterruptedException, ClassNotFoundException {
            this.LOG.info((Object)"Submitting job");
            MapReduceBlockRepairManager.this.submitJob(job);
        }

        private void createInputFile(String jobName, Path inDir, String lostFile) throws IOException {
            Path file = new Path(inDir, jobName + MapReduceBlockRepairManager.IN_FILE_SUFFIX);
            FileSystem fs = file.getFileSystem(MapReduceBlockRepairManager.this.getConf());
            SequenceFile.Writer fileOut = SequenceFile.createWriter((FileSystem)fs, (Configuration)MapReduceBlockRepairManager.this.getConf(), (Path)file, LongWritable.class, Text.class);
            fileOut.append((Writable)new LongWritable(0L), (Writable)new Text(lostFile));
            fileOut.close();
        }
    }

    private class ActiveRepair {
        private org.apache.hadoop.mapreduce.JobID jobId;
        private Job job;
        private RunningJob runningJob;
        private Path inDir;
        private Path outDir;
        private long startTime = System.currentTimeMillis();

        public ActiveRepair(org.apache.hadoop.mapreduce.JobID jobId, Job job, Path inDir, Path outDir) {
            this.jobId = jobId;
            this.job = job;
            this.inDir = inDir;
            this.outDir = outDir;
        }

        public ActiveRepair(org.apache.hadoop.mapreduce.JobID jobId, RunningJob job, Path inDir, Path outDir) {
            this.jobId = jobId;
            this.runningJob = job;
            this.inDir = inDir;
            this.outDir = outDir;
        }

        public org.apache.hadoop.mapreduce.JobID getJobId() {
            return this.jobId;
        }

        public void killJob() throws IOException {
            if (this.job != null) {
                this.job.killJob();
            } else {
                this.runningJob.killJob();
            }
        }

        public boolean isComplete() throws IOException {
            if (this.job != null) {
                return this.job.isComplete();
            }
            return this.runningJob.isComplete();
        }

        public boolean isSuccessful() throws IOException {
            if (this.job != null) {
                return this.job.isSuccessful();
            }
            return this.runningJob.isSuccessful();
        }

        public Path getInDir() {
            return this.inDir;
        }

        public Path getOutDir() {
            return this.outDir;
        }

        public long getStartTime() {
            return this.startTime;
        }
    }

    private static enum RepairType {
        SOURCE_FILE,
        PARITY_FILE;

    }

    static enum Counter {
        FILES_SUCCEEDED,
        FILES_FAILED;

    }
}

