/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.kafka;

import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import com.logicalclocks.servicediscoverclient.service.Service;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.kafka.KafkaBrokers;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.kafka.ProjectTopics;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Schedule;
import javax.ejb.Singleton;
import javax.ejb.Timer;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import kafka.admin.AdminUtils;
import kafka.common.TopicAlreadyMarkedForDeletionException;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

@Singleton
public class ZookeeperTopicCleanerTimer {
    private static final Logger LOGGER = Logger.getLogger(ZookeeperTopicCleanerTimer.class.getName());
    @PersistenceContext(unitName="kthfsPU")
    private EntityManager em;
    @EJB
    private ServiceDiscoveryController serviceDiscoveryController;
    @EJB
    private KafkaBrokers kafkaBrokers;
    private ZkClient zkClient = null;
    private ZkConnection zkConnection = null;
    private ZooKeeper zk = null;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Schedule(persistent=false, minute="0", hour="*")
    public void execute(Timer timer) {
        LOGGER.log(Level.INFO, "Running ZookeeperTopicCleanerTimer.");
        try {
            String zkConnectionString = this.getZookeeperConnectionString();
            HashSet zkTopics = new HashSet();
            int sessionTimeoutMs = 30000;
            try {
                if (this.zk == null || !this.zk.getState().isConnected()) {
                    if (this.zk != null) {
                        this.zk.close();
                    }
                    this.zk = new ZooKeeper(zkConnectionString, sessionTimeoutMs, (Watcher)new ZookeeperWatcher());
                }
                List topics = this.zk.getChildren("/brokers/topics", false);
                zkTopics.addAll(topics);
            }
            catch (IOException ex) {
                LOGGER.log(Level.SEVERE, "Unable to find the zookeeper server: ", ex.toString());
            }
            catch (InterruptedException | KeeperException ex) {
                LOGGER.log(Level.SEVERE, "Cannot retrieve topic list from Zookeeper", ex);
            }
            List dbProjectTopics = this.em.createNamedQuery("ProjectTopics.findAll").getResultList();
            HashSet<String> dbTopics = new HashSet<String>();
            for (ProjectTopics pt : dbProjectTopics) {
                try {
                    dbTopics.add(pt.getTopicName());
                }
                catch (UnsupportedOperationException e) {
                    LOGGER.log(Level.SEVERE, e.toString());
                }
            }
            try {
                if (this.zkClient == null) {
                    int connectionTimeout = 90000;
                    this.zkClient = new ZkClient(this.getIp(zkConnectionString).getHostName(), sessionTimeoutMs, connectionTimeout, (ZkSerializer)ZKStringSerializer$.MODULE$);
                }
                if (!zkTopics.isEmpty()) {
                    zkTopics.removeAll(dbTopics);
                    for (String topicName : zkTopics) {
                        if (this.zkConnection == null) {
                            this.zkConnection = new ZkConnection(zkConnectionString);
                        }
                        ZkUtils zkUtils = new ZkUtils(this.zkClient, this.zkConnection, false);
                        try {
                            AdminUtils.deleteTopic((ZkUtils)zkUtils, (String)topicName);
                            LOGGER.log(Level.INFO, "{0} is removed from Zookeeper", new Object[]{topicName});
                        }
                        catch (TopicAlreadyMarkedForDeletionException ex) {
                            LOGGER.log(Level.INFO, "{0} is already marked for deletion", new Object[]{topicName});
                        }
                    }
                }
            }
            catch (ServiceException ex) {
                LOGGER.log(Level.SEVERE, "Unable to get zookeeper ip address ", ex);
            }
            finally {
                if (this.zkClient != null) {
                    this.zkClient.close();
                }
                try {
                    if (this.zkConnection != null) {
                        this.zkConnection.close();
                    }
                }
                catch (InterruptedException ex) {
                    LOGGER.log(Level.SEVERE, null, ex);
                }
            }
        }
        catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Got an exception while cleaning up topics", e);
        }
    }

    private InetAddress getIp(String zkIp) throws ServiceException {
        String ip = zkIp.split(":")[0];
        try {
            return InetAddress.getByName(ip);
        }
        catch (UnknownHostException ex) {
            throw new ServiceException(RESTCodes.ServiceErrorCode.ZOOKEEPER_SERVICE_UNAVAILABLE, Level.SEVERE, ex.getMessage());
        }
    }

    @Schedule(persistent=false, minute="*/1", hour="*")
    public void getBrokers() {
        try {
            this.kafkaBrokers.setKafkaBrokers(this.kafkaBrokers.getBrokerEndpoints());
        }
        catch (Exception ex) {
            LOGGER.log(Level.SEVERE, null, ex);
        }
    }

    private String getZookeeperConnectionString() throws ServiceDiscoveryException {
        Service zk = this.serviceDiscoveryController.getAnyAddressOfServiceWithDNS(ServiceDiscoveryController.HopsworksService.ZOOKEEPER_CLIENT);
        return zk.getAddress() + ":" + zk.getPort();
    }

    private class ZookeeperWatcher
    implements Watcher {
        private ZookeeperWatcher() {
        }

        public void process(WatchedEvent we) {
        }
    }
}

