package io.hops.erasure_coding;

import io.hops.erasure_coding.Report;
import io.hops.metadata.HdfsStorageFactory;
import io.hops.metadata.hdfs.dal.RepairJobsDataAccess;
import io.hops.metadata.hdfs.entity.RepairJob;
import io.hops.transaction.handler.HDFSOperationType;
import io.hops.transaction.handler.LightWeightRequestHandler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.datanode.BlockReconstructor;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/* loaded from: input_file:io/hops/erasure_coding/MapReduceBlockRepairManager.class */
public class MapReduceBlockRepairManager extends BlockRepairManager {
    /** Logger used by the manager's own methods. */
    protected static final Log LOG = LogFactory.getLog(MapReduceBlockRepairManager.class);
    /** Same value that {@code configureJob} passes to {@code setUser} on the job configuration. */
    public static final String JOBUSER = "erasure_coding";
    /** Suffix appended to the job name to form the name of the job's input sequence file. */
    private static final String IN_FILE_SUFFIX = ".in";
    /** Configuration key read by {@code getMaxFixTimeForFile()}. */
    private static final String MAX_FIX_TIME_FOR_FILE = "io.hops.erasure_coding.blockfix.max.fix.time.for.file";
    /**
     * Fallback for {@link #MAX_FIX_TIME_FOR_FILE}. It is compared against a difference of
     * {@code System.currentTimeMillis()} values, i.e. it is in milliseconds (14400000 ms = 4 hours).
     */
    private static final long DEFAULT_MAX_FIX_TIME_FOR_FILE = 14400000;
    /** Worker through which both source and parity repair jobs are started. */
    private Worker corruptionWorker;
    // Job configuration keys: written by Worker.startJob and read back in ReconstructionMapper.setup.
    private static final String REPAIR_TYPE = "repair_type";
    private static final String SOURCE_PATH = "source_path";
    private static final String PARITY_PATH = "parity_path";
    private static final String CODEC_ID = "codec_id";
    /** Repairs that are tracked as in progress, keyed by the path of the file being repaired. */
    private Map<String, ActiveRepair> currentRepairs;
    /** Repairs removed from {@link #currentRepairs} whose persisted records are deleted by {@code cleanRecovery()}. */
    private Collection<ActiveRepair> completedJobs;
    /** Set to {@code true} once {@code initialize()} has recovered the persisted jobs successfully. */
    private boolean initialized;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/hops/erasure_coding/MapReduceBlockRepairManager$ActiveRepair.class */
    /**
     * Handle on one submitted repair job together with the input and output
     * directories that were created for it.
     *
     * <p>An instance wraps either a {@link Job} (a job submitted by this manager)
     * or a {@link RunningJob} (a job looked up through a {@code JobClient});
     * whichever one is non-null receives the status and kill requests.
     */
    public class ActiveRepair {
        private JobID jobId;
        private Job job;
        private RunningJob runningJob;
        private Path inDir;
        private Path outDir;
        /** Wall-clock time (ms) at which this handle was created. */
        private long startTime = System.currentTimeMillis();

        /**
         * Wraps a job represented by a {@link Job}.
         *
         * @param jobID identifier of the job
         * @param job   handle used for status and kill requests
         * @param path  input directory of the job
         * @param path2 output directory of the job
         */
        public ActiveRepair(JobID jobID, Job job, Path path, Path path2) {
            this.inDir = path;
            this.outDir = path2;
            this.job = job;
            this.jobId = jobID;
        }

        /**
         * Wraps a job represented by a {@link RunningJob}.
         *
         * @param jobID      identifier of the job
         * @param runningJob handle used for status and kill requests
         * @param path       input directory of the job
         * @param path2      output directory of the job
         */
        public ActiveRepair(JobID jobID, RunningJob runningJob, Path path, Path path2) {
            this.inDir = path;
            this.outDir = path2;
            this.runningJob = runningJob;
            this.jobId = jobID;
        }

        /** @return the identifier of the wrapped job */
        public JobID getJobId() {
            return this.jobId;
        }

        /**
         * Kills the wrapped job.
         *
         * @throws IOException if the kill request fails
         */
        public void killJob() throws IOException {
            if (this.job == null) {
                this.runningJob.killJob();
                return;
            }
            this.job.killJob();
        }

