/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.flink;

import io.hops.hopsworks.common.dao.hdfs.inode.Inode;
import io.hops.hopsworks.common.dao.kagent.HostServices;
import io.hops.hopsworks.common.dao.kagent.HostServicesFacade;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.inode.InodeController;
import io.hops.hopsworks.common.jobs.flink.FlinkController;
import io.hops.hopsworks.common.util.Settings;
import java.io.File;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Schedule;
import javax.ejb.Singleton;
import javax.ejb.Timer;
import org.apache.hadoop.fs.Path;

/**
 * Singleton EJB that runs a scheduled clean-up of the Flink archive directory.
 *
 * <p>On each run it lists the children of the directory returned by
 * {@link FlinkController#getArchiveDir()} and removes every entry whose inode
 * has no HDFS user associated with it.
 */
@Singleton
public class FlinkCleaner {
    private static final Logger LOGGER = Logger.getLogger(FlinkCleaner.class.getName());

    @EJB
    private FlinkController flinkController;
    @EJB
    private Settings settings;
    @EJB
    private InodeController inodeController;
    @EJB
    private DistributedFsService dfs;
    @EJB
    private HostServicesFacade hostServicesFacade;

    /**
     * Deletes archive entries that have no HDFS user.
     *
     * <p>Triggered by a non-persistent timer configured with {@code minute="0"}
     * and {@code hour="1"}. If no {@code flinkhistoryserver} service is
     * registered, the timer is cancelled so that no further runs are scheduled.
     *
     * <p>Any {@link Exception} raised during the run is logged at
     * {@link Level#SEVERE} and not propagated. The file system client obtained
     * for the run is released exactly once in a {@code finally} block, whether
     * the run succeeds or fails.
     *
     * @param timer the timer that fired this invocation; cancelled when no
     *              history server service is found
     */
    @Schedule(persistent=false, minute="0", hour="1")
    public void deleteOrphanJobs(Timer timer) {
        LOGGER.log(Level.INFO, "Running FlinkCleaner.");
        DistributedFileSystemOps dfso = null;
        try {
            List<HostServices> hosts = this.hostServicesFacade.findServices("flinkhistoryserver");
            if (hosts.isEmpty()) {
                LOGGER.log(Level.INFO, "Could not find flinkhistoryserver service running on any server, shutting down timer.");
                timer.cancel();
                // NOTE(review): execution deliberately continues with the clean-up
                // below, as in the original code; confirm whether an early return
                // is wanted once the timer has been cancelled.
            }

            String archiveDir = this.flinkController.getArchiveDir();
            dfso = this.dfs.getDfsOps();
            List<Inode> jobs = this.inodeController.getChildren(archiveDir);
            for (Inode job : jobs) {
                // Only entries without an HDFS user are treated as orphans.
                if (job.getHdfsUser() == null) {
                    // Second argument is presumably the "recursive" flag - TODO confirm.
                    dfso.rm(new Path(archiveDir + File.separator + job.getInodePK().getName()), false);
                }
            }
        } catch (Exception ex) {
            LOGGER.log(Level.SEVERE, "Could not access " + this.settings.getFlinkConfFile(), ex);
        } finally {
            // Single release point: runs on success, on a logged failure, and when
            // the logging in the catch block itself throws.
            if (dfso != null) {
                this.dfs.closeDfsClient(dfso);
            }
        }
    }
}

