/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.featurestore.app;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.hops.hopsworks.common.dataset.DatasetController;
import io.hops.hopsworks.common.featurestore.FeaturestoreController;
import io.hops.hopsworks.common.featurestore.OptionDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.IngestionDataFormat;
import io.hops.hopsworks.common.featurestore.featuregroup.IngestionJob;
import io.hops.hopsworks.common.featurestore.featuregroup.stream.DeltaStreamerJobConf;
import io.hops.hopsworks.common.featurestore.query.Query;
import io.hops.hopsworks.common.featurestore.query.QueryBuilder;
import io.hops.hopsworks.common.featurestore.query.QueryController;
import io.hops.hopsworks.common.featurestore.query.QueryDTO;
import io.hops.hopsworks.common.featurestore.trainingdatasets.TrainingDatasetController;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.jobs.JobController;
import io.hops.hopsworks.common.jobs.execution.ExecutionController;
import io.hops.hopsworks.common.provenance.core.Provenance;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.DatasetException;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.GenericException;
import io.hops.hopsworks.exceptions.HopsSecurityException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.dataset.DatasetAccessPermission;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featureview.FeatureView;
import io.hops.hopsworks.persistence.entity.featurestore.trainingdataset.TrainingDataset;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.spark.SparkJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.inject.Inject;
import org.apache.hadoop.fs.FSDataOutputStream;

/**
 * Creates the utility Spark/PySpark jobs used by the feature store: feature group ingestion,
 * statistics computation, training dataset materialization and Hudi delta-streamer backfills.
 *
 * <p>All setup methods follow the same pattern:
 * <ol>
 *   <li>build a JSON job configuration;</li>
 *   <li>write it, as the calling user, to a timestamped file under the project's Resources dataset;</li>
 *   <li>register a job whose default arguments are {@code -op <operation> -path <configuration file>};</li>
 *   <li>for some operations, start an execution immediately.</li>
 * </ol>
 */
