/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.featurestore.app;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.hops.hopsworks.common.api.ResourceRequest;
import io.hops.hopsworks.common.dao.jobs.description.JobFacade;
import io.hops.hopsworks.common.dataset.DatasetController;
import io.hops.hopsworks.common.featurestore.FeaturestoreController;
import io.hops.hopsworks.common.featurestore.OptionDTO;
import io.hops.hopsworks.common.featurestore.app.JobEntityType;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupFacade;
import io.hops.hopsworks.common.featurestore.featuregroup.ImportFgJobConf;
import io.hops.hopsworks.common.featurestore.featuregroup.IngestionDataFormat;
import io.hops.hopsworks.common.featurestore.featuregroup.IngestionJob;
import io.hops.hopsworks.common.featurestore.featuregroup.stream.DeltaStreamerJobConf;
import io.hops.hopsworks.common.featurestore.query.Query;
import io.hops.hopsworks.common.featurestore.query.QueryBuilder;
import io.hops.hopsworks.common.featurestore.query.QueryController;
import io.hops.hopsworks.common.featurestore.query.QueryDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorDTO;
import io.hops.hopsworks.common.featurestore.trainingdatasets.TrainingDatasetController;
import io.hops.hopsworks.common.featurestore.utils.FeaturestoreUtils;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.jobs.JobController;
import io.hops.hopsworks.common.jobs.execution.ExecutionController;
import io.hops.hopsworks.common.provenance.core.Provenance;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.DatasetException;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.GenericException;
import io.hops.hopsworks.exceptions.HopsSecurityException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.dataset.DatasetAccessPermission;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.FeaturegroupType;
import io.hops.hopsworks.persistence.entity.featurestore.featureview.FeatureView;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnectorType;
import io.hops.hopsworks.persistence.entity.featurestore.trainingdataset.TrainingDataset;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.spark.SparkJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.inject.Inject;
import org.apache.hadoop.fs.FSDataOutputStream;

