package io.hops.hopsworks.common.featurestore.app;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.hops.hopsworks.common.dataset.DatasetController;
import io.hops.hopsworks.common.featurestore.FeaturestoreController;
import io.hops.hopsworks.common.featurestore.featuregroup.IngestionDataFormat;
import io.hops.hopsworks.common.featurestore.featuregroup.IngestionJob;
import io.hops.hopsworks.common.featurestore.featuregroup.stream.DeltaStreamerJobConf;
import io.hops.hopsworks.common.featurestore.query.QueryBuilder;
import io.hops.hopsworks.common.featurestore.query.QueryController;
import io.hops.hopsworks.common.featurestore.query.QueryDTO;
import io.hops.hopsworks.common.featurestore.trainingdatasets.TrainingDatasetController;
import io.hops.hopsworks.common.featurestore.xattr.dto.FeaturestoreXAttrsConstants;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.jobs.JobController;
import io.hops.hopsworks.common.jobs.execution.ExecutionController;
import io.hops.hopsworks.common.provenance.core.Provenance;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.DatasetException;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.GenericException;
import io.hops.hopsworks.exceptions.HopsSecurityException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.dataset.DatasetAccessPermission;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featureview.FeatureView;
import io.hops.hopsworks.persistence.entity.featurestore.trainingdataset.TrainingDataset;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.spark.SparkJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.inject.Inject;
import org.apache.hadoop.fs.FSDataOutputStream;

/**
 * Creates the Hopsworks jobs that run feature store utility operations: feature group ingestion
 * ({@code insert_fg}), statistics computation ({@code compute_stats}), training dataset
 * materialization ({@code create_td} / {@code create_fv_td}) and the Hudi delta streamer backfill
 * ({@code offline_fg_backfill}).
 *
 * <p>Every setup method serializes a job configuration to JSON, writes it into the project's
 * Resources dataset and registers a Spark/PySpark job whose default arguments are
 * {@code -op <operation> -path <configuration file>}.
 */
@TransactionAttribute(TransactionAttributeType.NEVER)
@Stateless
public class FsJobManagerController {

    private static final String INSERT_FG_OP = "insert_fg";
    private static final String TRAINING_DATASET_OP = "create_td";
    private static final String FEATURE_VIEW_TRAINING_DATASET_OP = "create_fv_td";
    private static final String COMPUTE_STATS_OP = "compute_stats";
    private static final String DELTA_STREAMER_OP = "offline_fg_backfill";

    /** Timestamp suffix used to make job names unique; DateTimeFormatter is immutable and thread-safe. */
    private static final DateTimeFormatter JOB_NAME_TIMESTAMP_FORMAT =
        DateTimeFormatter.ofPattern("ddMMyyyyHHmmss");

