package io.hops.erasure_coding;

import io.hops.erasure_coding.Report;
import io.hops.metadata.HdfsStorageFactory;
import io.hops.metadata.hdfs.dal.EncodingJobsDataAccess;
import io.hops.metadata.hdfs.entity.EncodingJob;
import io.hops.metadata.hdfs.entity.EncodingPolicy;
import io.hops.transaction.handler.HDFSOperationType;
import io.hops.transaction.handler.LightWeightRequestHandler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobID;

/* loaded from: input_file:io/hops/erasure_coding/MapReduceEncodingManager.class */
public class MapReduceEncodingManager extends BaseEncodingManager {
    public static final Log LOG = LogFactory.getLog(MapReduceEncodingManager.class);
    public static final String ENCODING_JOB_EXECUTION_LIMIT = "io.hops.erasure_coding.encoding_job_execution_limit";
    public static final long DEFAULT_ENCODING_JOB_EXECUTION_LIMIT = 86400000;
    private final long executionLimit;
    private Map<String, MapReduceEncoder> currentJobs;
    private Collection<MapReduceEncoder> completedJobs;
    private boolean initialized;

    public MapReduceEncodingManager(Configuration configuration) throws IOException {
        super(configuration);
        this.currentJobs = new HashMap();
        this.completedJobs = new ArrayList();
        this.initialized = false;
        this.executionLimit = configuration.getLong(ENCODING_JOB_EXECUTION_LIMIT, DEFAULT_ENCODING_JOB_EXECUTION_LIMIT);
        LOG.info("created");
    }

    public void encodeFile(EncodingPolicy encodingPolicy, Path path, Path path2, boolean z) {
        initialize();
        Codec codec = Codec.getCodec(encodingPolicy.getCodec());
        LOG.info("Start encoding with policy: " + encodingPolicy + " for source file " + path.toUri().getPath() + " and parity file " + path2 + " copy " + z);
        PolicyInfo policyInfo = new PolicyInfo();
        try {
            policyInfo.setSrcPath(path.toUri().getPath());
            policyInfo.setCodecId(codec.getId());
            policyInfo.setProperty(PolicyInfo.PROPERTY_PARITY_PATH, path2.toUri().getPath());
            policyInfo.setProperty(PolicyInfo.PROPERTY_REPLICATION, String.valueOf((int) encodingPolicy.getTargetReplication()));
            policyInfo.setProperty(PolicyInfo.PROPERTY_PARITY_REPLICATION, String.valueOf(1));
            policyInfo.setProperty(PolicyInfo.PROPERTY_COPY, String.valueOf(z));
            raidFiles(policyInfo);
        } catch (IOException e) {
            LOG.error("Exception", e);
        }
    }

    public void raidFiles(PolicyInfo policyInfo) throws IOException {
        MapReduceEncoder mapReduceEncoder = new MapReduceEncoder(this.conf);
        if (mapReduceEncoder.startDistRaid(policyInfo)) {
            String path = policyInfo.getSrcPath().toUri().getPath();
            persistActiveJob(path, mapReduceEncoder.getJobID(), mapReduceEncoder.getConf().get("mapReduceEncoder.job.dir"));
            this.currentJobs.put(path, mapReduceEncoder);
        }
    }

