/*
 * Decompiled with CFR 0.152.
 */
package io.hops.erasure_coding;

import io.hops.erasure_coding.BaseEncodingManager;
import io.hops.erasure_coding.Codec;
import io.hops.erasure_coding.PolicyInfo;
import io.hops.erasure_coding.RaidUtils;
import io.hops.metadata.hdfs.entity.EncodingJob;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;

/**
 * Submits and tracks a map-only MapReduce job that encodes ("raids") the
 * source file described by a {@link PolicyInfo}.
 *
 * <p>As visible in this class: {@link #startDistRaid(PolicyInfo)} writes a
 * one-record operation list into a job directory and submits the job;
 * {@link #checkComplete()} is then polled to log progress and to delete the
 * job directory once the job has finished. The package-private constructor
 * re-attaches to a job that was submitted earlier.
 */
public class MapReduceEncoder {
    protected static final Log LOG = LogFactory.getLog(MapReduceEncoder.class);
    static final String NAME = "mapReduceEncoder";
    static final String JOB_DIR_LABEL = "mapReduceEncoder.job.dir";
    static final String OP_LIST_LABEL = "mapReduceEncoder.op.list";
    static final String OP_COUNT_LABEL = "mapReduceEncoder.op.count";
    static final String SCHEDULER_OPTION_LABEL = "mapReduceEncoder.scheduleroption";
    static final String IGNORE_FAILURES_OPTION_LABEL = "mapReduceEncoder.ignore.failures";
    private static final long OP_PER_MAP = 100L;
    private static final int MAX_MAPS_PER_NODE = 20;
    /** Pattern of the timestamp appended to the job name. */
    private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm";
    /** Name of the most recently configured job; rewritten by {@link #createJobConf}. */
    private static String jobName = "mapReduceEncoder";
    protected JobConf jobconf;
    private static final Random RANDOM = new Random();
    private JobClient jobClient;
    private RunningJob runningJob;
    /** Index of the next task completion event to fetch in {@link #checkComplete()}. */
    private int jobEventCounter = 0;
    /** Last progress line logged, used to avoid logging identical reports twice. */
    private String lastReport = null;
    private long startTime = System.currentTimeMillis();

    /**
     * Replaces the job configuration unless {@code conf} is already the
     * current one. A plain {@link Configuration} is wrapped in a new
     * {@link JobConf}; a {@link JobConf} is stored as-is.
     *
     * @param conf the configuration to adopt
     */
    public void setConf(Configuration conf) {
        if (this.jobconf != conf) {
            this.jobconf = conf instanceof JobConf ? (JobConf) conf : new JobConf(conf);
        }
    }

    /** @return the job configuration currently held by this encoder */
    public JobConf getConf() {
        return this.jobconf;
    }

    /**
     * Creates an encoder whose job configuration is derived from {@code conf}.
     *
     * @param conf base configuration copied into the job configuration
     */
    public MapReduceEncoder(Configuration conf) {
        this.setConf(MapReduceEncoder.createJobConf(conf));
    }

    /**
     * Re-attaches to a previously submitted job.
     *
     * @param conf base configuration used to contact the cluster
     * @param job  persisted description of the job (job directory and job id)
     * @throws IOException if the job cannot be looked up or is unknown
     */
    MapReduceEncoder(Configuration conf, EncodingJob job) throws IOException {
        this.jobconf = new JobConf(conf);
        this.jobconf.set(JOB_DIR_LABEL, job.getJobDir());
        JobID jobID = new JobID(job.getJtIdentifier(), job.getJobId());
        // Keep the client in the field (as startDistRaid does) instead of a
        // shadowing local, so the instance state is the same on both paths.
        this.jobClient = new JobClient(this.jobconf);
        this.runningJob = this.jobClient.getJob(jobID);
        if (this.runningJob == null) {
            throw new IOException("Failed to recover");
        }
    }

    /** @return a random non-negative integer rendered in base 36 */
    protected static String getRandomId() {
        return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
    }