@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class FsJobManagerController {
    @EJB
    private FeaturestoreController featurestoreController;
    @EJB
    private DistributedFsService dfs;
    @EJB
    private HdfsUsersController hdfsUsersController;
    @EJB
    private DatasetController datasetController;
    @EJB
    private JobController jobController;
    @Inject
    private ExecutionController executionController;
    @EJB
    private Settings settings;
    @EJB
    private TrainingDatasetController trainingDatasetController;
    @EJB
    private QueryController queryController;
    @EJB
    private QueryBuilder queryBuilder;

    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Timestamp suffix for job names. DateTimeFormatter is immutable and thread-safe, unlike the
     * SimpleDateFormat it replaces; the pattern and therefore the output are unchanged.
     */
    private static final DateTimeFormatter JOB_NAME_TIMESTAMP_FORMAT = DateTimeFormatter.ofPattern("ddMMyyyyHHmmss");

    private static final String INSERT_FG_OP = "insert_fg";
    private static final String TRAINING_DATASET_OP = "create_td";
    private static final String FEATURE_VIEW_TRAINING_DATASET_OP = "create_fv_td";
    private static final String COMPUTE_STATS_OP = "compute_stats";
    private static final String DELTA_STREAMER_OP = "offline_fg_backfill";

    /**
     * Prepares (but does not start) a PySpark job that inserts uploaded data into a feature group.
     *
     * <p>A fresh upload directory is created in the project's ingestion dataset. The dataset itself
     * is created first if it is missing. A configuration file describing the target feature group,
     * data location/format and options is then written, and a job running the "insert_fg"
     * operation is registered.
     *
     * @param project               project that owns the job and the upload directory
     * @param user                  user the DFS operations and the job are performed as
     * @param featureGroup          target feature group
     * @param sparkJobConfiguration optional Spark configuration; a default one is used when null
     * @param dataFormat            format of the data that will be uploaded
     * @param writeOptions          options forwarded to the job as "write_options" (may be null)
     * @param dataOptions           options forwarded to the job as "data_options" (may be null)
     * @return the upload directory path together with the registered job
     * @throws FeaturestoreException if creating the directory or writing the configuration fails
     */
    public IngestionJob setupIngestionJob(Project project, Users user, Featuregroup featureGroup, SparkJobConfiguration sparkJobConfiguration, IngestionDataFormat dataFormat, Map<String, String> writeOptions, Map<String, String> dataOptions) throws FeaturestoreException, DatasetException, HopsSecurityException, JobException {
        DistributedFileSystemOps udfso = this.dfs.getDfsOps(this.hdfsUsersController.getHdfsUserName(project, user));
        try {
            String dataPath = getIngestionPath(project, user, featureGroup, udfso);
            String jobConfigurationPath = getJobConfigurationPath(project, featureGroup.getName(), featureGroup.getVersion(), "ingestion");

            Map<String, Object> jobConfiguration = new HashMap<>();
            jobConfiguration.put("feature_store", this.featurestoreController.getOfflineFeaturestoreDbName(featureGroup.getFeaturestore().getProject()));
            jobConfiguration.put("name", featureGroup.getName());
            jobConfiguration.put("version", String.valueOf(featureGroup.getVersion()));
            jobConfiguration.put("data_path", dataPath);
            jobConfiguration.put("data_format", dataFormat.toString());
            jobConfiguration.put("data_options", dataOptions);
            jobConfiguration.put("write_options", writeOptions);
            writeJobConfiguration(jobConfigurationPath, jobConfiguration, udfso);

            String jobName = getJobName(INSERT_FG_OP, Utils.getFeaturegroupName(featureGroup), true);
            Jobs ingestionJob = configureJob(user, project, sparkJobConfiguration, jobName,
                getJobArgs(INSERT_FG_OP, jobConfigurationPath), JobType.PYSPARK);
            return new IngestionJob(dataPath, ingestionJob);
        } catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_JOB_SETUP, Level.SEVERE,
                "Error setting up feature group import job", e.getMessage(), e);
        } finally {
            this.dfs.closeDfsClient(udfso);
        }
    }

    /**
     * Creates and returns a new upload directory for an ingestion job.
     *
     * <p>The path is {@code <project path>/<ingestion dataset>/<feature group name><current millis>}.
     * The ingestion dataset is created first if the project does not have it yet.
     */
    private String getIngestionPath(Project project, Users user, Featuregroup featureGroup, DistributedFileSystemOps udfso) throws IOException, DatasetException, HopsSecurityException {
        if (!ingestionDatasetExists(project)) {
            createIngestionDataset(project, user);
        }
        String ingestionPath = Paths.get(Utils.getProjectPath(project.getName()),
            Settings.ServiceDataset.INGESTION.getName(),
            Utils.getFeaturegroupName(featureGroup) + System.currentTimeMillis()).toString();
        udfso.mkdir(ingestionPath);
        return ingestionPath;
    }

    /** Returns true if the project already has a dataset named like the ingestion dataset (case-insensitive). */
    private boolean ingestionDatasetExists(Project project) {
        return project.getDatasetCollection().stream()
            .anyMatch(ds -> ds.getName().equalsIgnoreCase(Settings.ServiceDataset.INGESTION.getName()));
    }

    /**
     * Creates the project's ingestion dataset with provenance disabled and EDITABLE access permission.
     *
     * <p>The DFS client is obtained without a user name (presumably the service user; confirm in
     * DistributedFsService) and is always closed afterwards.
     */
    private void createIngestionDataset(Project project, Users user) throws DatasetException, HopsSecurityException {
        DistributedFileSystemOps dfso = this.dfs.getDfsOps();
        try {
            this.datasetController.createDataset(user, project, Settings.ServiceDataset.INGESTION.getName(),
                Settings.ServiceDataset.INGESTION.getDescription(), Provenance.Type.DISABLED.dto, false,
                DatasetAccessPermission.EDITABLE, dfso);
        } finally {
            this.dfs.closeDfsClient(dfso);
        }
    }

    /**
     * Creates a PySpark job that computes statistics for a feature group or a training dataset, and
     * starts an execution of it immediately.
     *
     * <p>If {@code featureGroup} is non-null it is used, and the configuration "type" is "fg".
     * Otherwise {@code trainingDataset} is used ("td") and must then be non-null.
     *
     * @return the registered (and started) job
     * @throws FeaturestoreException if writing the configuration file fails
     */
    public Jobs setupStatisticsJob(Project project, Users user, Featurestore featurestore, Featuregroup featureGroup, TrainingDataset trainingDataset) throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        DistributedFileSystemOps udfso = this.dfs.getDfsOps(this.hdfsUsersController.getHdfsUserName(project, user));
        try {
            boolean isFeatureGroup = featureGroup != null;
            String entityName = isFeatureGroup ? featureGroup.getName() : trainingDataset.getName();
            Integer entityVersion = isFeatureGroup ? featureGroup.getVersion() : trainingDataset.getVersion();
            String jobConfigurationPath = getJobConfigurationPath(project, entityName, entityVersion, "statistics");

            Map<String, Object> jobConfiguration = new HashMap<>();
            jobConfiguration.put("feature_store", this.featurestoreController.getOfflineFeaturestoreDbName(featurestore.getProject()));
            jobConfiguration.put("type", isFeatureGroup ? "fg" : "td");
            jobConfiguration.put("name", entityName);
            jobConfiguration.put("version", String.valueOf(entityVersion));
            writeJobConfiguration(jobConfigurationPath, jobConfiguration, udfso);

            String jobArgs = getJobArgs(COMPUTE_STATS_OP, jobConfigurationPath);
            String jobName = getJobName(COMPUTE_STATS_OP, Utils.getFeatureStoreEntityName(entityName, entityVersion), true);
            Jobs statisticsJob = configureJob(user, project, null, jobName, jobArgs, JobType.PYSPARK);
            this.executionController.start(statisticsJob, jobArgs, user);
            return statisticsJob;
        } catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_JOB_SETUP, Level.SEVERE,
                "Error setting up statistics job", e.getMessage(), e);
        } finally {
            this.dfs.closeDfsClient(udfso);
        }
    }

    /**
     * Creates and starts a job that materializes the given version of a feature view's training dataset.
     *
     * <p>NOTE(review): the query built here is only written to the job configuration when the
     * resolved training dataset has no feature view attached. Confirm whether building it here is
     * still needed (e.g. as validation).
     */
    public Jobs setupTrainingDatasetJob(Project project, Users user, FeatureView featureView, Integer trainingDatasetVersion, Boolean overwrite, Map<String, String> writeOptions, SparkJobConfiguration sparkJobConfiguration) throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        TrainingDataset trainingDataset = this.trainingDatasetController.getTrainingDatasetByFeatureViewAndVersion(featureView, trainingDatasetVersion);
        Query query = this.queryController.makeQuery(featureView, project, user, true, false);
        QueryDTO queryDTO = this.queryBuilder.build(query, featureView.getFeaturestore(), project, user);
        return setupTrainingDatasetJob(project, user, trainingDataset, queryDTO, overwrite, writeOptions,
            sparkJobConfiguration, FEATURE_VIEW_TRAINING_DATASET_OP);
    }

    /** Creates and starts a job that materializes a training dataset from the given query. */
    public Jobs setupTrainingDatasetJob(Project project, Users user, TrainingDataset trainingDataset, QueryDTO queryDTO, Boolean overwrite, Map<String, String> writeOptions, SparkJobConfiguration sparkJobConfiguration) throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        return setupTrainingDatasetJob(project, user, trainingDataset, queryDTO, overwrite, writeOptions,
            sparkJobConfiguration, TRAINING_DATASET_OP);
    }

    /**
     * Shared implementation of the training dataset job setup; starts an execution immediately.
     *
     * <p>For training datasets attached to a feature view, the configuration carries the feature
     * view name/version plus "td_version", and no query. For standalone training datasets it
     * carries the training dataset name/version and the query.
     *
     * @param jobType operation name passed to the job via {@code -op} and used in the job name
     */
    private Jobs setupTrainingDatasetJob(Project project, Users user, TrainingDataset trainingDataset, QueryDTO queryDTO, Boolean overwrite, Map<String, String> writeOptions, SparkJobConfiguration sparkJobConfiguration, String jobType) throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        DistributedFileSystemOps udfso = this.dfs.getDfsOps(this.hdfsUsersController.getHdfsUserName(project, user));
        try {
            String jobConfigurationPath;
            Map<String, Object> jobConfiguration = new HashMap<>();
            jobConfiguration.put("feature_store", this.featurestoreController.getOfflineFeaturestoreDbName(trainingDataset.getFeaturestore().getProject()));
            FeatureView featureView = trainingDataset.getFeatureView();
            if (featureView != null) {
                String featureViewName = featureView.getName();
                Integer featureViewVersion = featureView.getVersion();
                jobConfiguration.put("name", featureViewName);
                jobConfiguration.put("version", String.valueOf(featureViewVersion));
                jobConfiguration.put("td_version", String.valueOf(trainingDataset.getVersion()));
                jobConfigurationPath = getJobConfigurationPath(project, featureViewName + "_" + featureViewVersion,
                    trainingDataset.getVersion(), "fv_td");
            } else {
                jobConfiguration.put("name", trainingDataset.getName());
                jobConfiguration.put("version", String.valueOf(trainingDataset.getVersion()));
                jobConfigurationPath = getJobConfigurationPath(project, trainingDataset.getName(),
                    trainingDataset.getVersion(), "td");
                jobConfiguration.put("query", queryDTO);
            }
            jobConfiguration.put("write_options", writeOptions);
            jobConfiguration.put("overwrite", overwrite);
            writeJobConfiguration(jobConfigurationPath, jobConfiguration, udfso);

            String jobArgs = getJobArgs(jobType, jobConfigurationPath);
            String jobName = getJobName(jobType, Utils.getTrainingDatasetName(trainingDataset), true);
            Jobs trainingDatasetJob = configureJob(user, project, sparkJobConfiguration, jobName, jobArgs, JobType.PYSPARK);
            this.executionController.start(trainingDatasetJob, jobArgs, user);
            return trainingDatasetJob;
        } catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_JOB_SETUP, Level.SEVERE,
                "Error setting up training dataset job", e.getMessage(), e);
        } finally {
            this.dfs.closeDfsClient(udfso);
        }
    }

    /**
     * Prepares (but does not start) the Spark job that backfills a feature group's offline storage
     * (operation "offline_fg_backfill").
     *
     * <p>The job name carries no timestamp, so there is one such job per feature group; see
     * {@link #deleteDeltaStreamerJob}.
     *
     * @param deltaStreamerJobConf optional write options and Spark configuration; may be null. Option
     *                             entries with null values are kept, and on duplicate names the last
     *                             entry wins.
     * @throws FeaturestoreException if writing the configuration file fails
     */
    public Jobs setupHudiDeltaStreamerJob(Project project, Users user, Featuregroup featuregroup, DeltaStreamerJobConf deltaStreamerJobConf) throws FeaturestoreException, JobException {
        // Resolve the caller-supplied options before acquiring the DFS client, so nothing between
        // acquiring the client and the try/finally below can throw and leak it.
        Map<String, String> writeOptions = null;
        SparkJobConfiguration sparkJobConfiguration = null;
        if (deltaStreamerJobConf != null) {
            if (deltaStreamerJobConf.getWriteOptions() != null) {
                // A plain map instead of Collectors.toMap: toMap threw NPE on null option values and
                // IllegalStateException on duplicate option names.
                writeOptions = new HashMap<>();
                for (OptionDTO option : deltaStreamerJobConf.getWriteOptions()) {
                    writeOptions.put(option.getName(), option.getValue());
                }
            }
            sparkJobConfiguration = deltaStreamerJobConf.getSparkJobConfiguration();
        }

        DistributedFileSystemOps udfso = this.dfs.getDfsOps(this.hdfsUsersController.getHdfsUserName(project, user));
        try {
            String jobConfigurationPath = getJobConfigurationPath(project, featuregroup.getName(), featuregroup.getVersion(), "deltaStreamer");
            Map<String, Object> jobConfiguration = new HashMap<>();
            jobConfiguration.put("feature_store", this.featurestoreController.getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject()));
            jobConfiguration.put("name", featuregroup.getName());
            jobConfiguration.put("version", String.valueOf(featuregroup.getVersion()));
            jobConfiguration.put("write_options", writeOptions);
            writeJobConfiguration(jobConfigurationPath, jobConfiguration, udfso);

            String jobArgs = getJobArgs(DELTA_STREAMER_OP, jobConfigurationPath);
            String jobName = getJobName(DELTA_STREAMER_OP, Utils.getFeaturegroupName(featuregroup), false);
            return configureJob(user, project, sparkJobConfiguration, jobName, jobArgs, JobType.SPARK);
        } catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_JOB_SETUP, Level.SEVERE,
                "Error setting up delta streamer job", e.getMessage(), e);
        } finally {
            this.dfs.closeDfsClient(udfso);
        }
    }

    /**
     * Builds a unique-per-millisecond path for a job configuration file.
     *
     * <p>The path is {@code <project path>/<Resources dataset>/<prefix>_<entityName>_<entityVersion>_<current millis>}.
     */
    public String getJobConfigurationPath(Project project, String entityName, Integer entityVersion, String prefix) {
        String fileName = String.join("_", prefix, entityName, String.valueOf(entityVersion),
            String.valueOf(System.currentTimeMillis()));
        return Paths.get(Utils.getProjectPath(project.getName()), Settings.BaseDataset.RESOURCES.getName(), fileName)
            .toString();
    }

    /**
     * Returns {@code <entity>_<op>}, optionally suffixed with {@code _<ddMMyyyyHHmmss>} (local time).
     * Timestamps have one-second resolution.
     */
    private String getJobName(String op, String entity, boolean withTimeStamp) {
        String name = entity + "_" + op;
        if (withTimeStamp) {
            name = name + "_" + LocalDateTime.now().format(JOB_NAME_TIMESTAMP_FORMAT);
        }
        return name;
    }

    /** Returns the default job arguments {@code -op <op> -path <jobConfigurationPath>}. */
    private String getJobArgs(String op, String jobConfigurationPath) {
        return "-op " + op + " -path " + jobConfigurationPath;
    }

    /** Serializes the configuration map to JSON and writes it to {@code path}. */
    private void writeJobConfiguration(String path, Map<String, Object> jobConfiguration, DistributedFileSystemOps udfso) throws IOException {
        // JsonProcessingException is an IOException, so serialization failures surface like write failures.
        writeToHDFS(path, this.objectMapper.writeValueAsString(jobConfiguration), udfso);
    }

    /** Creates {@code filePath} and writes {@code content} to it as UTF-8. */
    private void writeToHDFS(String filePath, String content, DistributedFileSystemOps udfso) throws IOException {
        try (FSDataOutputStream outStream = udfso.create(filePath)) {
            // DataOutputStream.writeBytes keeps only the low byte of each char, which corrupted any
            // non-ASCII character in the JSON; encode explicitly instead (ASCII output is unchanged).
            outStream.write(content.getBytes(StandardCharsets.UTF_8));
            outStream.hflush();
        }
    }

    /**
     * Fills in the utility-job specific settings and registers the job.
     *
     * <p>Sets app name, main class, app path and default args. PySpark jobs run through Spark's
     * PythonRunner with the configured Python utility, all others run the Java utility's MainClass.
     * The passed configuration is modified in place; a new default one is created when it is null.
     */
    private Jobs configureJob(Users user, Project project, SparkJobConfiguration sparkJobConfiguration, String jobName, String defaultArgs, JobType jobType) throws JobException {
        SparkJobConfiguration configuration = sparkJobConfiguration != null ? sparkJobConfiguration : new SparkJobConfiguration();
        boolean isPySpark = JobType.PYSPARK.equals(jobType);
        configuration.setAppName(jobName);
        configuration.setMainClass(isPySpark ? "org.apache.spark.deploy.PythonRunner" : "com.logicalclocks.utils.MainClass");
        configuration.setAppPath(isPySpark ? this.settings.getFSPyJobUtilPath() : this.settings.getFSJavaJobUtilPath());
        configuration.setDefaultArgs(defaultArgs);
        return this.jobController.putJob(user, project, null, configuration);
    }

    /** Deletes the (non-timestamped) offline backfill job of the given feature group. */
    public void deleteDeltaStreamerJob(Project project, Users user, Featuregroup featuregroup) throws JobException {
        String jobName = getJobName(DELTA_STREAMER_OP, Utils.getFeaturegroupName(featuregroup), false);
        this.jobController.deleteJob(this.jobController.getJob(project, jobName), user);
    }
}

