/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.serving;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.dao.serving.ServingFacade;
import io.hops.hopsworks.common.integrations.LocalhostStereotype;
import io.hops.hopsworks.common.serving.ServingController;
import io.hops.hopsworks.common.serving.ServingLogs;
import io.hops.hopsworks.common.serving.ServingStatusCondition;
import io.hops.hopsworks.common.serving.ServingStatusEnum;
import io.hops.hopsworks.common.serving.ServingWrapper;
import io.hops.hopsworks.common.serving.inference.LocalhostSkLearnInferenceUtils;
import io.hops.hopsworks.common.serving.inference.LocalhostTfInferenceUtils;
import io.hops.hopsworks.common.serving.sklearn.LocalhostSkLearnServingController;
import io.hops.hopsworks.common.serving.tf.LocalhostTfServingController;
import io.hops.hopsworks.common.serving.util.KafkaServingHelper;
import io.hops.hopsworks.common.serving.util.ServingCommands;
import io.hops.hopsworks.common.serving.util.ServingUtils;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.ServingException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.serving.ModelServer;
import io.hops.hopsworks.persistence.entity.serving.Serving;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

/**
 * {@link ServingController} implementation selected by {@link LocalhostStereotype}.
 *
 * <p>Persistence and locking go through {@link ServingFacade}; starting, killing and
 * updating the model server itself is delegated to {@link LocalhostTfServingController}
 * or {@link LocalhostSkLearnServingController} depending on {@link Serving#getModelServer()}.
 * The status of a serving is not stored; it is derived from its {@code cid} and lock IP by
 * {@link #getServingStatus(Serving)}.
 */
@LocalhostStereotype
@Stateless
@TransactionAttribute(value = TransactionAttributeType.NOT_SUPPORTED)
public class LocalhostServingController implements ServingController {
    /** {@code cid} value that {@link #getServingStatus(Serving)} maps to {@code FAILED}. */
    public static final String CID_FAILED = "failed";
    /** {@code cid} value assigned to new servings; maps to {@code STARTING}, {@code CREATED} or {@code STOPPED}. */
    public static final String CID_STOPPED = "stopped";
    /** Not referenced inside this class. */
    public static final String SERVING_DIRS = "/serving/";

    @EJB
    private ServingFacade servingFacade;
    @EJB
    private KafkaServingHelper kafkaServingHelper;
    @EJB
    private LocalhostSkLearnServingController skLearnServingController;
    @EJB
    private LocalhostTfServingController tfServingController;
    @EJB
    private ServingUtils servingUtils;
    @EJB
    private LocalhostTfInferenceUtils localhostTfInferenceUtils;
    @EJB
    private LocalhostSkLearnInferenceUtils localhostSkLearnInferenceUtils;

    /**
     * Lists the servings of a project, optionally filtered.
     *
     * @param project            project whose servings are listed
     * @param modelNameFilter    when null or empty, all servings of the project are looked up and
     *                           {@code modelVersionFilter} is ignored
     * @param modelVersionFilter only consulted when {@code modelNameFilter} is set; null means any version
     * @param statusFilter       when non-null, only servings whose derived status equals it are returned
     * @return a new list of wrappers, possibly empty
     */
    @Override
    public List<ServingWrapper> getAll(Project project, String modelNameFilter, Integer modelVersionFilter,
            ServingStatusEnum statusFilter) throws ServingException {
        List<Serving> servingList;
        if (Strings.isNullOrEmpty(modelNameFilter)) {
            servingList = this.servingFacade.findForProject(project);
        } else if (modelVersionFilter == null) {
            servingList = this.servingFacade.findForProjectAndModel(project, modelNameFilter);
        } else {
            servingList = this.servingFacade.findForProjectAndModelVersion(project, modelNameFilter,
                    modelVersionFilter);
        }

        List<ServingWrapper> servingWrapperList = new ArrayList<>();
        for (Serving serving : servingList) {
            ServingWrapper servingWrapper = this.getServingInternal(serving);
            // The status is computed per serving, so the filter can only be applied after wrapping.
            if (statusFilter == null || servingWrapper.getStatus() == statusFilter) {
                servingWrapperList.add(servingWrapper);
            }
        }
        return servingWrapperList;
    }

    /**
     * Looks a serving up by id.
     *
     * @return the wrapped serving, or {@code null} when the facade finds none
     */
    @Override
    public ServingWrapper get(Project project, Integer id) throws ServingException {
        Serving serving = this.servingFacade.findByProjectAndId(project, id);
        return serving == null ? null : this.getServingInternal(serving);
    }

