/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.kafka;

import io.hops.hopsworks.common.dao.kafka.HopsKafkaAdminClient;
import io.hops.hopsworks.common.kafka.KafkaBrokers;
import io.hops.hopsworks.common.util.PayaraClusterManager;
import io.hops.hopsworks.persistence.entity.kafka.ProjectTopics;
import java.io.Serializable;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.EJB;
import javax.ejb.Schedule;
import javax.ejb.ScheduleExpression;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.ejb.Timeout;
import javax.ejb.Timer;
import javax.ejb.TimerConfig;
import javax.ejb.TimerService;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

/**
 * Startup singleton that periodically removes Kafka topics which have no
 * matching {@link ProjectTopics} row in the database, and that keeps the
 * cached Kafka broker endpoints refreshed.
 *
 * <p>Two independent timers live in this bean:
 * <ul>
 *   <li>a programmatic, non-persistent calendar timer created in
 *       {@link #init()} that drives {@link #execute(Timer)};</li>
 *   <li>a declarative, non-persistent {@code @Schedule} timer that drives
 *       {@link #setBrokers()} every minute.</li>
 * </ul>
 */
@Startup
@Singleton
public class ZookeeperTopicCleanerTimer {
    private static final Logger LOGGER = Logger.getLogger(ZookeeperTopicCleanerTimer.class.getName());
    /** Topic name that is always excluded from cleanup, even though it has no database row. */
    private static final String OFFSET_TOPIC = "__consumer_offsets";
    @PersistenceContext(unitName="kthfsPU")
    private EntityManager em;
    @EJB
    private KafkaBrokers kafkaBrokers;
    @EJB
    private HopsKafkaAdminClient hopsKafkaAdminClient;
    @EJB
    private PayaraClusterManager payaraClusterManager;
    @Resource
    private TimerService timerService;
    /** Handle of the cleanup timer created in {@link #init()}, kept so it can be cancelled on shutdown. */
    private Timer timer;

    /**
     * Registers the non-persistent calendar timer that triggers {@link #execute(Timer)}.
     *
     * <p>Only the {@code hour} attribute is overridden ("*"); per the
     * {@code ScheduleExpression} defaults minute and second stay at "0", so the
     * timer fires at the top of every hour.
     */
    @PostConstruct
    public void init() {
        ScheduleExpression schedule = new ScheduleExpression();
        schedule.hour("*");
        // Second TimerConfig argument is "persistent": false means the timer is not persisted.
        TimerConfig config = new TimerConfig("Zookeeper Topic Cleaner", false);
        this.timer = this.timerService.createCalendarTimer(schedule, config);
    }

    /**
     * Cancels the cleanup timer, if one was created, when the bean is destroyed.
     */
    @PreDestroy
    public void destroy() {
        if (this.timer != null) {
            this.timer.cancel();
        }
    }

    /**
     * Timer callback: deletes every Kafka topic that is neither present in the
     * {@code ProjectTopics.findAll} query result nor equal to the offsets topic.
     *
     * <p>The run is skipped when {@code payaraClusterManager.amIThePrimary()}
     * returns {@code false} (presumably so that only one node of the cluster
     * performs the cleanup; confirm against {@code PayaraClusterManager}).
     * All failures are logged and never propagated to the container. If the
     * thread is interrupted while waiting on Kafka, its interrupt flag is restored.
     *
     * @param timer the timer that fired; not used
     */
    @Timeout
    public void execute(Timer timer) {
        if (!this.payaraClusterManager.amIThePrimary()) {
            return;
        }
        LOGGER.log(Level.FINE, "Running ZookeeperTopicCleanerTimer.");
        try {
            Set<String> kafkaTopics = this.hopsKafkaAdminClient.listTopics().names().get();
            Set<String> dbTopics = this.em.createNamedQuery("ProjectTopics.findAll", ProjectTopics.class)
                .getResultList()
                .stream()
                .map(ProjectTopics::getTopicName)
                .collect(Collectors.toSet());
            // Build a fresh set rather than mutating the one handed back by the admin client.
            Set<String> orphanedTopics = kafkaTopics.stream()
                .filter(topic -> !dbTopics.contains(topic) && !OFFSET_TOPIC.equals(topic))
                .collect(Collectors.toSet());
            if (!orphanedTopics.isEmpty()) {
                this.deleteOrphanedTopics(orphanedTopics);
            }
        }
        catch (InterruptedException ex) {
            // Restore the interrupt flag so the container thread can observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.log(Level.SEVERE, "Got an exception while cleaning up kafka topics", ex);
        }
        catch (Exception ex) {
            LOGGER.log(Level.SEVERE, "Got an exception while cleaning up kafka topics", ex);
        }
    }

    /**
     * Deletes the given topics from Kafka and waits for the deletion to complete.
     * Failures are logged, not rethrown.
     *
     * @param orphanedTopics non-empty set of topic names to delete
     */
    private void deleteOrphanedTopics(Set<String> orphanedTopics) {
        try {
            this.hopsKafkaAdminClient.deleteTopics(orphanedTopics).all().get();
            LOGGER.log(Level.INFO, "Removed topics {0} from Kafka", new Object[]{orphanedTopics});
        }
        catch (InterruptedException ex) {
            // Restore the interrupt flag instead of silently swallowing the interruption.
            Thread.currentThread().interrupt();
            LOGGER.log(Level.SEVERE, "Error dropping topics from Kafka", ex);
        }
        catch (ExecutionException ex) {
            LOGGER.log(Level.SEVERE, "Error dropping topics from Kafka", ex);
        }
    }

    /**
     * Scheduled every minute (non-persistent): asks {@code kafkaBrokers} to
     * refresh its broker endpoints. Unlike {@link #execute(Timer)}, this method
     * performs no primary-node check. Any exception is logged and suppressed so
     * that a failed refresh does not propagate to the container.
     */
    @Schedule(persistent=false, minute="*/1", hour="*")
    public void setBrokers() {
        try {
            this.kafkaBrokers.setBrokerEndpoints();
        }
        catch (Exception ex) {
            LOGGER.log(Level.SEVERE, null, ex);
        }
    }
}