    /** Serializes the job configuration maps to JSON. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @EJB
    private FeaturestoreController featurestoreController;

    @EJB
    private DistributedFsService dfs;

    @EJB
    private HdfsUsersController hdfsUsersController;

    @EJB
    private DatasetController datasetController;

    @EJB
    private JobController jobController;

    @Inject
    private ExecutionController executionController;

    @EJB
    private Settings settings;

    @EJB
    private TrainingDatasetController trainingDatasetController;

    @EJB
    private QueryController queryController;

    @EJB
    private QueryBuilder queryBuilder;

    /**
     * Creates, but does not start, a PySpark job that inserts data into a feature group.
     *
     * <p>A new timestamped directory is created under the project's ingestion dataset. The dataset
     * itself is created first if the project does not have one yet. The directory is passed to the
     * job as {@code data_path}.
     *
     * @param project project owning the job
     * @param user user creating the job; files are written as the user's project HDFS user
     * @param featuregroup target feature group
     * @param sparkJobConfiguration Spark configuration to use (mutated by this call); a default
     *     one is created when {@code null}
     * @param dataFormat format of the data found at the ingestion path
     * @param writeOptions forwarded to the job as {@code write_options}
     * @param dataOptions forwarded to the job as {@code data_options}
     * @return the ingestion path together with the created job
     * @throws FeaturestoreException if the ingestion directory or the job configuration cannot be
     *     written
     */
    public IngestionJob setupIngestionJob(Project project, Users user, Featuregroup featuregroup,
            SparkJobConfiguration sparkJobConfiguration, IngestionDataFormat dataFormat,
            Map<String, String> writeOptions, Map<String, String> dataOptions)
            throws FeaturestoreException, DatasetException, HopsSecurityException, JobException {
        DistributedFileSystemOps udfso =
            dfs.getDfsOps(hdfsUsersController.getHdfsUserName(project, user));
        try {
            String ingestionPath = getIngestionPath(project, user, featuregroup, udfso);
            String jobConfigurationPath = getJobConfigurationPath(project, featuregroup.getName(),
                featuregroup.getVersion(), "ingestion");

            Map<String, Object> jobConfiguration = new HashMap<>();
            jobConfiguration.put("feature_store", featurestoreController
                .getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject()));
            jobConfiguration.put("name", featuregroup.getName());
            jobConfiguration.put(FeaturestoreXAttrsConstants.VERSION,
                String.valueOf(featuregroup.getVersion()));
            jobConfiguration.put("data_path", ingestionPath);
            jobConfiguration.put("data_format", dataFormat.toString());
            jobConfiguration.put("data_options", dataOptions);
            jobConfiguration.put("write_options", writeOptions);
            writeToHDFS(jobConfigurationPath, OBJECT_MAPPER.writeValueAsString(jobConfiguration), udfso);

            Jobs job = configureJob(user, project, sparkJobConfiguration,
                getJobName(INSERT_FG_OP, Utils.getFeaturegroupName(featuregroup), true),
                getJobArgs(INSERT_FG_OP, jobConfigurationPath), JobType.PYSPARK);
            return new IngestionJob(ingestionPath, job);
        } catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_JOB_SETUP,
                Level.SEVERE, "Error setting up feature group import job", e.getMessage(), e);
        } finally {
            dfs.closeDfsClient(udfso);
        }
    }

    /**
     * Creates a new, timestamped directory for the feature group under the ingestion dataset,
     * creating that dataset first if the project does not have it.
     */
    private String getIngestionPath(Project project, Users user, Featuregroup featuregroup,
            DistributedFileSystemOps udfso) throws IOException, DatasetException, HopsSecurityException {
        if (!ingestionDatasetExists(project)) {
            createIngestionDataset(project, user);
        }
        String path = Paths.get(Utils.getProjectPath(project.getName()),
            Settings.ServiceDataset.INGESTION.getName(),
            Utils.getFeaturegroupName(featuregroup) + System.currentTimeMillis()).toString();
        udfso.mkdir(path);
        return path;
    }

    /** Returns whether the project has a dataset named like the ingestion dataset (case-insensitive). */
    private boolean ingestionDatasetExists(Project project) {
        String ingestionDatasetName = Settings.ServiceDataset.INGESTION.getName();
        return project.getDatasetCollection().stream()
            .anyMatch(dataset -> dataset.getName().equalsIgnoreCase(ingestionDatasetName));
    }

    /**
     * Creates the editable, provenance-disabled ingestion dataset. The dataset is created with a DFS
     * client from {@code getDfsOps()} (no explicit HDFS user).
     */
    private void createIngestionDataset(Project project, Users user)
            throws DatasetException, HopsSecurityException {
        DistributedFileSystemOps dfso = dfs.getDfsOps();
        try {
            datasetController.createDataset(user, project, Settings.ServiceDataset.INGESTION.getName(),
                Settings.ServiceDataset.INGESTION.getDescription(), Provenance.Type.DISABLED.dto, false,
                DatasetAccessPermission.EDITABLE, dfso);
        } finally {
            dfs.closeDfsClient(dfso);
        }
    }

    /**
     * Creates and immediately starts a PySpark job computing statistics for either a feature group
     * or a training dataset.
     *
     * @param featuregroup entity to compute statistics for; when {@code null},
     *     {@code trainingDataset} is used instead (one of the two must be non-null)
     * @return the started job
     * @throws FeaturestoreException if the job configuration cannot be written
     */
    public Jobs setupStatisticsJob(Project project, Users user, Featurestore featurestore,
            Featuregroup featuregroup, TrainingDataset trainingDataset)
            throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        DistributedFileSystemOps udfso =
            dfs.getDfsOps(hdfsUsersController.getHdfsUserName(project, user));
        try {
            boolean isFeaturegroup = featuregroup != null;
            String name = isFeaturegroup ? featuregroup.getName() : trainingDataset.getName();
            Integer version = isFeaturegroup ? featuregroup.getVersion() : trainingDataset.getVersion();
            String jobConfigurationPath = getJobConfigurationPath(project, name, version, "statistics");

            Map<String, Object> jobConfiguration = new HashMap<>();
            jobConfiguration.put("feature_store",
                featurestoreController.getOfflineFeaturestoreDbName(featurestore.getProject()));
            jobConfiguration.put("type", isFeaturegroup ? "fg" : "td");
            jobConfiguration.put("name", name);
            jobConfiguration.put(FeaturestoreXAttrsConstants.VERSION, String.valueOf(version));
            writeToHDFS(jobConfigurationPath, OBJECT_MAPPER.writeValueAsString(jobConfiguration), udfso);

            String jobArgs = getJobArgs(COMPUTE_STATS_OP, jobConfigurationPath);
            Jobs job = configureJob(user, project, null,
                getJobName(COMPUTE_STATS_OP, Utils.getFeatureStoreEntityName(name, version), true),
                jobArgs, JobType.PYSPARK);
            executionController.start(job, jobArgs, user);
            return job;
        } catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_JOB_SETUP,
                Level.SEVERE, "Error setting up statistics job", e.getMessage(), e);
        } finally {
            dfs.closeDfsClient(udfso);
        }
    }

    /**
     * Creates and starts a job materializing a feature view's training dataset.
     *
     * @param trainingDatasetVersion version of the feature view's training dataset
     * @param overwrite forwarded to the job as {@code overwrite}
     * @param writeOptions forwarded to the job as {@code write_options}
     * @return the started job
     */
    public Jobs setupTrainingDatasetJob(Project project, Users user, FeatureView featureView,
            Integer trainingDatasetVersion, Boolean overwrite, Map<String, String> writeOptions,
            SparkJobConfiguration sparkJobConfiguration)
            throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        TrainingDataset trainingDataset = trainingDatasetController
            .getTrainingDatasetByFeatureViewAndVersion(featureView, trainingDatasetVersion);
        QueryDTO queryDTO = queryBuilder.build(
            queryController.makeQuery(featureView, project, user, true, false),
            featureView.getFeaturestore(), project, user);
        return setupTrainingDatasetJob(project, user, trainingDataset, queryDTO, overwrite, writeOptions,
            sparkJobConfiguration, FEATURE_VIEW_TRAINING_DATASET_OP);
    }

    /**
     * Creates and starts a job materializing a training dataset from the given query.
     *
     * @param overwrite forwarded to the job as {@code overwrite}
     * @param writeOptions forwarded to the job as {@code write_options}
     * @return the started job
     */
    public Jobs setupTrainingDatasetJob(Project project, Users user, TrainingDataset trainingDataset,
            QueryDTO queryDTO, Boolean overwrite, Map<String, String> writeOptions,
            SparkJobConfiguration sparkJobConfiguration)
            throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        return setupTrainingDatasetJob(project, user, trainingDataset, queryDTO, overwrite, writeOptions,
            sparkJobConfiguration, TRAINING_DATASET_OP);
    }

    /**
     * Shared implementation of the training dataset job setup.
     *
     * <p>For a training dataset that belongs to a feature view, the configuration identifies the
     * feature view (name/version) plus {@code td_version}, and no query is included. Otherwise it
     * identifies the training dataset itself and embeds {@code queryDTO} as {@code query}.
     *
     * @param operation value passed to the job's {@code -op} argument
     */
    private Jobs setupTrainingDatasetJob(Project project, Users user, TrainingDataset trainingDataset,
            QueryDTO queryDTO, Boolean overwrite, Map<String, String> writeOptions,
            SparkJobConfiguration sparkJobConfiguration, String operation)
            throws FeaturestoreException, JobException, GenericException, ProjectException, ServiceException {
        DistributedFileSystemOps udfso =
            dfs.getDfsOps(hdfsUsersController.getHdfsUserName(project, user));
        try {
            Map<String, Object> jobConfiguration = new HashMap<>();
            jobConfiguration.put("feature_store", featurestoreController
                .getOfflineFeaturestoreDbName(trainingDataset.getFeaturestore().getProject()));

            String jobConfigurationPath;
            FeatureView featureView = trainingDataset.getFeatureView();
            if (featureView != null) {
                String name = featureView.getName();
                Integer version = featureView.getVersion();
                jobConfiguration.put("name", name);
                jobConfiguration.put(FeaturestoreXAttrsConstants.VERSION, String.valueOf(version));
                jobConfiguration.put("td_version", String.valueOf(trainingDataset.getVersion()));
                jobConfigurationPath = getJobConfigurationPath(project, name + "_" + version,
                    trainingDataset.getVersion(), "fv_td");
            } else {
                jobConfiguration.put("name", trainingDataset.getName());
                jobConfiguration.put(FeaturestoreXAttrsConstants.VERSION,
                    String.valueOf(trainingDataset.getVersion()));
                jobConfigurationPath = getJobConfigurationPath(project, trainingDataset.getName(),
                    trainingDataset.getVersion(), "td");
                jobConfiguration.put("query", queryDTO);
            }
            jobConfiguration.put("write_options", writeOptions);
            jobConfiguration.put("overwrite", overwrite);
            writeToHDFS(jobConfigurationPath, OBJECT_MAPPER.writeValueAsString(jobConfiguration), udfso);

            String jobArgs = getJobArgs(operation, jobConfigurationPath);
            Jobs job = configureJob(user, project, sparkJobConfiguration,
                getJobName(operation, Utils.getTrainingDatasetName(trainingDataset), true),
                jobArgs, JobType.PYSPARK);
            executionController.start(job, jobArgs, user);
            return job;
        } catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_JOB_SETUP,
                Level.SEVERE, "Error setting up training dataset job", e.getMessage(), e);
        } finally {
            dfs.closeDfsClient(udfso);
        }
    }

    /**
     * Creates, but does not start, the Spark (Java) job that backfills a feature group's offline
     * storage with the Hudi delta streamer.
     *
     * <p>The job name has no timestamp suffix, so there is one such job per feature group (see
     * {@link #deleteDeltaStreamerJob}).
     *
     * @param deltaStreamerJobConf optional write options and Spark configuration; may be
     *     {@code null}
     * @return the created job
     * @throws FeaturestoreException if the job configuration cannot be written
     */
    public Jobs setupHudiDeltaStreamerJob(Project project, Users user, Featuregroup featuregroup,
            DeltaStreamerJobConf deltaStreamerJobConf) throws FeaturestoreException, JobException {
        Map<String, String> writeOptions = null;
        SparkJobConfiguration sparkJobConfiguration = null;
        if (deltaStreamerJobConf != null) {
            if (deltaStreamerJobConf.getWriteOptions() != null) {
                // NOTE: toMap throws on duplicate option names or null values; this runs before the
                // DFS client is acquired so such a failure cannot leak the client.
                writeOptions = deltaStreamerJobConf.getWriteOptions().stream()
                    .collect(Collectors.toMap(option -> option.getName(), option -> option.getValue()));
            }
            sparkJobConfiguration = deltaStreamerJobConf.getSparkJobConfiguration();
        }

        DistributedFileSystemOps udfso =
            dfs.getDfsOps(hdfsUsersController.getHdfsUserName(project, user));
        try {
            String jobConfigurationPath = getJobConfigurationPath(project, featuregroup.getName(),
                featuregroup.getVersion(), "deltaStreamer");

            Map<String, Object> jobConfiguration = new HashMap<>();
            jobConfiguration.put("feature_store", featurestoreController
                .getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject()));
            jobConfiguration.put("name", featuregroup.getName());
            jobConfiguration.put(FeaturestoreXAttrsConstants.VERSION,
                String.valueOf(featuregroup.getVersion()));
            jobConfiguration.put("write_options", writeOptions);
            writeToHDFS(jobConfigurationPath, OBJECT_MAPPER.writeValueAsString(jobConfiguration), udfso);

            return configureJob(user, project, sparkJobConfiguration,
                getJobName(DELTA_STREAMER_OP, Utils.getFeaturegroupName(featuregroup), false),
                getJobArgs(DELTA_STREAMER_OP, jobConfigurationPath), JobType.SPARK);
        } catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_JOB_SETUP,
                Level.SEVERE, "Error setting up delta streamer job", e.getMessage(), e);
        } finally {
            dfs.closeDfsClient(udfso);
        }
    }

    /**
     * Builds a unique path for a job configuration file inside the project's Resources dataset,
     * named {@code <type>_<entityName>_<version>_<currentTimeMillis>}.
     *
     * @param project project whose Resources dataset holds the file
     * @param entityName name of the feature store entity the job operates on
     * @param version version of that entity
     * @param type prefix identifying the kind of job (e.g. "ingestion", "statistics")
     * @return the absolute path of the configuration file
     */
    public String getJobConfigurationPath(Project project, String entityName, Integer version, String type) {
        String fileName = String.join("_", type, entityName, String.valueOf(version),
            String.valueOf(System.currentTimeMillis()));
        return Paths.get(Utils.getProjectPath(project.getName()), Settings.BaseDataset.RESOURCES.getName(),
            fileName).toString();
    }

    /**
     * Returns {@code <entityName>_<operation>}, optionally suffixed with the current local time as
     * {@code _ddMMyyyyHHmmss}.
     */
    private String getJobName(String operation, String entityName, boolean withTimestamp) {
        String jobName = entityName + "_" + operation;
        if (withTimestamp) {
            jobName = jobName + "_" + LocalDateTime.now().format(JOB_NAME_TIMESTAMP_FORMAT);
        }
        return jobName;
    }

    /** Builds the default job arguments: {@code -op <operation> -path <configurationPath>}. */
    private String getJobArgs(String operation, String configurationPath) {
        return "-op " + operation + " -path " + configurationPath;
    }

    /**
     * Writes {@code content} as UTF-8 to a new file at {@code path} and flushes it.
     *
     * <p>UTF-8 encoding is required: {@code DataOutputStream.writeBytes} would drop the high byte
     * of every char and corrupt any non-ASCII text in the JSON configuration.
     */
    private void writeToHDFS(String path, String content, DistributedFileSystemOps udfso) throws IOException {
        try (FSDataOutputStream out = udfso.create(path)) {
            out.write(content.getBytes(StandardCharsets.UTF_8));
            out.hflush();
        }
    }

    /**
     * Fills in the job configuration (app name, main class, application path, default arguments)
     * and registers the job.
     *
     * <p>The passed {@code sparkJobConfiguration} is mutated in place; a new default configuration
     * is used when it is {@code null}. PYSPARK jobs use the Python feature store utility; all other
     * job types use the Java one.
     */
    private Jobs configureJob(Users user, Project project, SparkJobConfiguration sparkJobConfiguration,
            String jobName, String defaultArgs, JobType jobType) throws JobException {
        SparkJobConfiguration configuration =
            sparkJobConfiguration != null ? sparkJobConfiguration : new SparkJobConfiguration();
        boolean isPython = jobType.equals(JobType.PYSPARK);
        configuration.setAppName(jobName);
        configuration.setMainClass(isPython ? Settings.SPARK_PY_MAINCLASS : Settings.HSFS_UTIL_MAIN_CLASS);
        configuration.setAppPath(isPython ? settings.getFSPyJobUtilPath() : settings.getFSJavaJobUtilPath());
        configuration.setDefaultArgs(defaultArgs);
        return jobController.putJob(user, project, null, configuration);
    }

    /** Deletes the (non-timestamped) delta streamer job of the given feature group. */
    public void deleteDeltaStreamerJob(Project project, Users user, Featuregroup featuregroup) throws JobException {
        String jobName = getJobName(DELTA_STREAMER_OP, Utils.getFeaturegroupName(featuregroup), false);
        jobController.deleteJob(jobController.getJob(project, jobName), user);
    }
}