    /**
     * Looks a serving up by name.
     *
     * @return the wrapped serving, or {@code null} when the facade finds none
     */
    @Override
    public ServingWrapper get(Project project, String name) throws ServingException {
        Serving serving = this.servingFacade.findByProjectAndName(project, name);
        return serving == null ? null : this.getServingInternal(serving);
    }

    /** Calls {@link #delete(Project, Integer)} for every serving the facade returns for the project. */
    @Override
    public void deleteAll(Project project) throws ServingException {
        for (Serving serving : this.servingFacade.findForProject(project)) {
            this.delete(project, serving.getId());
        }
    }

    /**
     * Locks a serving, kills its instance unless its status is {@code STARTING}, and deletes it.
     *
     * <p>If killing the instance fails, the lock taken here is released before the failure is
     * rethrown; the serving is not deleted in that case.
     */
    @Override
    public void delete(Project project, Integer id) throws ServingException {
        Serving serving = this.servingFacade.acquireLock(project, id);
        // STARTING is only reported for cid == CID_STOPPED with a lock IP set (presumably the lock
        // acquired just above), so in that case there is no instance to kill.
        if (LocalhostServingController.getServingStatus(serving) != ServingStatusEnum.STARTING) {
            try {
                this.killServingInstance(project, serving, false);
            } catch (Exception e) {
                // The delegate was told not to release the lock; do it here so it does not stay held.
                this.releaseLockAfterFailure(project, id, e);
                throw e;
            }
        }
        this.servingFacade.delete(serving);
    }

    @Override
    public String getClassName() {
        return LocalhostServingController.class.getName();
    }

    /**
     * Builds the wrapper for a serving: derived status, matching condition, replica count,
     * internal port, inference paths and Kafka topic DTO.
     *
     * @throws UnsupportedOperationException if the model server is neither
     *         {@code TENSORFLOW_SERVING} nor {@code PYTHON}
     */
    private ServingWrapper getServingInternal(Serving serving) {
        ServingWrapper servingWrapper = new ServingWrapper(serving);
        ServingStatusEnum status = LocalhostServingController.getServingStatus(serving);
        servingWrapper.setStatus(status);

        switch (status) {
            case CREATED:
            case STOPPED:
                servingWrapper.setCondition(ServingStatusCondition.getStoppedSuccessCondition());
                break;
            case STOPPING:
                servingWrapper.setCondition(ServingStatusCondition.getStoppedInProgressCondition());
                break;
            case FAILED:
                servingWrapper.setCondition(
                        ServingStatusCondition.getStartedFailedCondition("deployment terminated unsuccessfully"));
                break;
            case UPDATING:
            case STARTING:
                servingWrapper.setCondition(ServingStatusCondition.getStartedInProgressCondition());
                break;
            case IDLE:
            case RUNNING:
                servingWrapper.setCondition(ServingStatusCondition.getReadySuccessCondition());
                break;
            default:
                // Any other status leaves the condition untouched.
                break;
        }

        // Only RUNNING counts as an available replica; IDLE still exposes the local port.
        boolean running = status == ServingStatusEnum.RUNNING;
        servingWrapper.setAvailableReplicas(running ? 1 : 0);
        if (running || status == ServingStatusEnum.IDLE) {
            servingWrapper.setInternalPort(serving.getLocalPort());
        } else {
            servingWrapper.setInternalPort(null);
        }

        servingWrapper.setModelServerInferencePath(this.resolveModelServerInferencePath(serving));
        servingWrapper.setHopsworksInferencePath(
                "/project/" + serving.getProject().getId() + "/inference/models/" + serving.getName());
        servingWrapper.setKafkaTopicDTO(this.kafkaServingHelper.buildTopicDTO(serving));
        return servingWrapper;
    }

    /** Asks the inference utils matching the serving's model server for its inference path. */
    private String resolveModelServerInferencePath(Serving serving) {
        if (serving.getModelServer() == ModelServer.TENSORFLOW_SERVING) {
            return this.localhostTfInferenceUtils.getPath(serving.getName(), serving.getModelVersion(), null);
        }
        if (serving.getModelServer() == ModelServer.PYTHON) {
            return this.localhostSkLearnInferenceUtils.getPath(null);
        }
        throw new UnsupportedOperationException("Model server not supported as local serving");
    }

