package io.hops.hopsworks.common.kafka;

import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.dao.kafka.HopsKafkaAdminClient;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.persistence.entity.kafka.ProjectTopics;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Schedule;
import javax.ejb.Singleton;
import javax.ejb.Timer;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

@Singleton
/* loaded from: input_file:io/hops/hopsworks/common/kafka/ZookeeperTopicCleanerTimer.class */
public class ZookeeperTopicCleanerTimer {
    private static final Logger LOGGER = Logger.getLogger(ZookeeperTopicCleanerTimer.class.getName());
    private static final String offsetTopic = "__consumer_offsets";

    @PersistenceContext(unitName = "kthfsPU")
    private EntityManager em;

    @EJB
    private ServiceDiscoveryController serviceDiscoveryController;

    @EJB
    private KafkaBrokers kafkaBrokers;

    @EJB
    private HopsKafkaAdminClient hopsKafkaAdminClient;
    private ZooKeeper zk = null;

    /* loaded from: input_file:io/hops/hopsworks/common/kafka/ZookeeperTopicCleanerTimer$ZookeeperWatcher.class */
    private class ZookeeperWatcher implements Watcher {
        private ZookeeperWatcher() {
        }

        public void process(WatchedEvent watchedEvent) {
        }
    }

    @Schedule(persistent = false, minute = "0", hour = Settings.KAFKA_ACL_WILDCARD)
    public void execute(Timer timer) {
        LOGGER.log(Level.INFO, "Running ZookeeperTopicCleanerTimer.");
        try {
            String zookeeperConnectionString = this.kafkaBrokers.getZookeeperConnectionString();
            HashSet hashSet = new HashSet();
            try {
                try {
                    this.zk = new ZooKeeper(zookeeperConnectionString, Settings.ZOOKEEPER_SESSION_TIMEOUT_MS, new ZookeeperWatcher());
                    hashSet.addAll(this.zk.getChildren("/brokers/topics", false));
                    if (this.zk != null) {
                        try {
                            this.zk.close();
                        } catch (InterruptedException e) {
                            LOGGER.log(Level.SEVERE, "Unable to close zookeeper connection", (Throwable) e);
                        }
                        this.zk = null;
                    }
                } catch (Throwable th) {
                    if (this.zk != null) {
                        try {
                            this.zk.close();
                        } catch (InterruptedException e2) {
                            LOGGER.log(Level.SEVERE, "Unable to close zookeeper connection", (Throwable) e2);
                        }
                        this.zk = null;
                    }
                    throw th;
                }
            } catch (KeeperException | InterruptedException e3) {
                LOGGER.log(Level.SEVERE, "Cannot retrieve topic list from Zookeeper", e3);
                if (this.zk != null) {
                    try {
                        this.zk.close();
                    } catch (InterruptedException e4) {
                        LOGGER.log(Level.SEVERE, "Unable to close zookeeper connection", (Throwable) e4);
                    }
                    this.zk = null;
                }
            } catch (IOException e5) {
                LOGGER.log(Level.SEVERE, "Unable to find the zookeeper server: ", e5.toString());
                if (this.zk != null) {
                    try {
                        this.zk.close();
                    } catch (InterruptedException e6) {
                        LOGGER.log(Level.SEVERE, "Unable to close zookeeper connection", (Throwable) e6);
                    }
                    this.zk = null;
                }
            }
            List resultList = this.em.createNamedQuery("ProjectTopics.findAll").getResultList();
            HashSet hashSet2 = new HashSet();
            Iterator it = resultList.iterator();
            while (it.hasNext()) {
                hashSet2.add(((ProjectTopics) it.next()).getTopicName());
            }
            hashSet.removeAll(hashSet2);
            hashSet.remove(offsetTopic);
            if (!hashSet.isEmpty()) {
                try {
                    this.hopsKafkaAdminClient.deleteTopics(hashSet).all().get();
                    LOGGER.log(Level.INFO, "Removed topics {0} from Kafka", new Object[]{hashSet});
                } catch (InterruptedException | ExecutionException e7) {
                    LOGGER.log(Level.SEVERE, "Error dropping topics from Kafka", e7);
                }
            }
        } catch (Exception e8) {
            LOGGER.log(Level.SEVERE, "Got an exception while cleaning up kafka topics", (Throwable) e8);
        } catch (ServiceDiscoveryException e9) {
            LOGGER.log(Level.SEVERE, "Could not discover Zookeeper server addresses", e9);
        }
    }

    @Schedule(persistent = false, minute = "*/1", hour = Settings.KAFKA_ACL_WILDCARD)
    public void getBrokers() {
        try {
            this.kafkaBrokers.setKafkaBrokers(this.kafkaBrokers.getBrokerEndpoints());
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, (String) null, (Throwable) e);
        }
    }
}