        /**
         * @return whether the wrapped job reports itself as complete
         * @throws IOException if the status query fails
         */
        public boolean isComplete() throws IOException {
            if (this.job == null) {
                return this.runningJob.isComplete();
            }
            return this.job.isComplete();
        }

        /**
         * @return whether the wrapped job reports itself as successful
         * @throws IOException if the status query fails
         */
        public boolean isSuccessful() throws IOException {
            if (this.job == null) {
                return this.runningJob.isSuccessful();
            }
            return this.job.isSuccessful();
        }

        /** @return the input directory of the job */
        public Path getInDir() {
            return this.inDir;
        }

        /** @return the output directory of the job */
        public Path getOutDir() {
            return this.outDir;
        }

        /** @return the creation time of this handle in milliseconds */
        public long getStartTime() {
            return this.startTime;
        }
    }

    /* loaded from: input_file:io/hops/erasure_coding/MapReduceBlockRepairManager$CorruptionWorker.class */
    /**
     * {@link Worker} configured with its own logger, {@link BlockReconstructor}
     * as the reconstructor class and {@code "blockfixer"} as the job name prefix.
     */
    public class CorruptionWorker extends Worker {
        public CorruptionWorker() {
            super(
                    LogFactory.getLog(CorruptionWorker.class),
                    BlockReconstructor.class,
                    "blockfixer");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/hops/erasure_coding/MapReduceBlockRepairManager$Counter.class */
    /** Job counters for the outcome of file reconstructions. */
    public enum Counter {
        /** Declared for successful files; not incremented anywhere in this class. */
        FILES_SUCCEEDED,
        /** Incremented by the mapper when reconstructing a file throws. */
        FILES_FAILED;
    }

    /* loaded from: input_file:io/hops/erasure_coding/MapReduceBlockRepairManager$ReconstructionInputFormat.class */
    /**
     * Input format that turns the records of a repair job's input sequence file
     * into input splits, one split per {@link #RECORDS_PER_SPLIT} records.
     *
     * <p>Only files named {@code <jobName>.in} directly inside the configured
     * input directories are considered.
     */
    static class ReconstructionInputFormat extends SequenceFileInputFormat<LongWritable, Text> {
        protected static final Log LOG = LogFactory.getLog(ReconstructionInputFormat.class);

        /** Number of input records that are grouped into one split. */
        private static final long RECORDS_PER_SPLIT = 1L;

        ReconstructionInputFormat() {
        }

        /**
         * Builds the splits for all matching input files of the job.
         *
         * @param jobContext context providing the configuration, job name and input paths
         * @return the splits, in the order the records were read
         * @throws IOException if an input path is not a directory or reading fails
         */
        public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
            Configuration conf = jobContext.getConfiguration();
            String inputFileName = jobContext.getJobName() + MapReduceBlockRepairManager.IN_FILE_SUFFIX;
            List<InputSplit> splits = new ArrayList<InputSplit>();
            long fileCount = 0;
            for (Path inDir : getInputPaths(jobContext)) {
                FileSystem fileSystem = inDir.getFileSystem(conf);
                if (!fileSystem.getFileStatus(inDir).isDir()) {
                    throw new IOException(inDir.toString() + " is not a directory");
                }
                for (FileStatus status : fileSystem.listStatus(inDir)) {
                    Path inFile = status.getPath();
                    if (status.isDir() || !inFile.getName().equals(inputFileName)) {
                        continue;
                    }
                    fileCount++;
                    addSplitsForFile(fileSystem, inFile, conf, splits);
                }
            }
            LOG.info("created " + splits.size() + " input splits from " + fileCount + " files");
            return splits;
        }

        /**
         * Reads one input file to the end and appends a split for every
         * {@link #RECORDS_PER_SPLIT} records, plus a final split for any remainder.
         * The reader is closed exactly once, after the whole file has been read.
         */
        private static void addSplitsForFile(FileSystem fileSystem, Path inFile, Configuration conf,
                List<InputSplit> splits) throws IOException {
            SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, inFile, conf);
            try {
                long splitStart = reader.getPosition();
                long recordsInSplit = 0;
                LongWritable key = new LongWritable();
                Text value = new Text();
                while (reader.next(key, value)) {
                    recordsInSplit++;
                    if (recordsInSplit == RECORDS_PER_SPLIT) {
                        long splitEnd = reader.getPosition();
                        splits.add(new FileSplit(inFile, splitStart, splitEnd - splitStart, (String[]) null));
                        splitStart = splitEnd;
                        recordsInSplit = 0;
                    }
                }
                // Records left over after the last full group still need a split.
                long endOfFile = reader.getPosition();
                if (splitStart != endOfFile) {
                    splits.add(new FileSplit(inFile, splitStart, endOfFile - splitStart, (String[]) null));
                }
            } finally {
                reader.close();
            }
        }