    public List<Report> computeReports() throws IOException {
        initialize();
        cleanRecovery();
        ArrayList<Report> arrayList = new ArrayList(this.currentJobs.size());
        for (Map.Entry<String, MapReduceEncoder> entry : this.currentJobs.entrySet()) {
            String key = entry.getKey();
            MapReduceEncoder value = entry.getValue();
            try {
                if (value.checkComplete() && value.successful()) {
                    arrayList.add(new Report(key, Report.Status.FINISHED));
                    LOG.info("Encoding successful for job " + value.getJobID());
                } else if (value.checkComplete() && !value.successful()) {
                    arrayList.add(new Report(key, Report.Status.FAILED));
                    LOG.info("Encoding failed for job " + value.getJobID());
                } else if (System.currentTimeMillis() - value.getStartTime() > this.executionLimit) {
                    value.killJob();
                    arrayList.add(new Report(key, Report.Status.CANCELED));
                    LOG.info("Encoding canceled for job " + value.getJobID());
                } else {
                    arrayList.add(new Report(key, Report.Status.ACTIVE));
                    LOG.info("Encoding active for job " + value.getJobID());
                }
            } catch (IOException e) {
                LOG.error("Exception during completeness check", e);
                try {
                    value.killJob();
                } catch (IOException e2) {
                }
                arrayList.add(new Report(key, Report.Status.FAILED));
            }
        }
        for (Report report : arrayList) {
            Report.Status status = report.getStatus();
            if (status == Report.Status.FINISHED || status == Report.Status.FAILED || status == Report.Status.CANCELED) {
                this.completedJobs.add(this.currentJobs.remove(report.getFilePath()));
            }
        }
        return arrayList;
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [io.hops.erasure_coding.MapReduceEncodingManager$1] */
    private void persistActiveJob(final String str, final JobID jobID, final String str2) throws IOException {
        new LightWeightRequestHandler(HDFSOperationType.PERSIST_ENCODING_JOB) { // from class: io.hops.erasure_coding.MapReduceEncodingManager.1
            public Object performTask() throws IOException {
                HdfsStorageFactory.getDataAccess(EncodingJobsDataAccess.class).add(new EncodingJob(jobID.getJtIdentifier(), jobID.getId(), str, str2));
                return null;
            }
        }.handle();
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [io.hops.erasure_coding.MapReduceEncodingManager$2] */
    private void cleanRecovery() throws IOException {
        new LightWeightRequestHandler(HDFSOperationType.DELETE_ENCODING_JOBS) { // from class: io.hops.erasure_coding.MapReduceEncodingManager.2
            public Object performTask() throws IOException {
                EncodingJobsDataAccess dataAccess = HdfsStorageFactory.getDataAccess(EncodingJobsDataAccess.class);
                Iterator it = MapReduceEncodingManager.this.completedJobs.iterator();
                while (it.hasNext()) {
                    JobID jobID = ((MapReduceEncoder) it.next()).getJobID();
                    dataAccess.delete(new EncodingJob(jobID.getJtIdentifier(), jobID.getId()));
                    it.remove();
                }
                return null;
            }
        }.handle();
    }

    public void cancelAll() {
        initialize();
        Iterator<MapReduceEncoder> it = this.currentJobs.values().iterator();
        while (it.hasNext()) {
            try {
                it.next().killJob();
            } catch (IOException e) {
                LOG.error("Exception", e);
            }
        }
        this.currentJobs.clear();
    }

    public void cancel(String str) {
        initialize();
        try {
            this.currentJobs.get(str).killJob();
        } catch (IOException e) {
            LOG.error("Exception", e);
        }
        this.currentJobs.remove(str);
    }

    private void initialize() {
        if (this.initialized) {
            return;
        }
        try {
            for (EncodingJob encodingJob : recoverActiveJobs()) {
                this.currentJobs.put(encodingJob.getPath(), new MapReduceEncoder(this.conf, encodingJob));
            }
            this.initialized = true;
        } catch (IOException e) {
            LOG.error("Encoding job recovery failed", e);
            throw new RuntimeException(e);
        }
    }

    private Collection<EncodingJob> recoverActiveJobs() throws IOException {
        return (Collection) new LightWeightRequestHandler(HDFSOperationType.RECOVER_ENCODING_JOBS) { // from class: io.hops.erasure_coding.MapReduceEncodingManager.3
            public Object performTask() throws IOException {
                return HdfsStorageFactory.getDataAccess(EncodingJobsDataAccess.class).findAll();
            }
        }.handle();
    }
}
