/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;

@InterfaceAudience.Private
public class ReplicationSourceManager
implements ReplicationListener {
    private static final Log LOG = LogFactory.getLog(ReplicationSourceManager.class);
    private final List<ReplicationSourceInterface> sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    private final List<ReplicationSourceInterface> oldsources;
    private final ReplicationQueues replicationQueues;
    private final ReplicationTracker replicationTracker;
    private final ReplicationPeers replicationPeers;
    private final UUID clusterId;
    private final Server server;
    private final Map<String, Map<String, SortedSet<String>>> walsById;
    private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
    private final Configuration conf;
    private final FileSystem fs;
    private Set<Path> latestPaths;
    private final Path logDir;
    private final Path oldLogDir;
    private final long sleepBeforeFailover;
    private final ThreadPoolExecutor executor;
    private final Random rand;

    public ReplicationSourceManager(ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId) {
        this.replicationQueues = replicationQueues;
        this.replicationPeers = replicationPeers;
        this.replicationTracker = replicationTracker;
        this.server = server;
        this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
        this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>();
        this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
        this.conf = conf;
        this.fs = fs;
        this.logDir = logDir;
        this.oldLogDir = oldLogDir;
        this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000L);
        this.clusterId = clusterId;
        this.replicationTracker.registerListener((ReplicationListener)this);
        this.replicationPeers.getAllPeerIds();
        int nbWorkers = conf.getInt("replication.executor.workers", 1);
        this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
        tfb.setNameFormat("ReplicationExecutor-%d");
        tfb.setDaemon(true);
        this.executor.setThreadFactory(tfb.build());
        this.rand = new Random();
        this.latestPaths = Collections.synchronizedSet(new HashSet());
    }

    public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered, boolean holdLogInZK) {
        String fileName = log.getName();
        this.replicationQueues.setLogPosition(id, fileName, position);
        if (holdLogInZK) {
            return;
        }
        this.cleanOldLogs(fileName, id, queueRecovered);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cleanOldLogs(String key, String id, boolean queueRecovered) {
        String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
        if (queueRecovered) {
            SortedSet<String> wals = this.walsByIdRecoveredQueues.get(id).get(logPrefix);
            if (wals != null && !wals.first().equals(key)) {
                this.cleanOldLogs(wals, key, id);
            }
        } else {
            Map<String, Map<String, SortedSet<String>>> map = this.walsById;
            synchronized (map) {
                SortedSet<String> wals = this.walsById.get(id).get(logPrefix);
                if (wals != null && !wals.first().equals(key)) {
                    this.cleanOldLogs(wals, key, id);
                }
            }
        }
    }

    private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
        SortedSet<String> walSet = wals.headSet(key);
        LOG.debug((Object)("Removing " + walSet.size() + " logs in the list: " + walSet));
        for (String wal : walSet) {
            this.replicationQueues.removeLog(id, wal);
        }
        walSet.clear();
    }

    protected void init() throws IOException, ReplicationException {
        for (String id : this.replicationPeers.getPeerIds()) {
            this.addSource(id);
        }
        List currentReplicators = this.replicationQueues.getListOfReplicators();
        if (currentReplicators == null || currentReplicators.size() == 0) {
            return;
        }
        List otherRegionServers = this.replicationTracker.getListOfRegionServers();
        LOG.info((Object)("Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers));
        for (String rs : currentReplicators) {
            if (otherRegionServers.contains(rs)) continue;
            this.transferQueues(rs);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
        ReplicationPeerConfig peerConfig = this.replicationPeers.getReplicationPeerConfig(id);
        ReplicationPeer peer = this.replicationPeers.getPeer(id);
        ReplicationSourceInterface src = this.getReplicationSource(this.conf, this.fs, this, this.replicationQueues, this.replicationPeers, this.server, id, this.clusterId, peerConfig, peer);
        Map<String, Map<String, SortedSet<String>>> map = this.walsById;
        synchronized (map) {
            this.sources.add(src);
            HashMap walsByGroup = new HashMap();
            this.walsById.put(id, walsByGroup);
            Set<Path> set = this.latestPaths;
            synchronized (set) {
                if (this.latestPaths.size() > 0) {
                    for (Path logPath : this.latestPaths) {
                        String name = logPath.getName();
                        String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name);
                        TreeSet<String> logs = new TreeSet<String>();
                        logs.add(name);
                        walsByGroup.put(walPrefix, logs);
                        try {
                            this.replicationQueues.addLog(id, name);
                        }
                        catch (ReplicationException e) {
                            String message = "Cannot add log to queue when creating a new source, queueId=" + id + ", filename=" + name;
                            this.server.stop(message);
                            throw e;
                        }
                        src.enqueueLog(logPath);
                    }
                }
            }
        }
        src.startup();
        return src;
    }

    public void deleteSource(String peerId, boolean closeConnection) {
        this.replicationQueues.removeQueue(peerId);
        if (closeConnection) {
            this.replicationPeers.peerRemoved(peerId);
        }
    }

    public void join() {
        this.executor.shutdown();
        if (this.sources.size() == 0) {
            this.replicationQueues.removeAllQueues();
        }
        for (ReplicationSourceInterface source : this.sources) {
            source.terminate("Region server is closing");
        }
    }

    protected Map<String, Map<String, SortedSet<String>>> getWALs() {
        return Collections.unmodifiableMap(this.walsById);
    }

    protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
        return Collections.unmodifiableMap(this.walsByIdRecoveredQueues);
    }

    public List<ReplicationSourceInterface> getSources() {
        return this.sources;
    }

    public List<ReplicationSourceInterface> getOldSources() {
        return this.oldsources;
    }

    @VisibleForTesting
    List<String> getAllQueues() {
        return this.replicationQueues.getAllQueues();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void preLogRoll(Path newLog) throws IOException {
        this.recordLog(newLog);
        String logName = newLog.getName();
        String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
        Set<Path> set = this.latestPaths;
        synchronized (set) {
            Iterator<Path> iterator2 = this.latestPaths.iterator();
            while (iterator2.hasNext()) {
                Path path = iterator2.next();
                if (!path.getName().contains(logPrefix)) continue;
                iterator2.remove();
                break;
            }
            this.latestPaths.add(newLog);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void recordLog(Path logPath) throws IOException {
        String logName = logPath.getName();
        String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
        Object object = this.replicationPeers;
        synchronized (object) {
            for (String string : this.replicationPeers.getPeerIds()) {
                try {
                    this.replicationQueues.addLog(string, logName);
                }
                catch (ReplicationException e) {
                    throw new IOException("Cannot add log to replication queue when creating a new source, queueId=" + string + ", filename=" + logName, (Throwable)((Object)e));
                }
            }
        }
        object = this.walsById;
        synchronized (object) {
            for (Map.Entry entry : this.walsById.entrySet()) {
                String peerId = (String)entry.getKey();
                Map walsByPrefix = (Map)entry.getValue();
                boolean existingPrefix = false;
                for (Map.Entry walsEntry : walsByPrefix.entrySet()) {
                    SortedSet wals = (SortedSet)walsEntry.getValue();
                    if (this.sources.isEmpty()) {
                        wals.clear();
                    }
                    if (!logPrefix.equals(walsEntry.getKey())) continue;
                    wals.add(logName);
                    existingPrefix = true;
                }
                if (existingPrefix) continue;
                LOG.debug((Object)("Start tracking logs for wal group " + logPrefix + " for peer " + peerId));
                TreeSet<String> wals = new TreeSet<String>();
                wals.add(logName);
                walsByPrefix.put(logPrefix, wals);
            }
        }
    }

    void postLogRoll(Path newLog) throws IOException {
        for (ReplicationSourceInterface source : this.sources) {
            source.enqueueLog(newLog);
        }
    }

    protected ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server, String peerId, UUID clusterId, ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer) throws IOException {
        ReplicationSourceInterface src;
        RegionServerCoprocessorHost rsServerHost = null;
        TableDescriptors tableDescriptors = null;
        if (server instanceof HRegionServer) {
            rsServerHost = ((HRegionServer)server).getRegionServerCoprocessorHost();
            tableDescriptors = ((HRegionServer)server).getTableDescriptors();
        }
        try {
            Class<?> c = Class.forName(conf.get("replication.replicationsource.implementation", ReplicationSource.class.getCanonicalName()));
            src = (ReplicationSourceInterface)c.newInstance();
        }
        catch (Exception e) {
            LOG.warn((Object)"Passed replication source implementation throws errors, defaulting to ReplicationSource", (Throwable)e);
            src = new ReplicationSource();
        }
        ReplicationEndpoint replicationEndpoint = null;
        try {
            ReplicationEndpoint newReplicationEndPoint;
            String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
            if (replicationEndpointImpl == null) {
                replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
            }
            Class<?> c = Class.forName(replicationEndpointImpl);
            replicationEndpoint = (ReplicationEndpoint)c.newInstance();
            if (rsServerHost != null && (newReplicationEndPoint = rsServerHost.postCreateReplicationEndPoint(replicationEndpoint)) != null) {
                replicationEndpoint = newReplicationEndPoint;
            }
        }
        catch (Exception e) {
            LOG.warn((Object)("Passed replication endpoint implementation throws errors while initializing ReplicationSource for peer: " + peerId), (Throwable)e);
            throw new IOException(e);
        }
        MetricsSource metrics = new MetricsSource(peerId);
        src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId, replicationEndpoint, metrics);
        replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors));
        return src;
    }

    private void transferQueues(String rsZnode) {
        NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers, this.clusterId);
        try {
            this.executor.execute(transfer);
        }
        catch (RejectedExecutionException ex) {
            LOG.info((Object)("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage()));
        }
    }

    public void closeRecoveredQueue(ReplicationSourceInterface src) {
        LOG.info((Object)("Done with the recovered queue " + src.getPeerClusterZnode()));
        this.oldsources.remove(src);
        this.deleteSource(src.getPeerClusterZnode(), false);
        this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void removePeer(String id) {
        LOG.info((Object)("Closing the following queue " + id + ", currently have " + this.sources.size() + " and another " + this.oldsources.size() + " that were recovered"));
        String terminateMessage = "Replication stream was removed by a user";
        ArrayList<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<ReplicationSourceInterface>();
        List<ReplicationSourceInterface> list = this.oldsources;
        synchronized (list) {
            for (ReplicationSourceInterface src : this.oldsources) {
                if (!id.equals(src.getPeerClusterId())) continue;
                oldSourcesToDelete.add(src);
            }
            for (ReplicationSourceInterface src : oldSourcesToDelete) {
                src.terminate(terminateMessage);
                this.closeRecoveredQueue(src);
            }
        }
        LOG.info((Object)("Number of deleted recovered sources for " + id + ": " + oldSourcesToDelete.size()));
        ArrayList<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
        ReplicationPeers replicationPeers = this.replicationPeers;
        synchronized (replicationPeers) {
            for (ReplicationSourceInterface src : this.sources) {
                if (!id.equals(src.getPeerClusterId())) continue;
                srcToRemove.add(src);
            }
            if (srcToRemove.size() == 0) {
                LOG.error((Object)("The queue we wanted to close is missing " + id));
                return;
            }
            for (ReplicationSourceInterface toRemove : srcToRemove) {
                toRemove.terminate(terminateMessage);
                this.sources.remove(toRemove);
            }
            this.deleteSource(id, true);
        }
    }

    public void regionServerRemoved(String regionserver) {
        this.transferQueues(regionserver);
    }

    public void peerRemoved(String peerId) {
        this.removePeer(peerId);
    }

    public void peerListChanged(List<String> peerIds) {
        for (String id : peerIds) {
            try {
                boolean added = this.replicationPeers.peerAdded(id);
                if (!added) continue;
                this.addSource(id);
            }
            catch (Exception e) {
                LOG.error((Object)"Error while adding a new peer", (Throwable)e);
            }
        }
    }

    public Path getOldLogDir() {
        return this.oldLogDir;
    }

    public Path getLogDir() {
        return this.logDir;
    }

    public FileSystem getFs() {
        return this.fs;
    }

    public String getStats() {
        StringBuffer stats = new StringBuffer();
        for (ReplicationSourceInterface source : this.sources) {
            stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
            stats.append(source.getStats() + "\n");
        }
        for (ReplicationSourceInterface oldSource : this.oldsources) {
            stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
            stats.append(oldSource.getStats() + "\n");
        }
        return stats.toString();
    }

    class NodeFailoverWorker
    extends Thread {
        private String rsZnode;
        private final ReplicationQueues rq;
        private final ReplicationPeers rp;
        private final UUID clusterId;

        public NodeFailoverWorker(String rsZnode) {
            this(rsZnode, replicationSourceManager.replicationQueues, replicationSourceManager.replicationPeers, replicationSourceManager.clusterId);
        }

        public NodeFailoverWorker(String rsZnode, ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, UUID clusterId) {
            super("Failover-for-" + rsZnode);
            this.rsZnode = rsZnode;
            this.rq = replicationQueues;
            this.rp = replicationPeers;
            this.clusterId = clusterId;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            if (this.rq.isThisOurZnode(this.rsZnode)) {
                return;
            }
            try {
                Thread.sleep(ReplicationSourceManager.this.sleepBeforeFailover + (long)(ReplicationSourceManager.this.rand.nextFloat() * (float)ReplicationSourceManager.this.sleepBeforeFailover));
            }
            catch (InterruptedException e) {
                LOG.warn((Object)"Interrupted while waiting before transferring a queue.");
                Thread.currentThread().interrupt();
            }
            if (ReplicationSourceManager.this.server.isStopped()) {
                LOG.info((Object)"Not transferring queue since we are shutting down");
                return;
            }
            SortedMap newQueues = null;
            newQueues = this.rq.claimQueues(this.rsZnode);
            if (newQueues.isEmpty()) {
                return;
            }
            for (Map.Entry entry : newQueues.entrySet()) {
                String peerId = (String)entry.getKey();
                SortedSet walsSet = (SortedSet)entry.getValue();
                try {
                    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
                    String actualPeerId = replicationQueueInfo.getPeerId();
                    ReplicationPeer peer = ReplicationSourceManager.this.replicationPeers.getPeer(actualPeerId);
                    ReplicationPeerConfig peerConfig = null;
                    try {
                        peerConfig = ReplicationSourceManager.this.replicationPeers.getReplicationPeerConfig(actualPeerId);
                    }
                    catch (ReplicationException ex) {
                        LOG.warn((Object)("Received exception while getting replication peer config, skipping replay" + (Object)((Object)ex)));
                    }
                    if (peer == null || peerConfig == null) {
                        LOG.warn((Object)("Skipping failover for peer:" + actualPeerId + " of node" + this.rsZnode));
                        ReplicationSourceManager.this.replicationQueues.removeQueue(peerId);
                        continue;
                    }
                    HashMap<String, TreeSet<String>> walsByGroup = new HashMap<String, TreeSet<String>>();
                    ReplicationSourceManager.this.walsByIdRecoveredQueues.put(peerId, walsByGroup);
                    for (String wal : walsSet) {
                        String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(wal);
                        TreeSet<String> wals = (TreeSet<String>)walsByGroup.get(walPrefix);
                        if (wals == null) {
                            wals = new TreeSet<String>();
                            walsByGroup.put(walPrefix, wals);
                        }
                        wals.add(wal);
                    }
                    ReplicationSourceInterface src = ReplicationSourceManager.this.getReplicationSource(ReplicationSourceManager.this.conf, ReplicationSourceManager.this.fs, ReplicationSourceManager.this, this.rq, this.rp, ReplicationSourceManager.this.server, peerId, this.clusterId, peerConfig, peer);
                    List list = ReplicationSourceManager.this.oldsources;
                    synchronized (list) {
                        if (!this.rp.getPeerIds().contains(src.getPeerClusterId())) {
                            src.terminate("Recovered queue doesn't belong to any current peer");
                            ReplicationSourceManager.this.closeRecoveredQueue(src);
                            continue;
                        }
                        ReplicationSourceManager.this.oldsources.add(src);
                        for (String wal : walsSet) {
                            src.enqueueLog(new Path(ReplicationSourceManager.this.oldLogDir, wal));
                        }
                        src.startup();
                    }
                }
                catch (IOException e) {
                    LOG.error((Object)"Failed creating a source", (Throwable)e);
                }
            }
        }
    }
}