        /** Always reports input files as splitable. */
        public boolean isSplitable(JobContext jobContext, Path path) {
            return true;
        }
    }

    /* loaded from: input_file:io/hops/erasure_coding/MapReduceBlockRepairManager$ReconstructionMapper.class */
    /**
     * Mapper that repairs the source or parity file named in the job
     * configuration. Each input record triggers one reconstruction; failures are
     * counted in {@link Counter#FILES_FAILED}, written to the job output and then
     * rethrown so that the task fails.
     */
    static class ReconstructionMapper extends Mapper<LongWritable, Text, Text, Text> {
        protected static final Log LOG = LogFactory.getLog(ReconstructionMapper.class);
        /** Configuration key holding the {@link BlockReconstructor} implementation class. */
        public static final String RECONSTRUCTOR_CLASS_TAG = "hdfs.blockintegrity.reconstructor";
        private BlockReconstructor reconstructor;
        private RepairType repairType;
        private Path sourceFile;
        private Path parityFile;
        private Decoder decoder;

        ReconstructionMapper() {
        }

        /**
         * Reads the repair parameters from the job configuration and instantiates
         * the configured reconstructor through its {@code (Configuration)} constructor.
         *
         * <p>If no reconstructor class is configured, an error is logged and the
         * reconstructor stays unset; every subsequent {@code map} call then fails.
         *
         * @throws IOException if the reconstructor cannot be instantiated
         */
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            super.setup(context);
            Configuration configuration = context.getConfiguration();
            Codec.initializeCodecs(configuration);
            this.repairType = RepairType.valueOf(configuration.get(MapReduceBlockRepairManager.REPAIR_TYPE));
            this.sourceFile = new Path(configuration.get(MapReduceBlockRepairManager.SOURCE_PATH));
            this.parityFile = new Path(configuration.get(MapReduceBlockRepairManager.PARITY_PATH));
            this.decoder = new Decoder(configuration, Codec.getCodec(configuration.get(MapReduceBlockRepairManager.CODEC_ID)));
            Class cls = configuration.getClass(RECONSTRUCTOR_CLASS_TAG, (Class) null, BlockReconstructor.class);
            if (cls == null) {
                LOG.error("No class supplied for reconstructor (prop hdfs.blockintegrity.reconstructor)");
                context.progress();
                return;
            }
            try {
                this.reconstructor = (BlockReconstructor) cls.getConstructor(Configuration.class).newInstance(configuration);
            } catch (Exception e) {
                throw new IOException("Could not instantiate a block reconstructor based on class " + cls, e);
            }
        }

