/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.airflow;

import freemarker.template.TemplateException;
import io.hops.hopsworks.common.airflow.AirflowDagDTO;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.common.util.TemplateEngine;
import io.hops.hopsworks.common.util.templates.airflow.AirflowDAG;
import io.hops.hopsworks.common.util.templates.airflow.AirflowJobLaunchOperator;
import io.hops.hopsworks.common.util.templates.airflow.AirflowJobSuccessSensor;
import io.hops.hopsworks.exceptions.AirflowException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.logging.Level;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.FsAction;

/**
 * Stateless bean that writes templated Airflow DAG files into a project's
 * Airflow service dataset and grants the configured Airflow user access to
 * that dataset.
 *
 * <p>The bean is annotated with {@link TransactionAttributeType#NEVER}.
 */
@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class AirflowController {
    /** Name of the template that is rendered into a DAG file. */
    private static final String DAG_TEMPLATE_NAME = "airflow_dag.py";
    /** Extension appended to the DAG name to form the DAG file name. */
    private static final String DAG_FILE_EXTENSION = ".py";

    @EJB
    private DistributedFsService dfs;
    @EJB
    private TemplateEngine templateEngine;
    @EJB
    private Settings settings;

    /**
     * Renders the DAG template for the given definition and writes the result
     * to {@code <airflow dataset>/<dag name>.py} in the project, using a file
     * system client obtained for the given project and user.
     *
     * @param project project whose Airflow dataset receives the DAG file
     * @param user user on whose behalf the file system client is obtained
     * @param dagDefinition DAG description; its name is used as the file name
     * @throws AirflowException with code {@code DAG_NOT_TEMPLATED} if obtaining
     *     the client, rendering the template, or creating the file fails with a
     *     {@link TemplateException} or {@link IOException}
     */
    public void composeDAG(Project project, Users user, AirflowDagDTO dagDefinition) throws AirflowException {
        AirflowDAG dag = AirflowDagDTO.toAirflowDagTemplate(dagDefinition, user, project);

        // The operator/sensor classes are exposed to the template under their simple names.
        HashMap<String, Object> dataModel = new HashMap<>(4);
        dataModel.put(AirflowJobLaunchOperator.class.getSimpleName(), AirflowJobLaunchOperator.class);
        dataModel.put(AirflowJobSuccessSensor.class.getSimpleName(), AirflowJobSuccessSensor.class);
        dataModel.put("dag", dag);

        DistributedFileSystemOps dfso = null;
        try {
            dfso = this.dfs.getDfsOps(project, user);
            String dagStr = this.templateEngine.template(dataModel, new StringWriter(), DAG_TEMPLATE_NAME);
            // Use the DFS separator rather than java.io.File.separator: the target is a
            // distributed file system path, not a path on the local operating system.
            Path dagPath = new Path(airflowDatasetPath(project) + Path.SEPARATOR
                + dagDefinition.getName() + DAG_FILE_EXTENSION);
            dfso.create(dagPath, dagStr);
        } catch (TemplateException | IOException ex) {
            throw new AirflowException(RESTCodes.AirflowErrorCode.DAG_NOT_TEMPLATED, Level.SEVERE,
                "Could not template DAG file for Project " + project.getName(), ex.getMessage(), ex);
        } finally {
            // Always hand the client back, whether or not writing succeeded.
            if (dfso != null) {
                this.dfs.closeDfsClient(dfso);
            }
        }
    }

    /**
     * Adds read/execute ACL entries for the configured Airflow user on the
     * project's Airflow dataset, in both the access and the default scope.
     *
     * @param project project whose Airflow dataset is updated
     * @param udfso file system client used to apply the ACL update; it is not
     *     closed by this method
     * @throws ProjectException with code {@code PROJECT_SET_PERMISSIONS_ERROR}
     *     if the ACL update fails with an {@link IOException}
     */
    public void grantAirflowPermissions(Project project, DistributedFileSystemOps udfso) throws ProjectException {
        try {
            String airflowUser = this.settings.getAirflowUser();
            ArrayList<AclEntry> aclEntries = new ArrayList<>();
            aclEntries.add(readExecuteAcl(airflowUser, AclEntryScope.ACCESS));
            aclEntries.add(readExecuteAcl(airflowUser, AclEntryScope.DEFAULT));
            udfso.updateAcls(new Path(airflowDatasetPath(project)), aclEntries);
        } catch (IOException ex) {
            throw new ProjectException(RESTCodes.ProjectErrorCode.PROJECT_SET_PERMISSIONS_ERROR, Level.SEVERE,
                "Failed to set permissions to airflow user on the Airflow dataset", ex.getMessage(), ex);
        }
    }

    /**
     * Returns the location of the project's Airflow dataset as a string: the
     * project path immediately followed by the Airflow dataset name.
     * NOTE(review): presumably {@code Utils.getProjectPath} ends with a
     * separator, since nothing is inserted between the two parts - confirm.
     */
    private static String airflowDatasetPath(Project project) {
        return Utils.getProjectPath(project.getName()) + Settings.ServiceDataset.AIRFLOW.getName();
    }

    /** Builds a named-user ACL entry with READ_EXECUTE permission in the given scope. */
    private static AclEntry readExecuteAcl(String userName, AclEntryScope scope) {
        return new AclEntry.Builder()
            .setType(AclEntryType.USER)
            .setName(userName)
            .setScope(scope)
            .setPermission(FsAction.READ_EXECUTE)
            .build();
    }
}