    /**
     * Starts or stops a serving after taking its lock.
     *
     * <p>{@code START} is honoured when the derived status is {@code STARTING}; {@code STOP} when it
     * is {@code UPDATING} or {@code FAILED}. Any other combination releases the lock and fails.
     *
     * @throws ServingException with {@code LIFECYCLE_ERROR} when the command does not fit the status
     */
    @Override
    public void startOrStop(Project project, Users user, Integer servingId, ServingCommands command)
            throws ServingException {
        Serving serving = this.servingFacade.acquireLock(project, servingId);
        ServingStatusEnum status = LocalhostServingController.getServingStatus(serving);

        boolean canStart = command == ServingCommands.START && status == ServingStatusEnum.STARTING;
        boolean canStop = command == ServingCommands.STOP
                && (status == ServingStatusEnum.UPDATING || status == ServingStatusEnum.FAILED);

        if (canStart) {
            this.startServingInstance(project, user, serving);
            return;
        }
        if (canStop) {
            this.killServingInstance(project, serving, true);
            return;
        }

        // Nothing will be delegated, so the lock has to be given back here before failing.
        this.servingFacade.releaseLock(project, servingId);
        ServingStatusEnum reported =
                command == ServingCommands.START ? ServingStatusEnum.STARTING : ServingStatusEnum.STOPPED;
        String userMsg = "Instance is already " + reported.toString().toLowerCase();
        throw new ServingException(RESTCodes.ServingErrorCode.LIFECYCLE_ERROR, Level.FINE, userMsg);
    }

    /**
     * Creates a serving (when its id is null) or updates an existing one.
     *
     * <p>On update the serving is locked first. If its derived status is {@code RUNNING},
     * {@code UPDATING} or {@code IDLE}, the instance is restarted when
     * {@link #requiresRestart(Serving, Serving)} says so; otherwise a TensorFlow serving gets a
     * model-version update and any other serving just has its lock released. For every other
     * status the lock is released right after the database update.
     *
     * <p>If the Kafka topic setup or the database update fails, the lock is released before the
     * failure is rethrown.
     *
     * @param servingWrapper carries the serving to persist; its serving and Kafka topic DTO are
     *                       replaced with the persisted state
     */
    @Override
    public void put(Project project, Users user, ServingWrapper servingWrapper) throws ProjectException,
            ServingException, KafkaException, UserException, InterruptedException, ExecutionException {
        Serving serving = servingWrapper.getServing();

        if (serving.getId() == null) {
            serving.setCreated(new Date());
            serving.setCreator(user);
            serving.setProject(project);
            serving.setLocalDir(UUID.randomUUID().toString());
            serving.setCid(CID_STOPPED);
            serving.setInstances(1);
            this.kafkaServingHelper.setupKafkaServingTopic(project, servingWrapper, serving, null);
            Serving newServing = this.servingFacade.merge(serving);
            servingWrapper.setServing(newServing);
            servingWrapper.setKafkaTopicDTO(this.kafkaServingHelper.buildTopicDTO(newServing));
            return;
        }

        Integer servingId = serving.getId();
        Serving oldDbServing = this.servingFacade.acquireLock(project, servingId);
        ServingStatusEnum status = LocalhostServingController.getServingStatus(oldDbServing);

        Serving dbServing;
        try {
            this.kafkaServingHelper.setupKafkaServingTopic(project, servingWrapper, serving, oldDbServing);
            dbServing = this.servingFacade.updateDbObject(serving, project);
        } catch (Exception e) {
            // Nothing has been delegated yet, so nobody else will release the lock taken above.
            this.releaseLockAfterFailure(project, servingId, e);
            throw e;
        }

        boolean hasInstance = status == ServingStatusEnum.RUNNING
                || status == ServingStatusEnum.UPDATING
                || status == ServingStatusEnum.IDLE;
        if (hasInstance) {
            // NOTE(review): the new revision is set on the request object, not on dbServing —
            // confirm this is intended.
            serving.setRevision(this.servingUtils.getNewRevisionID());
            if (requiresRestart(oldDbServing, dbServing)) {
                this.restartServingInstance(project, user, oldDbServing, dbServing);
            } else if (serving.getModelServer() == ModelServer.TENSORFLOW_SERVING) {
                this.tfServingController.updateModelVersion(project, user, dbServing);
            } else {
                this.servingFacade.releaseLock(project, servingId);
            }
        } else {
            this.servingFacade.releaseLock(project, servingId);
        }

        servingWrapper.setServing(dbServing);
        servingWrapper.setKafkaTopicDTO(this.kafkaServingHelper.buildTopicDTO(dbServing));
    }