    /**
     * Builds the map-only job configuration and records the timestamped job
     * name in the static {@code jobName}.
     *
     * @param conf base configuration
     * @return a new job configuration for this encoder's job
     */
    private static JobConf createJobConf(Configuration conf) {
        JobConf jobconf = new JobConf(conf, MapReduceEncoder.class);
        // SimpleDateFormat is not thread-safe, so a formatter is created per
        // call rather than shared through a static field.
        SimpleDateFormat dateForm = new SimpleDateFormat(DATE_PATTERN);
        jobName = NAME + " " + dateForm.format(new Date(BaseEncodingManager.now()));
        jobconf.setUser("erasure_coding");
        jobconf.setJobName(jobName);
        jobconf.setMapSpeculativeExecution(false);
        RaidUtils.parseAndSetOptions(jobconf, SCHEDULER_OPTION_LABEL);
        jobconf.setJarByClass(MapReduceEncoder.class);
        jobconf.setInputFormat(DistRaidInputFormat.class);
        jobconf.setOutputKeyClass(Text.class);
        jobconf.setOutputValueClass(Text.class);
        jobconf.setMapperClass(DistRaidMapper.class);
        jobconf.setNumReduceTasks(0);
        return jobconf;
    }

    /**
     * Computes a map count from the number of operations.
     *
     * @param srcCount number of operations
     * @return {@code srcCount / OP_PER_MAP}, but never less than
     *         {@code MAX_MAPS_PER_NODE}
     */
    private static int getMapCount(int srcCount) {
        int numMaps = (int) ((long) srcCount / OP_PER_MAP);
        return Math.max(numMaps, MAX_MAPS_PER_NODE);
    }

    /**
     * Prepares the job input and submits the job.
     *
     * @param info policy describing the file to encode
     * @return {@code true} if the job was submitted, {@code false} if
     *         preparation reported nothing to run
     * @throws IOException if preparation or submission fails
     */
    public boolean startDistRaid(PolicyInfo info) throws IOException {
        if (!this.prepareJob(info)) {
            return false;
        }
        this.jobClient = new JobClient(this.jobconf);
        this.runningJob = this.jobClient.submitJob(this.jobconf);
        LOG.info("Job Started: " + this.runningJob.getID());
        this.startTime = System.currentTimeMillis();
        return true;
    }

    /**
     * Polls the running job. When the job is complete the job directory (if
     * configured) is deleted recursively and the outcome is logged; otherwise
     * progress and newly failed task events are logged.
     *
     * <p>A job must have been started or recovered before calling this.
     *
     * @return {@code true} if the job has completed, {@code false} otherwise
     * @throws IOException if the job status or the file system cannot be accessed
     */
    public boolean checkComplete() throws IOException {
        JobID jobID = this.runningJob.getID();
        if (this.runningJob.isComplete()) {
            String jobdir = this.jobconf.get(JOB_DIR_LABEL);
            if (jobdir != null) {
                Path jobpath = new Path(jobdir);
                jobpath.getFileSystem(this.jobconf).delete(jobpath, true);
            }
            if (this.runningJob.isSuccessful()) {
                LOG.info("Job Complete(Succeeded): " + jobID);
            } else {
                LOG.info("Job Complete(Failed): " + jobID);
            }
            return true;
        }
        String report = " job " + jobID
                + " map " + StringUtils.formatPercent(this.runningJob.mapProgress(), 0)
                + " reduce " + StringUtils.formatPercent(this.runningJob.reduceProgress(), 0);
        // Only log when the progress line actually changed.
        if (!report.equals(this.lastReport)) {
            LOG.info(report);
            this.lastReport = report;
        }
        TaskCompletionEvent[] events = this.runningJob.getTaskCompletionEvents(this.jobEventCounter);
        this.jobEventCounter += events.length;
        for (TaskCompletionEvent event : events) {
            if (event.getTaskStatus() == TaskCompletionEvent.Status.FAILED) {
                LOG.info(" Job " + jobID + " " + event.toString());
            }
        }
        return false;
    }

    /**
     * Kills the running job.
     *
     * @throws IOException if the kill request fails
     */
    public void killJob() throws IOException {
        this.runningJob.killJob();
    }

    /**
     * @return whether the running job reports success
     * @throws IOException if the job status cannot be retrieved
     */
    public boolean successful() throws IOException {
        return this.runningJob.isSuccessful();
    }