        /**
         * Runs the reconstruction selected by the configured repair type.
         *
         * @param longWritable record key (unused)
         * @param text         record value; used for logging and as the key of the failure record
         * @param context      task context used for progress, counters and output
         * @throws RuntimeException wrapping any failure, after it has been counted and recorded
         */
        public void map(LongWritable longWritable, Text text, Mapper.Context context) throws IOException, InterruptedException {
            String fileName = text.toString();
            LOG.info("reconstructing " + fileName);
            Path file = new Path(fileName);
            try {
                if (this.reconstructor == null) {
                    // setup() found no reconstructor class; fail with a clear
                    // message instead of an anonymous NullPointerException.
                    throw new IllegalStateException(
                            "No block reconstructor configured (prop " + RECONSTRUCTOR_CLASS_TAG + ")");
                }
                switch (this.repairType) {
                    case SOURCE_FILE:
                        this.reconstructor.processFile(this.sourceFile, this.parityFile, this.decoder);
                        break;
                    case PARITY_FILE:
                        this.reconstructor.processParityFile(this.sourceFile, this.parityFile, this.decoder);
                        break;
                }
                context.progress();
            } catch (Exception e) {
                LOG.error("Reconstructing file " + file + " failed", e);
                context.getCounter(Counter.FILES_FAILED).increment(1L);
                context.write(new Text(fileName), new Text("failed"));
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/hops/erasure_coding/MapReduceBlockRepairManager$RepairType.class */
    /** Selects which file of a source/parity pair a repair job reconstructs. */
    public enum RepairType {
        /** The mapper calls {@code processFile} on the reconstructor. */
        SOURCE_FILE,
        /** The mapper calls {@code processParityFile} on the reconstructor. */
        PARITY_FILE;
    }

    /* loaded from: input_file:io/hops/erasure_coding/MapReduceBlockRepairManager$Worker.class */
    /**
     * Builds, submits and registers the MapReduce job for one repair.
     * Concrete workers supply the logger, the reconstructor class and the prefix
     * under which the job's input and output directories are created.
     */
    public abstract class Worker {
        protected final Log LOG;
        protected final Class<? extends BlockReconstructor> RECONSTRUCTOR_CLASS;
        protected final String JOB_NAME_PREFIX;

        /**
         * @param log logger used by this worker
         * @param cls reconstructor implementation handed to the job configuration
         * @param str prefix of the job's {@code /in/} and {@code /out/} directories
         */
        protected Worker(Log log, Class<? extends BlockReconstructor> cls, String str) {
            this.LOG = log;
            this.RECONSTRUCTOR_CLASS = cls;
            this.JOB_NAME_PREFIX = str;
        }

        /**
         * Creates the job input file, submits the repair job, persists a record of
         * it and registers it in {@code currentRepairs}.
         *
         * @param str        job name; also names the input/output directories and the input file
         * @param path       source file of the repair
         * @param path2      parity file of the repair
         * @param str2       codec id passed to the mapper
         * @param repairType whether the source or the parity file is repaired
         * @throws IOException            if creating the input, submitting or persisting fails
         * @throws InterruptedException   if job submission is interrupted
         * @throws ClassNotFoundException if job submission cannot resolve a class
         */
        public void startJob(String str, Path path, Path path2, String str2, RepairType repairType) throws IOException, InterruptedException, ClassNotFoundException {
            final String sourcePath = path.toUri().getPath();
            final String parityPath = path2.toUri().getPath();
            final Path inDir = new Path(this.JOB_NAME_PREFIX + "/in/" + str);
            final Path outDir = new Path(this.JOB_NAME_PREFIX + "/out/" + str);
            createInputFile(str, inDir, sourcePath);

            Configuration jobConf = new Configuration(MapReduceBlockRepairManager.this.getConf());
            jobConf.set(MapReduceBlockRepairManager.REPAIR_TYPE, repairType.toString());
            jobConf.set(MapReduceBlockRepairManager.SOURCE_PATH, sourcePath);
            jobConf.set(MapReduceBlockRepairManager.PARITY_PATH, parityPath);
            jobConf.set(MapReduceBlockRepairManager.CODEC_ID, str2);

            Job job = new Job(jobConf, str);
            MapReduceBlockRepairManager.this.configureJob(job, this.RECONSTRUCTOR_CLASS);
            job.setJarByClass(getClass());
            job.setJobName(str);
            job.setMapperClass(ReconstructionMapper.class);
            job.setNumReduceTasks(0);
            job.setInputFormatClass(ReconstructionInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            ReconstructionInputFormat.setInputPaths(job, new Path[]{inDir});
            SequenceFileOutputFormat.setOutputPath(job, outDir);
            submitJob(job);

            JobID jobID = job.getJobID();
            ActiveRepair activeRepair = new ActiveRepair(jobID, job, inDir, outDir);
            // NOTE(review): the persisted path is always the source path, while the
            // in-memory key below is the parity path for PARITY_FILE repairs; recovery
            // re-keys jobs by the persisted path - confirm this is intended.
            MapReduceBlockRepairManager.this.persistActiveJob(sourcePath, jobID, inDir.toUri().getPath(), outDir.toUri().getPath());
            String repairedPath = repairType == RepairType.SOURCE_FILE ? sourcePath : parityPath;
            MapReduceBlockRepairManager.this.currentRepairs.put(repairedPath, activeRepair);
        }

        /** Logs the submission and delegates to the manager's {@code submitJob}. */
        private void submitJob(Job job) throws IOException, InterruptedException, ClassNotFoundException {
            this.LOG.info("Submitting job");
            MapReduceBlockRepairManager.this.submitJob(job);
        }

        /**
         * Writes the single-record input file {@code <str>.in} into {@code path}.
         * The writer is closed even when appending the record fails.
         *
         * @param str  job name used as the file's base name
         * @param path directory in which the file is created
         * @param str2 value of the single record
         */
        private void createInputFile(String str, Path path, String str2) throws IOException {
            Path inputFile = new Path(path, str + MapReduceBlockRepairManager.IN_FILE_SUFFIX);
            SequenceFile.Writer writer = SequenceFile.createWriter(inputFile.getFileSystem(MapReduceBlockRepairManager.this.getConf()), MapReduceBlockRepairManager.this.getConf(), inputFile, LongWritable.class, Text.class);
            try {
                writer.append(new LongWritable(0L), new Text(str2));
            } finally {
                writer.close();
            }
        }
    }

    /**
     * Creates a manager with no tracked repairs. Persisted jobs are not recovered
     * here; that happens in {@code initialize()}.
     *
     * @param configuration configuration handed to the superclass
     */
    public MapReduceBlockRepairManager(Configuration configuration) {
        super(configuration);
        this.initialized = false;
        this.completedJobs = new ArrayList<ActiveRepair>();
        this.currentRepairs = new HashMap<String, ActiveRepair>();
        this.corruptionWorker = new CorruptionWorker();
    }

    /**
     * Starts a job that repairs the source file. Failures to start the job are
     * logged and not propagated.
     *
     * @param str   codec id
     * @param path  source file; its name prefixes the generated job name
     * @param path2 parity file
     */
    public void repairSourceBlocks(String str, Path path, Path path2) {
        initialize();
        String jobName = path.getName() + "_" + UUID.randomUUID().toString();
        try {
            this.corruptionWorker.startJob(jobName, path, path2, str, RepairType.SOURCE_FILE);
        } catch (IOException e) {
            LOG.error("Exception", e);
        } catch (ClassNotFoundException e) {
            LOG.error("Exception", e);
        } catch (InterruptedException e) {
            LOG.error("Exception", e);
            // Keep the interruption visible to the caller's thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts a job that repairs the parity file. Failures to start the job are
     * logged and not propagated.
     *
     * @param str   codec id
     * @param path  source file; its name prefixes the generated job name
     * @param path2 parity file
     */
    public void repairParityBlocks(String str, Path path, Path path2) {
        initialize();
        String jobName = path.getName() + "_" + UUID.randomUUID().toString();
        try {
            this.corruptionWorker.startJob(jobName, path, path2, str, RepairType.PARITY_FILE);
        } catch (IOException e) {
            LOG.error("Exception", e);
        } catch (ClassNotFoundException e) {
            LOG.error("Exception", e);
        } catch (InterruptedException e) {
            LOG.error("Exception", e);
            // Keep the interruption visible to the caller's thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the configured maximum time in milliseconds a repair may run before
     *         {@code computeReports()} kills it, or the 4-hour default if unset
     */
    protected long getMaxFixTimeForFile() {
        final Configuration conf = getConf();
        return conf.getLong(MAX_FIX_TIME_FOR_FILE, DEFAULT_MAX_FIX_TIME_FOR_FILE);
    }

    /**
     * Applies the settings every repair job needs: the job user and the
     * reconstructor class the mapper instantiates.
     *
     * @param job job whose configuration is modified
     * @param cls reconstructor implementation stored under the mapper's class tag
     */
    public void configureJob(Job job, Class<? extends BlockReconstructor> cls) {
        job.getConfiguration().setUser(JOBUSER);
        job.getConfiguration().setClass(
                ReconstructionMapper.RECONSTRUCTOR_CLASS_TAG,
                cls,
                BlockReconstructor.class);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Submits the job without waiting for it and logs its id and name.
     *
     * @param job fully configured job
     * @throws IOException            if submission fails
     * @throws InterruptedException   if submission is interrupted
     * @throws ClassNotFoundException if submission cannot resolve a class
     */
    public void submitJob(Job job) throws IOException, InterruptedException, ClassNotFoundException {
        job.submit();
        StringBuilder message = new StringBuilder("Job ");
        message.append(job.getJobID());
        message.append("(").append(job.getJobName()).append(") started");
        LOG.info(message.toString());
    }

    /**
     * Polls every tracked repair and reports its state.
     *
     * <p>Before polling, the persisted records of repairs that ended in an
     * earlier call are deleted. Repairs that are no longer active (finished,
     * failed, timed out or not pollable) have their directories cleaned up and
     * are moved from {@code currentRepairs} to {@code completedJobs}.
     *
     * @return one report per repair that was tracked when the call started
     * @throws IOException if deleting the persisted records fails
     */
    public List<Report> computeReports() throws IOException {
        initialize();
        cleanRecovery();
        List<Report> reports = new ArrayList<Report>();
        Iterator<Map.Entry<String, ActiveRepair>> entries = this.currentRepairs.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<String, ActiveRepair> entry = entries.next();
            String filePath = entry.getKey();
            ActiveRepair repair = entry.getValue();
            Report.Status status;
            try {
                status = pollRepairStatus(repair);
            } catch (Exception e) {
                LOG.info("Exception during completeness check", e);
                try {
                    repair.killJob();
                } catch (Exception killFailure) {
                    // Best effort: the repair is reported as failed either way.
                    LOG.warn("Could not kill repair job " + repair.getJobId() + " for " + filePath, killFailure);
                }
                status = Report.Status.FAILED;
            }
            reports.add(new Report(filePath, status));
            if (status != Report.Status.ACTIVE) {
                cleanup(repair);
                entries.remove();
                this.completedJobs.add(repair);
            }
        }
        return reports;
    }

    /**
     * Determines the state of one repair, querying completion only once so that
     * a job finishing mid-check cannot be mistaken for a timed-out one. A repair
     * that exceeded the maximum fix time is killed and reported as canceled.
     */
    private Report.Status pollRepairStatus(ActiveRepair repair) throws IOException {
        if (repair.isComplete()) {
            if (repair.isSuccessful()) {
                LOG.info("REPAIR COMPLETE");
                return Report.Status.FINISHED;
            }
            LOG.info("REPAIR FAILED");
            return Report.Status.FAILED;
        }
        long startTime = repair.getStartTime();
        long elapsed = System.currentTimeMillis() - startTime;
        if (startTime <= 0 || elapsed <= getMaxFixTimeForFile()) {
            LOG.info("REPAIR RUNNING");
            return Report.Status.ACTIVE;
        }
        LOG.info("Timeout: " + elapsed + " " + startTime);
        repair.killJob();
        return Report.Status.CANCELED;
    }

    /**
     * Kills every tracked repair job, deletes its directories and forgets it.
     * A failure to kill one job is logged and does not stop the others.
     */
    public void cancelAll() {
        initialize();
        Iterator<ActiveRepair> repairs = this.currentRepairs.values().iterator();
        while (repairs.hasNext()) {
            ActiveRepair repair = repairs.next();
            try {
                repair.killJob();
            } catch (Exception killFailure) {
                LOG.error("Exception", killFailure);
            }
            cleanup(repair);
        }
        this.currentRepairs.clear();
    }

    /**
     * Kills the repair job tracked for the given path, deletes its directories
     * and forgets it. A failure to kill the job is logged; cleanup still runs.
     * If no repair is tracked for the path, the call only logs a warning.
     *
     * @param str path the repair is registered under
     */
    public void cancel(String str) {
        initialize();
        ActiveRepair activeRepair = this.currentRepairs.remove(str);
        if (activeRepair == null) {
            // Nothing to kill or clean up; previously this ended in a NullPointerException.
            LOG.warn("No active repair to cancel for " + str);
            return;
        }
        try {
            activeRepair.killJob();
        } catch (Exception e) {
            LOG.error("Exception", e);
        }
        cleanup(activeRepair);
    }

    /**
     * Recursively deletes the output and then the input directory of a repair.
     * Each deletion is independent: an {@link IOException} is logged as a warning
     * and does not prevent the other directory from being deleted.
     *
     * @param activeRepair repair whose directories are removed
     */
    private void cleanup(ActiveRepair activeRepair) {
        final Path outputDir = activeRepair.getOutDir();
        try {
            outputDir.getFileSystem(getConf()).delete(outputDir, true);
        } catch (IOException outputFailure) {
            LOG.warn("Could not delete output dir " + outputDir, outputFailure);
        }

        final Path inputDir = activeRepair.getInDir();
        try {
            inputDir.getFileSystem(getConf()).delete(inputDir, true);
        } catch (IOException inputFailure) {
            LOG.warn("Could not delete input dir " + inputDir, inputFailure);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r0v0, types: [io.hops.erasure_coding.MapReduceBlockRepairManager$1] */
    /**
     * Stores a {@link RepairJob} record for a submitted job through the
     * {@link RepairJobsDataAccess} layer.
     *
     * @param str   path stored with the record
     * @param jobID id of the submitted job; its tracker identifier and numeric id are stored
     * @param str2  input directory of the job
     * @param str3  output directory of the job
     * @throws IOException if the request handler fails
     */
    public void persistActiveJob(final String str, final JobID jobID, final String str2, final String str3) throws IOException {
        LightWeightRequestHandler persistHandler = new LightWeightRequestHandler(HDFSOperationType.PERSIST_ENCODING_JOB) {
            public Object performTask() throws IOException {
                RepairJobsDataAccess repairJobs = HdfsStorageFactory.getDataAccess(RepairJobsDataAccess.class);
                RepairJob record = new RepairJob(jobID.getJtIdentifier(), jobID.getId(), str, str2, str3);
                repairJobs.add(record);
                return null;
            }
        };
        persistHandler.handle();
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [io.hops.erasure_coding.MapReduceBlockRepairManager$2] */
    /**
     * Deletes the persisted record of every repair in {@code completedJobs} and
     * removes each repair from that collection once its record has been deleted.
     *
     * @throws IOException if the request handler fails
     */
    private void cleanRecovery() throws IOException {
        LightWeightRequestHandler deleteHandler = new LightWeightRequestHandler(HDFSOperationType.DELETE_ENCODING_JOBS) {
            public Object performTask() throws IOException {
                RepairJobsDataAccess repairJobs = HdfsStorageFactory.getDataAccess(RepairJobsDataAccess.class);
                for (Iterator<ActiveRepair> pending = MapReduceBlockRepairManager.this.completedJobs.iterator(); pending.hasNext(); ) {
                    JobID finishedId = pending.next().getJobId();
                    repairJobs.delete(new RepairJob(finishedId.getJtIdentifier(), finishedId.getId()));
                    pending.remove();
                }
                return null;
            }
        };
        deleteHandler.handle();
    }

    /**
     * On first use, re-registers every persisted repair job in
     * {@code currentRepairs} by looking it up through a {@link JobClient}.
     * Subsequent calls return immediately once recovery has succeeded.
     *
     * @throws RuntimeException wrapping the {@link IOException} if recovery fails,
     *                          including when a persisted job cannot be found
     */
    private void initialize() {
        if (!this.initialized) {
            try {
                JobClient client = new JobClient(getConf());
                for (RepairJob persisted : recoverActiveJobs()) {
                    org.apache.hadoop.mapred.JobID recoveredId =
                            new org.apache.hadoop.mapred.JobID(persisted.getJtIdentifier(), persisted.getJobId());
                    RunningJob recoveredJob = client.getJob(recoveredId);
                    if (recoveredJob == null) {
                        throw new IOException("Failed to recover");
                    }
                    Path inDir = new Path(persisted.getInDir());
                    Path outDir = new Path(persisted.getOutDir());
                    ActiveRepair recovered = new ActiveRepair((JobID) recoveredId, recoveredJob, inDir, outDir);
                    this.currentRepairs.put(persisted.getPath(), recovered);
                }
                this.initialized = true;
            } catch (IOException recoveryFailure) {
                LOG.error("Encoding job recovery failed", recoveryFailure);
                throw new RuntimeException(recoveryFailure);
            }
        }
    }

    /**
     * Loads all persisted repair job records through the
     * {@link RepairJobsDataAccess} layer.
     *
     * @return whatever {@code findAll()} yields, viewed as a collection of {@link RepairJob}
     * @throws IOException if the request handler fails
     */
    private Collection<RepairJob> recoverActiveJobs() throws IOException {
        LightWeightRequestHandler findAllHandler = new LightWeightRequestHandler(HDFSOperationType.RECOVER_ENCODING_JOBS) {
            public Object performTask() throws IOException {
                RepairJobsDataAccess repairJobs = HdfsStorageFactory.getDataAccess(RepairJobsDataAccess.class);
                return repairJobs.findAll();
            }
        };
        Object persistedJobs = findAllHandler.handle();
        return (Collection) persistedJobs;
    }
}