    /**
     * Always fails: this controller does not expose server logs.
     *
     * @throws ServingException with {@code KUBERNETES_NOT_INSTALLED}, unconditionally
     */
    @Override
    public List<ServingLogs> getLogs(Project project, Integer servingId, String component, Integer tailingLines)
            throws ServingException {
        throw new ServingException(RESTCodes.ServingErrorCode.KUBERNETES_NOT_INSTALLED, Level.FINE,
                "Direct access to server logs is only supported in Kubernetes deployments");
    }

    /**
     * Tells whether an update needs the running instance to be restarted: the name, model path,
     * predictor or batching configuration changed, or the model version decreased.
     *
     * <p>Predictor and batching configuration are compared with {@link Objects#equals(Object, Object)}
     * so that two separately loaded but equal values do not count as a change.
     */
    private static boolean requiresRestart(Serving oldDbServing, Serving dbServing) {
        return !oldDbServing.getName().equals(dbServing.getName())
                || !oldDbServing.getModelPath().equals(dbServing.getModelPath())
                || !Objects.equals(oldDbServing.getPredictor(), dbServing.getPredictor())
                || !Objects.equals(oldDbServing.getBatchingConfiguration(), dbServing.getBatchingConfiguration())
                || oldDbServing.getModelVersion() > dbServing.getModelVersion();
    }

    /**
     * Releases the serving lock while an earlier failure is being propagated. A failure of the
     * release itself is attached to {@code failure} as suppressed so the original error is kept.
     */
    private void releaseLockAfterFailure(Project project, Integer servingId, Exception failure) {
        try {
            this.servingFacade.releaseLock(project, servingId);
        } catch (Exception releaseFailure) {
            failure.addSuppressed(releaseFailure);
        }
    }

    /**
     * Assigns a fresh revision to the serving and asks the controller matching its model server
     * to start it. Other model servers are left untouched apart from the revision.
     */
    private void startServingInstance(Project project, Users user, Serving serving) throws ServingException {
        serving.setRevision(this.servingUtils.getNewRevisionID());
        if (serving.getModelServer() == ModelServer.TENSORFLOW_SERVING) {
            this.tfServingController.startServingInstance(project, user, serving);
        } else if (serving.getModelServer() == ModelServer.PYTHON) {
            this.skLearnServingController.startServingInstance(project, user, serving);
        }
    }

    /**
     * Asks the controller matching the serving's model server to kill it; a no-op for other
     * model servers.
     *
     * @param releaseLock forwarded as-is to the delegate
     */
    private void killServingInstance(Project project, Serving serving, boolean releaseLock)
            throws ServingException {
        if (serving.getModelServer() == ModelServer.TENSORFLOW_SERVING) {
            this.tfServingController.killServingInstance(project, serving, releaseLock);
        } else if (serving.getModelServer() == ModelServer.PYTHON) {
            this.skLearnServingController.killServingInstance(project, serving, releaseLock);
        }
    }

    /** Kills {@code currentInstance} without releasing the lock, then starts {@code newInstance}. */
    private void restartServingInstance(Project project, Users user, Serving currentInstance, Serving newInstance)
            throws ServingException {
        this.killServingInstance(project, currentInstance, false);
        this.startServingInstance(project, user, newInstance);
    }

    /**
     * Derives the status of a serving from its {@code cid}, lock IP and revision.
     *
     * <ul>
     *   <li>{@code cid == CID_STOPPED}: {@code STARTING} if a lock IP is set, otherwise
     *       {@code CREATED} when the revision is null and {@code STOPPED} when it is not;</li>
     *   <li>{@code cid == CID_FAILED}: {@code FAILED};</li>
     *   <li>any other {@code cid}: {@code UPDATING} if a lock IP is set, otherwise {@code RUNNING}.</li>
     * </ul>
     */
    public static ServingStatusEnum getServingStatus(Serving serving) {
        boolean locked = serving.getLockIP() != null;
        if (serving.getCid().equals(CID_STOPPED)) {
            if (locked) {
                return ServingStatusEnum.STARTING;
            }
            return serving.getRevision() == null ? ServingStatusEnum.CREATED : ServingStatusEnum.STOPPED;
        }
        if (serving.getCid().equals(CID_FAILED)) {
            return ServingStatusEnum.FAILED;
        }
        return locked ? ServingStatusEnum.UPDATING : ServingStatusEnum.RUNNING;
    }
}