    /**
     * Creates the job directory settings and writes the operation list (a
     * sequence file holding one {@code (source path, policy)} record), then
     * configures the job for a single map task.
     *
     * @param info policy describing the file to encode
     * @return always {@code true}
     * @throws IOException if the operation list cannot be written
     */
    private boolean prepareJob(PolicyInfo info) throws IOException {
        String randomId = MapReduceEncoder.getRandomId();
        JobClient jClient = new JobClient(this.jobconf);
        Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
        LOG.info(JOB_DIR_LABEL + "=" + jobdir);
        this.jobconf.set(JOB_DIR_LABEL, jobdir.toString());
        Path log = new Path(jobdir, "_logs");
        FileOutputFormat.setOutputPath(this.jobconf, log);
        LOG.info("log=" + log);
        FileSystem fs = jobdir.getFileSystem(this.jobconf);
        Path opList = new Path(jobdir, "_" + OP_LIST_LABEL);
        this.jobconf.set(OP_LIST_LABEL, opList.toString());
        // The writer is created in the resource specification so it is closed
        // on every path, including when append() throws.
        try (SequenceFile.Writer opWriter = SequenceFile.createWriter(fs, this.jobconf, opList,
                Text.class, PolicyInfo.class, SequenceFile.CompressionType.NONE)) {
            opWriter.append(new Text(info.getSrcPath().toString()), info);
        }
        this.jobconf.setInt(OP_COUNT_LABEL, 1);
        this.jobconf.setNumMapTasks(1);
        LOG.info("jobName= " + jobName + " numMapTasks=" + this.jobconf.getNumMapTasks());
        return true;
    }

    /**
     * @return the time in milliseconds recorded at construction, or at the
     *         last successful {@link #startDistRaid(PolicyInfo)}
     */
    public long getStartTime() {
        return this.startTime;
    }

    /**
     * @return the counters of the running job
     * @throws IOException if the counters cannot be retrieved
     */
    public Counters getCounters() throws IOException {
        return this.runningJob.getCounters();
    }

    /** @return the id of the running job */
    public JobID getJobID() {
        return this.runningJob.getID();
    }

    /**
     * Mapper that encodes one file per input record and keeps success and
     * failure counts. Failures are written to the job output and, unless
     * failures are ignored, cause {@link #close()} to throw.
     */
    static class DistRaidMapper
    implements Mapper<Text, PolicyInfo, WritableComparable, Text> {
        private JobConf jobconf;
        private boolean ignoreFailures;
        private int failcount = 0;
        private int succeedcount = 0;
        private BaseEncodingManager.Statistics st = null;
        private Reporter reporter = null;

        DistRaidMapper() {
        }

        /** @return the status line reporting succeeded and failed file counts */
        private String getCountString() {
            return "Succeeded: " + this.succeedcount + " Failed: " + this.failcount;
        }

        /**
         * Stores the job configuration and reads the ignore-failures option
         * (default {@code true}).
         *
         * @param job the job configuration
         */
        public void configure(JobConf job) {
            this.jobconf = job;
            this.ignoreFailures = this.jobconf.getBoolean(MapReduceEncoder.IGNORE_FAILURES_OPTION_LABEL, true);
            this.st = new BaseEncodingManager.Statistics();
        }

        /**
         * Encodes the file named by {@code key} under {@code policy} and
         * updates the job counters. An {@link IOException} raised during
         * encoding is counted, logged and emitted as an output record instead
         * of being propagated.
         *
         * @param key      path of the file to encode
         * @param policy   policy to apply
         * @param out      collector receiving one record per failed file
         * @param reporter progress and counter sink
         * @throws IOException if emitting the failure record fails
         */
        public void map(Text key, PolicyInfo policy, OutputCollector<WritableComparable, Text> out, Reporter reporter) throws IOException {
            this.reporter = reporter;
            try {
                Codec.initializeCodecs(this.jobconf);
                LOG.info("Raiding file=" + key.toString() + " policy=" + policy);
                Path p = new Path(key.toString());
                this.st.clear();
                BaseEncodingManager.doRaid(this.jobconf, policy, p, this.st, reporter);
                ++this.succeedcount;
                reporter.incrCounter(Counter.PROCESSED_BLOCKS, this.st.numProcessedBlocks);
                reporter.incrCounter(Counter.PROCESSED_SIZE, this.st.processedSize);
                reporter.incrCounter(Counter.META_BLOCKS, this.st.numMetaBlocks);
                reporter.incrCounter(Counter.META_SIZE, this.st.metaSize);
                reporter.incrCounter(Counter.SAVING_SIZE, this.st.processedSize - this.st.remainingSize - this.st.metaSize);
                reporter.incrCounter(Counter.FILES_SUCCEEDED, 1L);
            } catch (IOException e) {
                ++this.failcount;
                reporter.incrCounter(Counter.FILES_FAILED, 1L);
                String s = "FAIL: " + policy + ", " + key + " " + StringUtils.stringifyException(e);
                // The value must be passed as Text to match the collector's value type.
                out.collect(null, new Text(s));
                LOG.info(s);
            } finally {
                reporter.setStatus(this.getCountString());
            }
        }

        /**
         * Fails the task if any file failed and failures are not ignored.
         *
         * @throws IOException carrying the success/failure counts
         */
        public void close() throws IOException {
            if (this.failcount == 0 || this.ignoreFailures) {
                return;
            }
            throw new IOException(this.getCountString());
        }
    }

