/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.kafka;

import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.dao.kafka.HopsKafkaAdminClient;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.kafka.KafkaBrokers;
import io.hops.hopsworks.persistence.entity.kafka.ProjectTopics;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Schedule;
import javax.ejb.Singleton;
import javax.ejb.Timer;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

@Singleton
public class ZookeeperTopicCleanerTimer {
    private static final Logger LOGGER = Logger.getLogger(ZookeeperTopicCleanerTimer.class.getName());
    private static final String offsetTopic = "__consumer_offsets";
    @PersistenceContext(unitName="kthfsPU")
    private EntityManager em;
    @EJB
    private ServiceDiscoveryController serviceDiscoveryController;
    @EJB
    private KafkaBrokers kafkaBrokers;
    @EJB
    private HopsKafkaAdminClient hopsKafkaAdminClient;
    private ZooKeeper zk = null;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Schedule(persistent=false, minute="0", hour="*")
    public void execute(Timer timer) {
        LOGGER.log(Level.INFO, "Running ZookeeperTopicCleanerTimer.");
        try {
            String zkConnectionString = this.kafkaBrokers.getZookeeperConnectionString();
            HashSet<String> zkTopics = new HashSet<String>();
            try {
                this.zk = new ZooKeeper(zkConnectionString, 30000, (Watcher)new ZookeeperWatcher());
                List topics = this.zk.getChildren("/brokers/topics", false);
                zkTopics.addAll(topics);
            }
            catch (IOException ex) {
                LOGGER.log(Level.SEVERE, "Unable to find the zookeeper server: ", ex.toString());
            }
            catch (InterruptedException | KeeperException ex) {
                LOGGER.log(Level.SEVERE, "Cannot retrieve topic list from Zookeeper", ex);
            }
            finally {
                if (this.zk != null) {
                    try {
                        this.zk.close();
                    }
                    catch (InterruptedException ex) {
                        LOGGER.log(Level.SEVERE, "Unable to close zookeeper connection", ex);
                    }
                    this.zk = null;
                }
            }
            List dbProjectTopics = this.em.createNamedQuery("ProjectTopics.findAll").getResultList();
            HashSet<String> dbTopics = new HashSet<String>();
            for (ProjectTopics pt : dbProjectTopics) {
                dbTopics.add(pt.getTopicName());
            }
            zkTopics.removeAll(dbTopics);
            zkTopics.remove(offsetTopic);
            if (!zkTopics.isEmpty()) {
                try {
                    this.hopsKafkaAdminClient.deleteTopics(zkTopics).all().get();
                    LOGGER.log(Level.INFO, "Removed topics {0} from Kafka", new Object[]{zkTopics});
                }
                catch (InterruptedException | ExecutionException ex) {
                    LOGGER.log(Level.SEVERE, "Error dropping topics from Kafka", ex);
                }
            }
        }
        catch (ServiceDiscoveryException ex) {
            LOGGER.log(Level.SEVERE, "Could not discover Zookeeper server addresses", ex);
        }
        catch (Exception ex) {
            LOGGER.log(Level.SEVERE, "Got an exception while cleaning up kafka topics", ex);
        }
    }

    @Schedule(persistent=false, minute="*/1", hour="*")
    public void getBrokers() {
        try {
            this.kafkaBrokers.setKafkaBrokers(this.kafkaBrokers.getBrokerEndpoints());
        }
        catch (Exception ex) {
            LOGGER.log(Level.SEVERE, null, ex);
        }
    }

    private class ZookeeperWatcher
    implements Watcher {
        private ZookeeperWatcher() {
        }

        public void process(WatchedEvent we) {
        }
    }
}