@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class FsJobManagerController {
    @EJB
    private FeaturestoreController featurestoreController;
    @EJB
    private DistributedFsService dfs;
    @EJB
    private HdfsUsersController hdfsUsersController;
    @EJB
    private DatasetController datasetController;
    @EJB
    private JobController jobController;
    @EJB
    private JobFacade jobFacade;
    @Inject
    private ExecutionController executionController;
    @EJB
    private Settings settings;
    @EJB
    private TrainingDatasetController trainingDatasetController;
    @EJB
    private QueryController queryController;
    @EJB
    private QueryBuilder queryBuilder;
    @EJB
    private FeaturegroupFacade featuregroupFacade;
    @EJB
    private FeaturestoreUtils featurestoreUtils;
    private ObjectMapper objectMapper = new ObjectMapper();
    private SimpleDateFormat formatter = new SimpleDateFormat("ddMMyyyyHHmmss");
    private static final String INSERT_FG_OP = "insert_fg";
    private static final String COMPUTE_STATS_OP = "compute_stats";
    private static final String DELTA_STREAMER_OP = "offline_fg_materialization";
    private static final String GE_VALIDATE_OP = "ge_validate";
    private static final String IMPORT_FEATUREGROUP_OP = "import_fg";
    private static final String FEATURE_MONITORING_OP = "run_feature_monitoring";
    private static final String TRAINING_DATASET_OP = "create_td";
    private static final String FEATURE_VIEW_TRAINING_DATASET_OP = "create_fv_td";

    public IngestionJob setupIngestionJob(Project project, Users user, Featuregroup featureGroup, SparkJobConfiguration sparkJobConfiguration, IngestionDataFormat dataFormat, Map<String, String> writeOptions, Map<String, String> dataOptions) throws FeaturestoreException, DatasetException, HopsSecurityException, JobException {
        DistributedFileSystemOps udfso = this.dfs.getDfsOps(this.hdfsUsersController.getHdfsUserName(project, user));
        try {
            String dataPath = this.getIngestionPath(project, user, featureGroup, udfso);
            HashMap<String, Object> jobConfiguration = new HashMap<String, Object>();
            jobConfiguration.put("feature_store", this.featurestoreController.getOfflineFeaturestoreDbName(featureGroup.getFeaturestore().getProject()));
            jobConfiguration.put("name", featureGroup.getName());
            jobConfiguration.put("version", String.valueOf(featureGroup.getVersion()));
            jobConfiguration.put("data_path", dataPath);
            jobConfiguration.put("data_format", dataFormat.toString());
            jobConfiguration.put("data_options", dataOptions);
            jobConfiguration.put("write_options", writeOptions);
            String jobConfigurationStr = this.objectMapper.writeValueAsString(jobConfiguration);
            String jobName = this.getJobName(INSERT_FG_OP, Utils.getFeaturegroupName(featureGroup), true);
            String jobFolder = this.createJobFolder(project, user, jobName);
            String jobConfigurationPath = this.getJobConfigurationPath(jobFolder);
            this.writeToHDFS(jobConfigurationPath, jobConfigurationStr, udfso);
            Jobs ingestionJob = this.configureJob(user, project, sparkJobConfiguration, jobName, this.getJobArgs(INSERT_FG_OP, jobConfigurationPath), JobType.PYSPARK);
            IngestionJob ingestionJob2 = new IngestionJob(dataPath, ingestionJob);
            return ingestionJob2;
        }
        catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_JOB_SETUP, Level.SEVERE, "Error setting up feature group import job", e.getMessage(), (Throwable)e);
        }
        finally {
            this.dfs.closeDfsClient(udfso);
        }
    }

    public Jobs setupFeatureMonitoringJob(Users user, Project project, ResourceRequest.Name entityType, String entityName, Integer entityVersion, String configName) throws FeaturestoreException, JobException {
        DistributedFileSystemOps udfso = this.dfs.getDfsOps(this.hdfsUsersController.getHdfsUserName(project, user));
        try {
            HashMap<String, String> jobConfiguration = new HashMap<String, String>();
            jobConfiguration.put("feature_store", this.featurestoreController.getOfflineFeaturestoreDbName(project));
            jobConfiguration.put("entity_type", entityType.toString());
            jobConfiguration.put("name", entityName);
            jobConfiguration.put("version", String.valueOf(entityVersion));
            jobConfiguration.put("config_name", configName);
            String jobName = this.getMonitoringJobName(entityName, entityVersion, configName);
            String jobFolder = this.createJobFolder(project, user, jobName);
            String jobConfigurationPath = jobFolder + "/config.json";
            this.writeToHDFS(jobConfigurationPath, this.objectMapper.writeValueAsString(jobConfiguration), udfso);
            String jobArgs = this.getJobArgs(FEATURE_MONITORING_OP, jobConfigurationPath);
            Jobs jobs = this.configureJob(user, project, null, jobName, jobArgs, JobType.PYSPARK);
            return jobs;
        }
        catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_JOB_SETUP, Level.SEVERE, "Error setting up feature monitoring job", e.getMessage(), (Throwable)e);
        }
        finally {
            this.dfs.closeDfsClient(udfso);
        }
    }

    private String getIngestionPath(Project project, Users user, Featuregroup featureGroup, DistributedFileSystemOps udfso) throws IOException, DatasetException, HopsSecurityException {
        if (!this.ingestionDatasetExists(project)) {
            this.createIngestionDataset(project, user);
        }
        String ingestionPath = Paths.get(Utils.getProjectPath(project.getName()), Settings.ServiceDataset.INGESTION.getName(), Utils.getFeaturegroupName(featureGroup) + System.currentTimeMillis()).toString();
        udfso.mkdir(ingestionPath);
        return ingestionPath;
    }

    private boolean ingestionDatasetExists(Project project) {
        return project.getDatasetCollection().stream().anyMatch(ds -> ds.getName().equalsIgnoreCase(Settings.ServiceDataset.INGESTION.getName()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void createIngestionDataset(Project project, Users user) throws DatasetException, HopsSecurityException {
        DistributedFileSystemOps dfso = this.dfs.getDfsOps();
        try {
            this.datasetController.createDataset(user, project, Settings.ServiceDataset.INGESTION.getName(), Settings.ServiceDataset.INGESTION.getDescription(), Provenance.Type.DISABLED.dto, false, DatasetAccessPermission.EDITABLE, dfso);
        }
        finally {
            this.dfs.closeDfsClient(dfso);
        }
    }

    private Jobs setupAndStartJob(Project project, Users user, Featurestore featurestore, String entityName, Integer entityVersion, JobEntityType type, String op, String configPrefix, boolean nameIncludeTimestamp) throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        return this.setupAndStartJob(project, user, featurestore, entityName, entityVersion, null, type, op, configPrefix, nameIncludeTimestamp);
    }

    private Jobs setupAndStartJob(Project project, Users user, Featurestore featurestore, String entityName, Integer entityVersion, Integer tdVersion, JobEntityType type, String op, String configPrefix, boolean nameIncludeTimestamp) throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        DistributedFileSystemOps udfso = this.dfs.getDfsOps(this.hdfsUsersController.getHdfsUserName(project, user));
        HashMap<String, String> jobConfiguration = new HashMap<String, String>();
        try {
            String jobName = this.getJobName(op, Utils.getFeatureStoreEntityName(entityName, entityVersion), nameIncludeTimestamp);
            String jobFolder = this.createJobFolder(project, user, jobName);
            String jobConfigurationPath = this.getJobConfigurationPath(jobFolder);
            jobConfiguration.put("feature_store", this.featurestoreController.getOfflineFeaturestoreDbName(featurestore.getProject()));
            jobConfiguration.put("type", type.toString());
            jobConfiguration.put("name", entityName);
            jobConfiguration.put("version", String.valueOf(entityVersion));
            if (tdVersion != null) {
                jobConfiguration.put("td_version", String.valueOf(tdVersion));
            }
            String jobConfigurationStr = this.objectMapper.writeValueAsString(jobConfiguration);
            this.writeToHDFS(jobConfigurationPath, jobConfigurationStr, udfso);
            String jobArgs = this.getJobArgs(op, jobConfigurationPath);
            Jobs job = this.configureJob(user, project, null, jobName, jobArgs, JobType.PYSPARK);
            this.executionController.start(job, jobArgs, user);
            Jobs jobs = job;
            return jobs;
        }
        catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_JOB_SETUP, Level.SEVERE, "Error setting up " + configPrefix + " job", e.getMessage(), (Throwable)e);
        }
        finally {
            this.dfs.closeDfsClient(udfso);
        }
    }

    private Jobs setupAndStartJob(Project project, Users user, Featurestore featurestore, String entityName, Integer entityVersion, JobEntityType type, String op, String configPrefix, Map<String, String> jobConfiguration) throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        DistributedFileSystemOps udfso = this.dfs.getDfsOps(this.hdfsUsersController.getHdfsUserName(project, user));
        try {
            String jobName = this.getJobName(op, Utils.getFeatureStoreEntityName(entityName, entityVersion), false);
            String jobConfigurationPath = this.getJobConfigurationPath(this.createJobFolder(project, user, jobName));
            jobConfiguration.put("feature_store", this.featurestoreController.getOfflineFeaturestoreDbName(featurestore.getProject()));
            jobConfiguration.put("type", type.toString());
            String jobConfigurationStr = this.objectMapper.writeValueAsString(jobConfiguration);
            this.writeToHDFS(jobConfigurationPath, jobConfigurationStr, udfso);
            String jobArgs = this.getJobArgs(op, jobConfigurationPath);
            Jobs job = this.configureJob(user, project, null, jobName, jobArgs, JobType.PYSPARK);
            this.executionController.start(job, jobArgs, user);
            Jobs jobs = job;
            return jobs;
        }
        catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_JOB_SETUP, Level.SEVERE, "Error setting up " + configPrefix + " job", e.getMessage(), (Throwable)e);
        }
        finally {
            this.dfs.closeDfsClient(udfso);
        }
    }

    public Jobs setupStatisticsJob(Project project, Users user, Featurestore featurestore, Featuregroup featureGroup, TrainingDataset trainingDataset) throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        Integer entityVersion;
        String entityName;
        JobEntityType type;
        Integer tdVersion = null;
        if (featureGroup != null) {
            type = featureGroup.getFeaturegroupType().equals((Object)FeaturegroupType.ON_DEMAND_FEATURE_GROUP) ? JobEntityType.EXTERNAL_FG : JobEntityType.FG;
            entityName = featureGroup.getName();
            entityVersion = featureGroup.getVersion();
        } else {
            entityName = trainingDataset.getName();
            entityVersion = trainingDataset.getFeatureView().getVersion();
            tdVersion = trainingDataset.getVersion();
            type = JobEntityType.TD;
        }
        return this.setupAndStartJob(project, user, featurestore, entityName, entityVersion, tdVersion, type, COMPUTE_STATS_OP, "statistics", true);
    }

    public Jobs setupValidationJob(Project project, Users user, Featurestore featurestore, Featuregroup featureGroup) throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        return this.setupAndStartJob(project, user, featurestore, featureGroup.getName(), featureGroup.getVersion(), JobEntityType.FG, GE_VALIDATE_OP, "validation", false);
    }

    public Jobs setupTrainingDatasetJob(Project project, Users user, FeatureView featureView, Integer trainingDatasetVersion, Boolean overwrite, Map<String, String> writeOptions, SparkJobConfiguration sparkJobConfiguration) throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        TrainingDataset trainingDataset = this.trainingDatasetController.getTrainingDatasetByFeatureViewAndVersion(featureView, trainingDatasetVersion);
        Query query = this.queryController.makeQuery(featureView, project, user, true, false, false, false, false, false);
        QueryDTO queryDTO = this.queryBuilder.build(query, featureView.getFeaturestore(), project, user);
        return this.setupTrainingDatasetJob(project, user, trainingDataset, queryDTO, overwrite, writeOptions, sparkJobConfiguration, FEATURE_VIEW_TRAINING_DATASET_OP);
    }

    public Jobs setupTrainingDatasetJob(Project project, Users user, TrainingDataset trainingDataset, QueryDTO queryDTO, Boolean overwrite, Map<String, String> writeOptions, SparkJobConfiguration sparkJobConfiguration) throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        return this.setupTrainingDatasetJob(project, user, trainingDataset, queryDTO, overwrite, writeOptions, sparkJobConfiguration, TRAINING_DATASET_OP);
    }

    private Jobs setupTrainingDatasetJob(Project project, Users user, TrainingDataset trainingDataset, QueryDTO queryDTO, Boolean overwrite, Map<String, String> writeOptions, SparkJobConfiguration sparkJobConfiguration, String jobType) throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        this.featurestoreUtils.verifyTrainingDatasetDataOwnerOrSelf(user, project, trainingDataset, FeaturestoreUtils.ActionMessage.SETUP_TRAINING_DATASET_JOB);
        DistributedFileSystemOps udfso = this.dfs.getDfsOps(this.hdfsUsersController.getHdfsUserName(project, user));
        try {
            String jobConfigurationPath;
            HashMap<String, Object> jobConfiguration = new HashMap<String, Object>();
            String jobName = null;
            jobConfiguration.put("feature_store", this.featurestoreController.getOfflineFeaturestoreDbName(trainingDataset.getFeaturestore().getProject()));
            if (trainingDataset.getFeatureView() != null) {
                String featureViewName = trainingDataset.getFeatureView().getName();
                Integer featureViewVersion = trainingDataset.getFeatureView().getVersion();
                jobConfiguration.put("name", featureViewName);
                jobConfiguration.put("version", String.valueOf(featureViewVersion));
                jobConfiguration.put("td_version", String.valueOf(trainingDataset.getVersion()));
                jobName = this.getJobName(jobType, Utils.getFeatureStoreEntityName(featureViewName, featureViewVersion), true);
                String jobFolder = this.createJobFolder(project, user, jobName);
                jobConfigurationPath = this.getJobConfigurationPath(jobFolder);
            } else {
                jobConfiguration.put("name", trainingDataset.getName());
                jobConfiguration.put("version", String.valueOf(trainingDataset.getVersion()));
                jobName = this.getJobName(jobType, Utils.getFeatureStoreEntityName(trainingDataset.getName(), trainingDataset.getVersion()), true);
                String jobFolder = this.createJobFolder(project, user, jobName);
                jobConfigurationPath = this.getJobConfigurationPath(jobFolder);
                jobConfiguration.put("query", queryDTO);
            }
            jobConfiguration.put("write_options", writeOptions);
            jobConfiguration.put("overwrite", overwrite);
            String jobConfigurationStr = this.objectMapper.writeValueAsString(jobConfiguration);
            this.writeToHDFS(jobConfigurationPath, jobConfigurationStr, udfso);
            String jobArgs = this.getJobArgs(jobType, jobConfigurationPath);
            Jobs trainingDatasetJob = this.configureJob(user, project, sparkJobConfiguration, jobName, jobArgs, JobType.PYSPARK);
            this.executionController.start(trainingDatasetJob, jobArgs, user);
            Jobs jobs = trainingDatasetJob;
            return jobs;
        }
        catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_JOB_SETUP, Level.SEVERE, "Error setting up training dataset job", e.getMessage(), (Throwable)e);
        }
        finally {
            this.dfs.closeDfsClient(udfso);
        }
    }

    public Jobs setupHudiDeltaStreamerJob(Project project, Users user, Featuregroup featuregroup, DeltaStreamerJobConf deltaStreamerJobConf) throws FeaturestoreException, JobException {
        DistributedFileSystemOps udfso = this.dfs.getDfsOps(this.hdfsUsersController.getHdfsUserName(project, user));
        Map<String, String> writeOptions = null;
        SparkJobConfiguration sparkJobConfiguration = null;
        if (deltaStreamerJobConf != null) {
            if (deltaStreamerJobConf.getWriteOptions() != null) {
                writeOptions = deltaStreamerJobConf.getWriteOptions().stream().collect(Collectors.toMap(OptionDTO::getName, OptionDTO::getValue));
            }
            sparkJobConfiguration = deltaStreamerJobConf.getSparkJobConfiguration();
        }
        try {
            String jobName = this.getJobName(DELTA_STREAMER_OP, Utils.getFeaturegroupName(featuregroup), false);
            String jobFolder = this.createJobFolder(project, user, jobName);
            String jobConfigurationPath = this.getJobConfigurationPath(jobFolder);
            HashMap<String, Object> jobConfiguration = new HashMap<String, Object>();
            jobConfiguration.put("feature_store", this.featurestoreController.getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject()));
            jobConfiguration.put("name", featuregroup.getName());
            jobConfiguration.put("version", String.valueOf(featuregroup.getVersion()));
            jobConfiguration.put("write_options", writeOptions);
            String jobConfigurationStr = this.objectMapper.writeValueAsString(jobConfiguration);
            this.writeToHDFS(jobConfigurationPath, jobConfigurationStr, udfso);
            String jobArgs = this.getJobArgs(DELTA_STREAMER_OP, jobConfigurationPath);
            Jobs jobs = this.configureJob(user, project, sparkJobConfiguration, jobName, jobArgs, JobType.SPARK);
            return jobs;
        }
        catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_JOB_SETUP, Level.SEVERE, "Error setting up delta streamer job", e.getMessage(), (Throwable)e);
        }
        finally {
            this.dfs.closeDfsClient(udfso);
        }
    }

    private String createJobFolder(Project project, Users user, String jobName) throws JobException {
        return this.jobController.createJobFolder(project, user, jobName);
    }

    private String getJobConfigurationPath(String jobFolder) {
        return jobFolder + "/config_" + System.currentTimeMillis();
    }

    private String getJobName(String op, String entityName, boolean withTimeStamp) {
        String name = entityName + "_" + op;
        if (withTimeStamp) {
            name = name + "_" + this.formatter.format(new Date());
        }
        return name;
    }

    public String getMonitoringJobName(String entityName, Integer entityVersion, String configName) {
        return this.getJobName(FEATURE_MONITORING_OP, Utils.getFeatureStoreEntityName(entityName, entityVersion) + "_" + configName, false);
    }

    private String getJobArgs(String op, String jobConfigurationPath) {
        return "-op " + op + " -path " + jobConfigurationPath;
    }

    private void writeToHDFS(String filePath, String content, DistributedFileSystemOps udfso) throws IOException {
        try (FSDataOutputStream outStream = udfso.create(filePath);){
            outStream.writeBytes(content);
            outStream.hflush();
        }
    }

    private Jobs configureJob(Users user, Project project, SparkJobConfiguration sparkJobConfiguration, String jobName, String defaultArgs, JobType jobType) throws JobException {
        Jobs job = this.jobFacade.findByProjectAndName(project, jobName);
        if (sparkJobConfiguration == null) {
            sparkJobConfiguration = new SparkJobConfiguration();
        }
        sparkJobConfiguration.setAppName(jobName);
        sparkJobConfiguration.setMainClass(jobType.equals((Object)JobType.PYSPARK) ? "org.apache.spark.deploy.PythonRunner" : "com.logicalclocks.utils.MainClass");
        sparkJobConfiguration.setAppPath(jobType.equals((Object)JobType.PYSPARK) ? this.settings.getFSPyJobUtilPath() : this.settings.getFSJavaJobUtilPath());
        sparkJobConfiguration.setDefaultArgs(defaultArgs);
        return this.jobController.putJob(user, project, job, (JobConfiguration)sparkJobConfiguration);
    }

    public void deleteJobs(Project project, Users user, Featuregroup featuregroup) throws JobException {
        String jobNameRegex = String.format("%s_(%s|%s|%s|%s|%s|%s).*", Utils.getFeaturegroupName(featuregroup), INSERT_FG_OP, COMPUTE_STATS_OP, DELTA_STREAMER_OP, GE_VALIDATE_OP, IMPORT_FEATUREGROUP_OP, FEATURE_MONITORING_OP);
        this.deleteJobs(project, user, jobNameRegex);
    }

    public void deleteJobs(Project project, Users user, TrainingDataset trainingDataset) throws JobException {
        String jobNameRegex = String.format("%s_(%s).*", Utils.getTrainingDatasetName(trainingDataset), TRAINING_DATASET_OP);
        this.deleteJobs(project, user, jobNameRegex);
    }

    public void deleteJobs(Project project, Users user, FeatureView featureView) throws JobException {
        String jobNameRegex = String.format("%s_(%s|%s).*", Utils.getFeatureViewName(featureView), FEATURE_VIEW_TRAINING_DATASET_OP, FEATURE_MONITORING_OP);
        this.deleteJobs(project, user, jobNameRegex);
    }

    private void deleteJobs(Project project, Users user, String jobPrefix) throws JobException {
        List<Jobs> jobsList = this.jobController.getJobsWithJobNameRegex(project, jobPrefix);
        for (Jobs job : jobsList) {
            this.jobController.deleteJob(job, user);
        }
    }

    public Jobs setupImportFgJob(Project project, Users user, Featurestore featurestore, ImportFgJobConf importFgJobConf) throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        HashMap<String, String> jobConfiguration = new HashMap<String, String>();
        FeaturestoreStorageConnectorDTO storageConnector = importFgJobConf.getStorageConnectorDTO();
        FeaturestoreConnectorType connectorType = storageConnector.getStorageConnectorType();
        HashMap<String, String> options = new HashMap<String, String>();
        if (importFgJobConf.getTable() != null) {
            switch (connectorType) {
                case SNOWFLAKE: 
                case REDSHIFT: {
                    options.put("dbtable", importFgJobConf.getTable());
                    break;
                }
                case BIGQUERY: {
                    options.put("table", importFgJobConf.getTable());
                    break;
                }
                default: {
                    throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.WARNING, "Storage connector type " + connectorType.name() + " not supported for importing data");
                }
            }
        }
        int version = this.getNewFeatureGroupVersion(featurestore, importFgJobConf.getFeatureGroupName());
        jobConfiguration.put("storageConnectorName", storageConnector.getName());
        jobConfiguration.put("featureGroupName", importFgJobConf.getFeatureGroupName());
        jobConfiguration.put("query", importFgJobConf.getQuery());
        jobConfiguration.put("primaryKey", (String)((Object)importFgJobConf.getPrimaryKey()));
        jobConfiguration.put("version", String.valueOf(version));
        jobConfiguration.put("options", (String)((Object)options));
        jobConfiguration.put("connectorType", connectorType.name());
        if (importFgJobConf.getPartitionKey() != null) {
            jobConfiguration.put("partitionKey", (String)((Object)importFgJobConf.getPartitionKey()));
        }
        if (importFgJobConf.getStatisticsConfigDTO() != null) {
            jobConfiguration.put("statisticsConfig", (String)((Object)importFgJobConf.getStatisticsConfigDTO()));
        }
        if (importFgJobConf.isOnlineEnabled()) {
            jobConfiguration.put("onlineEnabled", (String)((Object)Boolean.valueOf(importFgJobConf.isOnlineEnabled())));
        }
        if (importFgJobConf.getEventTime() != null) {
            jobConfiguration.put("eventTime", importFgJobConf.getEventTime());
        }
        if (importFgJobConf.getDescription() != null) {
            jobConfiguration.put("description", importFgJobConf.getDescription());
        }
        return this.setupAndStartJob(project, user, featurestore, importFgJobConf.getFeatureGroupName(), (Integer)version, JobEntityType.FG, IMPORT_FEATUREGROUP_OP, "importData", jobConfiguration);
    }

    public int getNewFeatureGroupVersion(Featurestore featurestore, String featureGroupName) {
        List<Featuregroup> fgPrevious = this.featuregroupFacade.findByNameAndFeaturestoreOrderedDescVersion(featureGroupName, featurestore);
        if (fgPrevious != null && !fgPrevious.isEmpty()) {
            return fgPrevious.get(0).getVersion() + 1;
        }
        return 1;
    }
}

