/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.serving;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.dao.serving.ServingFacade;
import io.hops.hopsworks.common.integrations.LocalhostStereotype;
import io.hops.hopsworks.common.serving.ServingController;
import io.hops.hopsworks.common.serving.ServingStatusEnum;
import io.hops.hopsworks.common.serving.ServingWrapper;
import io.hops.hopsworks.common.serving.sklearn.LocalhostSkLearnServingController;
import io.hops.hopsworks.common.serving.tf.LocalhostTfServingController;
import io.hops.hopsworks.common.serving.util.KafkaServingHelper;
import io.hops.hopsworks.common.serving.util.ServingCommands;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.exceptions.ServingException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.serving.ModelServer;
import io.hops.hopsworks.persistence.entity.serving.Serving;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

/**
 * {@link ServingController} implementation selected by {@link LocalhostStereotype}.
 *
 * <p>Persistence and locking are delegated to {@link ServingFacade}; starting and killing the
 * actual model server is delegated to {@link LocalhostTfServingController} or
 * {@link LocalhostSkLearnServingController} depending on the serving's {@link ModelServer}.
 *
 * <p>The lifecycle status of a serving is not stored directly; it is derived from the serving's
 * {@code cid} and {@code lockIP} fields (see {@link #getServingStatus(Serving)}).
 */
