/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hudi.org.apache.hadoop.hbase.replication.regionserver;

import io.hops.hudi.org.apache.hadoop.hbase.Abortable;
import io.hops.hudi.org.apache.hadoop.hbase.CellScanner;
import io.hops.hudi.org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import io.hops.hudi.org.apache.hadoop.hbase.Server;
import io.hops.hudi.org.apache.hadoop.hbase.Stoppable;
import io.hops.hudi.org.apache.hadoop.hbase.TableName;
import io.hops.hudi.org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import io.hops.hudi.org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import io.hops.hudi.org.apache.hadoop.hbase.replication.ReplicationFactory;
import io.hops.hudi.org.apache.hadoop.hbase.replication.ReplicationPeers;
import io.hops.hudi.org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import io.hops.hudi.org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import io.hops.hudi.org.apache.hadoop.hbase.replication.ReplicationTracker;
import io.hops.hudi.org.apache.hadoop.hbase.replication.ReplicationUtils;
import io.hops.hudi.org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
import io.hops.hudi.org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactory;
import io.hops.hudi.org.apache.hadoop.hbase.replication.regionserver.MetricsSink;
import io.hops.hudi.org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler;
import io.hops.hudi.org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandlerImpl;
import io.hops.hudi.org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import io.hops.hudi.org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
import io.hops.hudi.org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import io.hops.hudi.org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import io.hops.hudi.org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALActionListener;
import io.hops.hudi.org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import io.hops.hudi.org.apache.hadoop.hbase.util.Pair;
import io.hops.hudi.org.apache.hadoop.hbase.wal.WALFactory;
import io.hops.hudi.org.apache.hadoop.hbase.wal.WALProvider;
import io.hops.hudi.org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import io.hops.hudi.org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import io.hops.hudi.org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Replication service that implements both {@link ReplicationSourceService} and
 * {@link ReplicationSinkService}.
 *
 * <p>Lifecycle as implemented here:
 * <ol>
 *   <li>{@link #initialize} builds the queue storage, peers, tracker and the
 *       {@link ReplicationSourceManager}, and registers a WAL actions listener when a
 *       WAL provider is available.</li>
 *   <li>{@link #startReplicationService()} initializes the source manager, creates the
 *       {@link ReplicationSink} and schedules periodic statistics logging.</li>
 *   <li>{@link #stopReplicationService()} / {@link #join()} tear everything down.</li>
 * </ol>
 */
@InterfaceAudience.Private
public class Replication
implements ReplicationSourceService,
ReplicationSinkService {
    private static final Logger LOG = LoggerFactory.getLogger(Replication.class);
    /** Result of {@code ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)}. */
    private boolean isReplicationForBulkLoadDataEnabled;
    private ReplicationSourceManager replicationManager;
    private ReplicationQueueStorage queueStorage;
    private ReplicationPeers replicationPeers;
    private ReplicationTracker replicationTracker;
    private Configuration conf;
    /** Created by {@link #startReplicationService()}; {@code null} until then. */
    private ReplicationSink replicationSink;
    private Server server;
    /** Single-threaded daemon executor that runs the statistics task. */
    private ScheduledExecutorService scheduleThreadPool;
    /** Statistics logging period in seconds ({@code replication.stats.thread.period.seconds}). */
    private int statsThreadPeriod;
    private ReplicationLoad replicationLoad;
    private MetricsReplicationGlobalSourceSource globalMetricsSource;
    private PeerProcedureHandler peerProcedureHandler;

    /**
     * Wires up all replication components for the given server. Does not start them; see
     * {@link #startReplicationService()}.
     *
     * @param server     hosting server; supplies the configuration, ZooKeeper watcher and server name
     * @param fs         file system handed to the {@link ReplicationSourceManager}
     * @param logDir     WAL directory handed to the {@link ReplicationSourceManager}
     * @param oldLogDir  archived WAL directory handed to the {@link ReplicationSourceManager}
     * @param walFactory WAL factory; may be {@code null}, in which case no WAL listener is registered
     * @throws IOException              if the replication storage/peers/tracker cannot be created or
     *                                  the cluster id cannot be read from ZooKeeper
     * @throws IllegalArgumentException if bulk-load replication is enabled but
     *                                  {@code hbase.replication.cluster.id} is missing or empty
     */
    @Override
    public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir, WALFactory walFactory) throws IOException {
        this.server = server;
        this.conf = this.server.getConfiguration();
        this.isReplicationForBulkLoadDataEnabled = ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);
        // Validate the configuration before allocating the executor so that a
        // misconfiguration fails fast without leaving resources behind.
        if (this.isReplicationForBulkLoadDataEnabled) {
            String replicationClusterId = this.conf.get("hbase.replication.cluster.id");
            if (replicationClusterId == null || replicationClusterId.isEmpty()) {
                throw new IllegalArgumentException("hbase.replication.cluster.id cannot be null/empty when hbase.replication.bulkload.enabled is set to true.");
            }
        }
        this.scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d").setDaemon(true).build());
        try {
            this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage((ZKWatcher)server.getZooKeeper(), this.conf);
            this.replicationPeers = ReplicationFactory.getReplicationPeers((ZKWatcher)server.getZooKeeper(), this.conf);
            this.replicationPeers.init();
            this.replicationTracker = ReplicationFactory.getReplicationTracker((ZKWatcher)server.getZooKeeper(), (Abortable)this.server, (Stoppable)this.server);
        }
        catch (Exception e) {
            throw new IOException("Failed replication handler create", e);
        }
        UUID clusterId;
        try {
            clusterId = ZKClusterId.getUUIDForCluster((ZKWatcher)this.server.getZooKeeper());
        }
        catch (KeeperException ke) {
            throw new IOException("Could not read cluster id", ke);
        }
        this.globalMetricsSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
        this.replicationManager = new ReplicationSourceManager(this.queueStorage, this.replicationPeers, this.replicationTracker, this.conf, this.server, fs, logDir, oldLogDir, clusterId, walFactory, this.globalMetricsSource);
        // A WAL listener can only be registered when a factory (and provider) is available.
        WALProvider walProvider = walFactory != null ? walFactory.getWALProvider() : null;
        if (walProvider != null) {
            walProvider.addWALActionsListener(new ReplicationSourceWALActionListener(this.conf, this.replicationManager));
        }
        this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 300);
        LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod);
        this.replicationLoad = new ReplicationLoad();
        this.peerProcedureHandler = new PeerProcedureHandlerImpl(this.replicationManager);
    }

    /**
     * @return the peer procedure handler created by {@link #initialize}, or {@code null} if
     *         initialization has not completed
     */
    @Override
    public PeerProcedureHandler getPeerProcedureHandler() {
        return this.peerProcedureHandler;
    }

    /** Stops the replication service by delegating to {@link #join()}. */
    @Override
    public void stopReplicationService() {
        this.join();
    }

    /**
     * Joins the source manager, stops the sink services (if a sink was created) and shuts
     * down the statistics executor.
     *
     * <p>Components that were never created (for example because {@link #initialize} failed
     * part-way) are skipped, and the executor is shut down even if an earlier step throws.
     */
    public void join() {
        try {
            if (this.replicationManager != null) {
                this.replicationManager.join();
            }
            if (this.replicationSink != null) {
                this.replicationSink.stopReplicationSinkServices();
            }
        }
        finally {
            if (this.scheduleThreadPool != null) {
                this.scheduleThreadPool.shutdown();
            }
        }
    }

    /**
     * Hands a batch of WAL entries to the {@link ReplicationSink}.
     *
     * @param entries                    WAL entries to replicate
     * @param cells                      scanner over the cells belonging to {@code entries}
     * @param replicationClusterId       passed through to the sink
     * @param sourceBaseNamespaceDirPath passed through to the sink
     * @param sourceHFileArchiveDirPath  passed through to the sink
     * @throws IOException if the sink has not been created yet (service not started) or the
     *                     sink fails to apply the entries
     */
    @Override
    public void replicateLogEntries(List<AdminProtos.WALEntry> entries, CellScanner cells, String replicationClusterId, String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath) throws IOException {
        ReplicationSink sink = this.replicationSink;
        if (sink == null) {
            // Fail with a descriptive, declared exception instead of a NullPointerException.
            throw new IOException("Replication sink service has not been started");
        }
        sink.replicateEntries(entries, cells, replicationClusterId, sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
    }

    /**
     * Initializes the source manager, creates the sink and schedules the statistics task at a
     * fixed rate of {@code replication.stats.thread.period.seconds} seconds.
     *
     * @throws IOException if the source manager or the sink cannot be set up
     */
    @Override
    public void startReplicationService() throws IOException {
        this.replicationManager.init();
        this.replicationSink = new ReplicationSink(this.conf);
        this.scheduleThreadPool.scheduleAtFixedRate(new ReplicationStatisticsTask(this.replicationSink, this.replicationManager), this.statsThreadPeriod, this.statsThreadPeriod, TimeUnit.SECONDS);
        LOG.info("{} started", this.server.toString());
    }

    /**
     * @return the source manager created by {@link #initialize}, or {@code null} if
     *         initialization has not completed
     */
    @Override
    public ReplicationSourceManager getReplicationManager() {
        return this.replicationManager;
    }

    /**
     * Adds hfile references to the replication queue via the source manager.
     *
     * @param tableName table the hfiles belong to
     * @param family    column family the hfiles belong to
     * @param pairs     path pairs forwarded unchanged to the source manager
     * @throws IOException if the source manager fails; the failure is logged and rethrown
     */
    void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) throws IOException {
        try {
            this.replicationManager.addHFileRefs(tableName, family, pairs);
        }
        catch (IOException e) {
            LOG.error("Failed to add hfile references in the replication queue.", e);
            throw e;
        }
    }

    /**
     * Rebuilds the replication load from the current sources and sink metrics and returns it.
     *
     * @return the refreshed load, or {@code null} when no load can be built yet (the service
     *         has not been initialized or the sink has not been started)
     */
    @Override
    public ReplicationLoad refreshAndGetReplicationLoad() {
        // The load needs both the source manager and the sink metrics; report "no load"
        // (null, which this method already returned when uninitialized) rather than throwing.
        if (this.replicationLoad == null || this.replicationManager == null || this.replicationSink == null) {
            return null;
        }
        this.buildReplicationLoad();
        return this.replicationLoad;
    }

    /** Collects current and old sources plus the sink metrics into {@link #replicationLoad}. */
    private void buildReplicationLoad() {
        List<ReplicationSourceInterface> allSources = new ArrayList<ReplicationSourceInterface>();
        allSources.addAll(this.replicationManager.getSources());
        allSources.addAll(this.replicationManager.getOldSources());
        MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
        this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
    }

    /** Periodic task that logs the source manager's and the sink's statistics at INFO level. */
    private static final class ReplicationStatisticsTask
    implements Runnable {
        private final ReplicationSink replicationSink;
        private final ReplicationSourceManager replicationManager;

        public ReplicationStatisticsTask(ReplicationSink replicationSink, ReplicationSourceManager replicationManager) {
            this.replicationManager = replicationManager;
            this.replicationSink = replicationSink;
        }

        @Override
        public void run() {
            try {
                this.printStats(this.replicationManager.getStats());
                this.printStats(this.replicationSink.getStats());
            }
            catch (RuntimeException e) {
                // An exception escaping a scheduleAtFixedRate task suppresses every later
                // execution, so log it here to keep the periodic statistics alive.
                LOG.warn("Failed to log replication statistics", e);
            }
        }

        /** Logs {@code stats} unless it is {@code null} or empty. */
        private void printStats(String stats) {
            if (stats != null && !stats.isEmpty()) {
                LOG.info(stats);
            }
        }
    }
}

