/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.kafka;

import io.hops.hopsworks.common.dao.kafka.HopsKafkaAdminClient;
import io.hops.hopsworks.common.kafka.KafkaBrokers;
import io.hops.hopsworks.persistence.entity.kafka.ProjectTopics;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Schedule;
import javax.ejb.Singleton;
import javax.ejb.Timer;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

/**
 * Singleton EJB hosting two container-managed timers:
 * <ul>
 *   <li>{@link #execute(Timer)} - hourly removal of Kafka topics that have no
 *       matching {@code ProjectTopics} row in the database;</li>
 *   <li>{@link #getBrokers()} - per-minute refresh of the internal broker
 *       endpoints held by {@link KafkaBrokers}.</li>
 * </ul>
 */
@Singleton
public class ZookeeperTopicCleanerTimer {
    private static final Logger LOGGER = Logger.getLogger(ZookeeperTopicCleanerTimer.class.getName());
    /** Topic name that is always excluded from deletion. */
    private static final String OFFSET_TOPIC = "__consumer_offsets";
    @PersistenceContext(unitName="kthfsPU")
    private EntityManager em;
    @EJB
    private KafkaBrokers kafkaBrokers;
    @EJB
    private HopsKafkaAdminClient hopsKafkaAdminClient;

    /**
     * Deletes every topic reported by the Kafka admin client that is neither
     * registered in the database (named query {@code ProjectTopics.findAll})
     * nor equal to {@code __consumer_offsets}.
     *
     * <p>Failures are logged and never propagated, so an exception does not
     * escape to the timer service. If the thread is interrupted while waiting
     * on a Kafka future, the interrupt flag is restored before returning.
     *
     * @param timer the timer that triggered this invocation (unused)
     */
    @Schedule(minute="0", hour="*", info="Zookeeper Topic Cleaner")
    public void execute(Timer timer) {
        LOGGER.log(Level.FINE, "Running ZookeeperTopicCleanerTimer.");
        try {
            Set<String> kafkaTopics = this.hopsKafkaAdminClient.listTopics().names().get();
            Set<String> dbTopics = this.em.createNamedQuery("ProjectTopics.findAll", ProjectTopics.class)
                .getResultList()
                .stream()
                .map(ProjectTopics::getTopicName)
                .collect(Collectors.toSet());
            // Build a fresh set instead of mutating the one handed out by the
            // admin client, which may be a view or unmodifiable.
            Set<String> orphanedTopics = kafkaTopics.stream()
                .filter(topic -> !OFFSET_TOPIC.equals(topic) && !dbTopics.contains(topic))
                .collect(Collectors.toSet());
            if (orphanedTopics.isEmpty()) {
                return;
            }
            try {
                this.hopsKafkaAdminClient.deleteTopics(orphanedTopics).all().get();
                LOGGER.log(Level.INFO, "Removed topics {0} from Kafka", new Object[]{orphanedTopics});
            }
            catch (InterruptedException ex) {
                // Restore the flag so the container thread can observe the interruption.
                Thread.currentThread().interrupt();
                LOGGER.log(Level.SEVERE, "Error dropping topics from Kafka", ex);
            }
            catch (ExecutionException ex) {
                LOGGER.log(Level.SEVERE, "Error dropping topics from Kafka", ex);
            }
        }
        catch (InterruptedException ex) {
            // Interrupted while waiting for the topic listing; keep the flag set.
            Thread.currentThread().interrupt();
            LOGGER.log(Level.SEVERE, "Got an exception while cleaning up kafka topics", ex);
        }
        catch (Exception ex) {
            // Timer boundary: log everything rather than letting it escape.
            LOGGER.log(Level.SEVERE, "Got an exception while cleaning up kafka topics", ex);
        }
    }

    /**
     * Fetches the {@code "INTERNAL"} broker endpoints from {@link KafkaBrokers}
     * and stores them back via {@code setInternalKafkaBrokers}. Runs every
     * minute on a non-persistent timer; any failure is logged and swallowed.
     */
    @Schedule(persistent=false, minute="*/1", hour="*")
    public void getBrokers() {
        try {
            this.kafkaBrokers.setInternalKafkaBrokers(this.kafkaBrokers.getBrokerEndpoints("INTERNAL"));
        }
        catch (Exception ex) {
            LOGGER.log(Level.SEVERE, "Error updating internal Kafka brokers", ex);
        }
    }
}