    /**
     * Input format that reads the operation list sequence file and cuts it
     * into splits on record boundaries.
     */
    static class DistRaidInputFormat
    implements InputFormat<Text, PolicyInfo> {
        DistRaidInputFormat() {
        }

        /** No validation is performed. */
        public void validateInput(JobConf job) {
        }

        /**
         * Splits the operation list so that each split holds roughly
         * {@code srcCount / numSplits} records; any trailing bytes form a
         * final split.
         *
         * @param job       job configuration holding the operation count and list path
         * @param numSplits requested number of splits; values below 1 are treated as 1
         * @return the computed splits
         * @throws IOException      if the operation list cannot be read
         * @throws RuntimeException if the operation count or list path is missing
         */
        public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
            int srcCount = job.getInt(MapReduceEncoder.OP_COUNT_LABEL, -1);
            String srclist = job.get(MapReduceEncoder.OP_LIST_LABEL, "");
            // Validate the metadata before doing any arithmetic with it.
            if (srcCount < 0 || "".equals(srclist)) {
                throw new RuntimeException("Invalid metadata: #files(" + srcCount + ") listuri(" + srclist + ")");
            }
            // Guard against a zero or negative request, which would otherwise
            // divide by zero or give the list a negative capacity.
            int splitCount = Math.max(numSplits, 1);
            int targetcount = srcCount / splitCount;
            Path srcs = new Path(srclist);
            FileSystem fs = srcs.getFileSystem(job);
            ArrayList<FileSplit> splits = new ArrayList<FileSplit>(splitCount);
            Text key = new Text();
            PolicyInfo value = new PolicyInfo();
            long prev = 0L;
            int count = 0;
            // try-with-resources: a failure while opening the reader surfaces
            // as that failure, not as a NullPointerException from close().
            try (SequenceFile.Reader in = new SequenceFile.Reader(fs, srcs, job)) {
                while (in.next(key, value)) {
                    long curr = in.getPosition();
                    if (++count <= targetcount) {
                        continue;
                    }
                    count = 0;
                    splits.add(new FileSplit(srcs, prev, curr - prev, (String[]) null));
                    prev = curr;
                }
            }
            long remaining = fs.getFileStatus(srcs).getLen() - prev;
            if (remaining != 0L) {
                splits.add(new FileSplit(srcs, prev, remaining, (String[]) null));
            }
            LOG.info("jobname= " + jobName + " numSplits=" + numSplits + ", splits.size()=" + splits.size());
            return splits.toArray(new FileSplit[splits.size()]);
        }

        /**
         * @param split    the split to read; must be a {@link FileSplit}
         * @param job      job configuration
         * @param reporter unused
         * @return a sequence file record reader over {@code split}
         * @throws IOException if the reader cannot be opened
         */
        public RecordReader<Text, PolicyInfo> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
            return new SequenceFileRecordReader<Text, PolicyInfo>(job, (FileSplit) split);
        }
    }

    /** Job counters incremented by {@link DistRaidMapper}. */
    public enum Counter {
        FILES_SUCCEEDED,
        FILES_FAILED,
        PROCESSED_BLOCKS,
        PROCESSED_SIZE,
        META_BLOCKS,
        META_SIZE,
        SAVING_SIZE
    }
}