@LocalhostStereotype
@Stateless
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class LocalhostServingController
implements ServingController {
    /** {@code cid} value assigned to a newly created serving and interpreted as "no instance running". */
    public static final String CID_STOPPED = "stopped";
    /** Path fragment for serving directories. Not referenced in this class; presumably used by collaborators. */
    public static final String SERVING_DIRS = "/serving/";
    @EJB
    private ServingFacade servingFacade;
    @EJB
    private KafkaServingHelper kafkaServingHelper;
    @EJB
    private LocalhostSkLearnServingController skLearnServingController;
    @EJB
    private LocalhostTfServingController tfServingController;

    /**
     * Lists the servings of a project, optionally filtered by model name and by status.
     *
     * @param project the project whose servings are listed
     * @param modelNameFilter if non-empty, only servings found for this model are returned
     * @param statusFilter if non-null, only servings whose derived status equals it are returned
     * @return the matching servings wrapped with their derived status; never {@code null}
     */
    @Override
    public List<ServingWrapper> getServings(Project project, String modelNameFilter, ServingStatusEnum statusFilter) {
        List<Serving> servingList = Strings.isNullOrEmpty(modelNameFilter)
                ? this.servingFacade.findForProject(project)
                : this.servingFacade.findForProjectAndModel(project, modelNameFilter);
        List<ServingWrapper> servingWrapperList = new ArrayList<>();
        for (Serving serving : servingList) {
            ServingWrapper servingWrapper = this.getServingInternal(serving);
            if (statusFilter == null || servingWrapper.getStatus() == statusFilter) {
                servingWrapperList.add(servingWrapper);
            }
        }
        return servingWrapperList;
    }

    /**
     * Looks up a single serving.
     *
     * @param project the owning project
     * @param id the serving id
     * @return the wrapped serving, or {@code null} when the facade finds none
     */
    @Override
    public ServingWrapper getServing(Project project, Integer id) {
        Serving serving = this.servingFacade.findByProjectAndId(project, id);
        if (serving == null) {
            return null;
        }
        return this.getServingInternal(serving);
    }

    /**
     * Deletes every serving of the project, killing instances where needed.
     *
     * <p>Each serving is handled exactly like {@link #deleteServing(Project, Integer)}: the status
     * is derived from the entity returned by the lock acquisition, not from the entity that was
     * listed before the lock was taken.
     *
     * @param project the project whose servings are removed
     * @throws ServingException if locking or killing a serving fails; servings later in the list
     *         are then left untouched
     */
    @Override
    public void deleteServings(Project project) throws ServingException {
        List<Serving> servingList = this.servingFacade.findForProject(project);
        for (Serving serving : servingList) {
            this.lockAndDelete(project, serving.getId());
        }
    }

    /**
     * Deletes one serving, killing its instance where needed.
     *
     * @param project the owning project
     * @param id the serving id
     * @throws ServingException if locking or killing the serving fails
     */
    @Override
    public void deleteServing(Project project, Integer id) throws ServingException {
        this.lockAndDelete(project, id);
    }

    /** Locks the serving, kills its instance unless the locked status is STARTING, then deletes it. */
    private void lockAndDelete(Project project, Integer id) throws ServingException {
        Serving lockedServing = this.servingFacade.acquireLock(project, id);
        // NOTE(review): presumably acquireLock sets lockIP on the returned entity, so a stopped
        // serving reports STARTING here and needs no kill - confirm against ServingFacade.
        if (this.getServingStatus(lockedServing) != ServingStatusEnum.STARTING) {
            this.killServingInstance(project, lockedServing, false);
        }
        this.servingFacade.delete(lockedServing);
    }

    /**
     * @return the fully qualified name of this class
     */
    @Override
    public String getClassName() {
        return LocalhostServingController.class.getName();
    }

    /**
     * Wraps a serving entity with its derived status, replica count, port and Kafka topic.
     *
     * @param serving the persisted serving
     * @return a new wrapper describing the serving
     */
    private ServingWrapper getServingInternal(Serving serving) {
        ServingWrapper servingWrapper = new ServingWrapper(serving);
        ServingStatusEnum status = this.getServingStatus(serving);
        servingWrapper.setStatus(status);
        switch (status) {
            case STOPPED:
            case STARTING:
            case UPDATING: {
                servingWrapper.setAvailableReplicas(0);
                break;
            }
            case RUNNING: {
                servingWrapper.setAvailableReplicas(1);
                servingWrapper.setNodePort(serving.getLocalPort());
                break;
            }
            default: {
                // Other statuses carry no replica/port information.
                break;
            }
        }
        servingWrapper.setKafkaTopicDTO(this.kafkaServingHelper.buildTopicDTO(serving));
        return servingWrapper;
    }

    /**
     * Starts or stops a serving instance.
     *
     * <p>The status is evaluated on the entity returned by the lock acquisition, which is why the
     * accepted combinations are STARTING + START and UPDATING + STOP. Any other combination
     * releases the lock and is rejected.
     *
     * @param project the owning project
     * @param user the user issuing the command
     * @param servingId the serving id
     * @param command the requested command
     * @throws ServingException with {@code LIFECYCLEERROR} when the instance is already in the
     *         requested state, or when the delegated start/kill fails
     */
    @Override
    public void startOrStop(Project project, Users user, Integer servingId, ServingCommands command) throws ServingException {
        Serving serving = this.servingFacade.acquireLock(project, servingId);
        ServingStatusEnum currentStatus = this.getServingStatus(serving);
        if (currentStatus == ServingStatusEnum.STARTING && command == ServingCommands.START) {
            this.startServingInstance(project, user, serving);
            return;
        }
        if (currentStatus == ServingStatusEnum.UPDATING && command == ServingCommands.STOP) {
            this.killServingInstance(project, serving, true);
            return;
        }
        // Nothing to do for this command: give the lock back before reporting the error.
        this.servingFacade.releaseLock(project, servingId);
        String userMsg = "Instance is already " + (command == ServingCommands.START ? ServingStatusEnum.STARTED.toString() : ServingStatusEnum.STOPPED.toString()).toLowerCase();
        throw new ServingException(RESTCodes.ServingErrorCode.LIFECYCLEERROR, Level.FINE, userMsg);
    }

    /**
     * Creates a new serving (when it has no id) or updates an existing one.
     *
     * <p>On update the serving is locked first. If setting up the Kafka topic or updating the
     * database entry fails, the lock is released before the failure is propagated. Afterwards,
     * for a RUNNING/UPDATING serving the instance is either restarted, asked to pick up a new
     * model version (TensorFlow Serving only), or left alone; whenever no instance operation is
     * delegated, the lock is released here.
     *
     * @param project the owning project
     * @param user the user performing the operation
     * @param newServing the requested serving state
     */
    @Override
    public void createOrUpdate(Project project, Users user, ServingWrapper newServing) throws ProjectException, ServingException, KafkaException, UserException, InterruptedException, ExecutionException, ServiceException {
        Serving serving = newServing.getServing();
        if (serving.getId() == null) {
            serving.setCreated(new Date());
            serving.setCreator(user);
            serving.setProject(project);
            UUID uuid = UUID.randomUUID();
            serving.setLocalDir(uuid.toString());
            serving.setCid(CID_STOPPED);
            serving.setInstances(1);
            this.kafkaServingHelper.setupKafkaServingTopic(project, newServing, serving, null);
            this.servingFacade.merge(serving);
            return;
        }

        Serving oldDbServing = this.servingFacade.acquireLock(project, serving.getId());
        ServingStatusEnum status = this.getServingStatus(oldDbServing);
        Serving dbServing;
        try {
            this.kafkaServingHelper.setupKafkaServingTopic(project, newServing, serving, oldDbServing);
            dbServing = this.servingFacade.updateDbObject(serving, project);
        } catch (Exception e) {
            // Do not leave the serving locked when the update itself failed.
            try {
                this.servingFacade.releaseLock(project, serving.getId());
            } catch (Exception releaseFailure) {
                e.addSuppressed(releaseFailure);
            }
            throw e;
        }

        boolean instanceActive = status == ServingStatusEnum.RUNNING || status == ServingStatusEnum.UPDATING;
        if (instanceActive && this.requiresRestart(oldDbServing, dbServing)) {
            this.restartServingInstance(project, user, oldDbServing, dbServing);
        } else if (instanceActive && serving.getModelServer() == ModelServer.TENSORFLOW_SERVING) {
            this.tfServingController.updateModelVersion(project, user, dbServing);
        } else {
            // No instance operation was delegated, so the lock must be released here.
            this.servingFacade.releaseLock(project, serving.getId());
        }
    }

    /**
     * Tells whether an update can only take effect by restarting the instance: the name, model
     * path or batching flag changed, or the model version was lowered.
     */
    private boolean requiresRestart(Serving oldDbServing, Serving dbServing) {
        return !oldDbServing.getName().equals(dbServing.getName())
                || !oldDbServing.getModelPath().equals(dbServing.getModelPath())
                || oldDbServing.isBatchingEnabled() != dbServing.isBatchingEnabled()
                || oldDbServing.getModelVersion() > dbServing.getModelVersion();
    }

    /**
     * @return the maximum number of instances per serving, which is always 1 for this controller
     */
    @Override
    public int getMaxNumInstances() {
        return 1;
    }

    /**
     * Delegates the start to the controller matching the serving's model server.
     * Servings with any other model server are ignored.
     */
    private void startServingInstance(Project project, Users user, Serving serving) throws ServingException {
        ModelServer modelServer = serving.getModelServer();
        if (modelServer == ModelServer.TENSORFLOW_SERVING) {
            this.tfServingController.startServingInstance(project, user, serving);
        } else if (modelServer == ModelServer.FLASK) {
            this.skLearnServingController.startServingInstance(project, user, serving);
        }
    }

    /**
     * Delegates the kill to the controller matching the serving's model server.
     * Servings with any other model server are ignored.
     *
     * @param releaseLock forwarded to the delegate controller
     */
    private void killServingInstance(Project project, Serving serving, boolean releaseLock) throws ServingException {
        ModelServer modelServer = serving.getModelServer();
        if (modelServer == ModelServer.TENSORFLOW_SERVING) {
            this.tfServingController.killServingInstance(project, serving, releaseLock);
        } else if (modelServer == ModelServer.FLASK) {
            this.skLearnServingController.killServingInstance(project, serving, releaseLock);
        }
    }

    /** Kills the current instance without releasing the lock, then starts the new one. */
    private void restartServingInstance(Project project, Users user, Serving currentInstance, Serving newInstance) throws ServingException {
        this.killServingInstance(project, currentInstance, false);
        this.startServingInstance(project, user, newInstance);
    }

    /**
     * Derives the lifecycle status from the serving's {@code cid} and {@code lockIP}:
     * <ul>
     *   <li>cid stopped, no lock: STOPPED</li>
     *   <li>cid stopped, locked: STARTING</li>
     *   <li>cid set, no lock: RUNNING</li>
     *   <li>cid set, locked: UPDATING</li>
     * </ul>
     */
    private ServingStatusEnum getServingStatus(Serving serving) {
        boolean stopped = serving.getCid().equals(CID_STOPPED);
        boolean locked = serving.getLockIP() != null;
        if (stopped) {
            return locked ? ServingStatusEnum.STARTING : ServingStatusEnum.STOPPED;
        }
        return locked ? ServingStatusEnum.UPDATING : ServingStatusEnum.RUNNING;
    }
}

