package io.hops.hopsworks.common.kafka;

import io.hops.hopsworks.common.dao.kafka.KafkaFacade;
import io.hops.hopsworks.common.dao.kafka.ProjectTopics;
import io.hops.hopsworks.common.exception.ServiceException;
import io.hops.hopsworks.common.util.Settings;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Schedule;
import javax.ejb.Singleton;
import javax.ejb.Timer;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import kafka.admin.AdminUtils;
import kafka.common.TopicAlreadyMarkedForDeletionException;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

@Singleton
/* loaded from: input_file:io/hops/hopsworks/common/kafka/ZookeeprTopicCleanerTimer.class */
public class ZookeeprTopicCleanerTimer {
    private static final Logger LOGGER = Logger.getLogger(ZookeeprTopicCleanerTimer.class.getName());

    @PersistenceContext(unitName = "kthfsPU")
    private EntityManager em;

    @EJB
    Settings settings;

    @EJB
    KafkaFacade kafkaFacade;
    private ZkClient zkClient = null;
    private ZkConnection zkConnection = null;
    private ZooKeeper zk = null;

    /* loaded from: input_file:io/hops/hopsworks/common/kafka/ZookeeprTopicCleanerTimer$ZookeeperWatcher.class */
    private class ZookeeperWatcher implements Watcher {
        private ZookeeperWatcher() {
        }

        public void process(WatchedEvent watchedEvent) {
        }
    }

    @Schedule(persistent = false, minute = "0", hour = Settings.KAFKA_ACL_WILDCARD)
    public void execute(Timer timer) {
        HashSet<String> hashSet = new HashSet();
        try {
            if (this.zk == null || !this.zk.getState().isConnected()) {
                if (this.zk != null) {
                    this.zk.close();
                }
                this.zk = new ZooKeeper(this.settings.getZkConnectStr(), 30000, new ZookeeperWatcher());
            }
            hashSet.addAll(this.zk.getChildren("/brokers/topics", false));
        } catch (KeeperException | InterruptedException e) {
            LOGGER.log(Level.SEVERE, "Cannot retrieve topic list from Zookeeper", e);
        } catch (IOException e2) {
            LOGGER.log(Level.SEVERE, "Unable to find the zookeeper server: ", e2.toString());
        }
        List resultList = this.em.createNamedQuery("ProjectTopics.findAll").getResultList();
        HashSet hashSet2 = new HashSet();
        Iterator it = resultList.iterator();
        while (it.hasNext()) {
            try {
                try {
                    hashSet2.add(((ProjectTopics) it.next()).getTopicName());
                } catch (UnsupportedOperationException e3) {
                    LOGGER.log(Level.SEVERE, e3.toString());
                }
            } catch (Throwable th) {
                if (this.zkClient != null) {
                    this.zkClient.close();
                }
                try {
                    if (this.zkConnection != null) {
                        this.zkConnection.close();
                    }
                } catch (InterruptedException e4) {
                    LOGGER.log(Level.SEVERE, (String) null, (Throwable) e4);
                }
                throw th;
            }
        }
        try {
            if (this.zkClient == null) {
                this.zkClient = new ZkClient(this.kafkaFacade.getIp(this.settings.getZkConnectStr()).getHostName(), 30000, 90000, ZKStringSerializer$.MODULE$);
            }
            if (!hashSet.isEmpty()) {
                hashSet.removeAll(hashSet2);
                for (String str : hashSet) {
                    if (this.zkConnection == null) {
                        this.zkConnection = new ZkConnection(this.settings.getZkConnectStr());
                    }
                    try {
                        AdminUtils.deleteTopic(new ZkUtils(this.zkClient, this.zkConnection, false), str);
                        LOGGER.log(Level.INFO, "{0} is removed from Zookeeper", new Object[]{str});
                    } catch (TopicAlreadyMarkedForDeletionException e5) {
                        LOGGER.log(Level.INFO, "{0} is already marked for deletion", new Object[]{str});
                    }
                }
            }
            if (this.zkClient != null) {
                this.zkClient.close();
            }
            try {
                if (this.zkConnection != null) {
                    this.zkConnection.close();
                }
            } catch (InterruptedException e6) {
                LOGGER.log(Level.SEVERE, (String) null, (Throwable) e6);
            }
        } catch (ServiceException e7) {
            LOGGER.log(Level.SEVERE, "Unable to get zookeeper ip address ", (Throwable) e7);
            if (this.zkClient != null) {
                this.zkClient.close();
            }
            try {
                if (this.zkConnection != null) {
                    this.zkConnection.close();
                }
            } catch (InterruptedException e8) {
                LOGGER.log(Level.SEVERE, (String) null, (Throwable) e8);
            }
        }
    }

    @Schedule(persistent = false, minute = "*/1", hour = Settings.KAFKA_ACL_WILDCARD)
    public void getBrokers() {
        try {
            this.settings.setKafkaBrokers(this.settings.getBrokerEndpoints());
        } catch (IOException | KeeperException | InterruptedException e) {
            LOGGER.log(Level.SEVERE, (String) null, e);
        }
    }
}
