/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWALEditsReplay;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hudi.com.google.protobuf.ServiceException;

/**
 * Sends batches of WAL entries to region servers through the admin service's
 * {@code replay} RPC.
 *
 * <p>Entries handed to {@link #replayEntries(List)} are grouped by region and
 * shipped in chunks of at most {@link #MAX_BATCH_SIZE} entries per call.
 * Replay time and batch size are reported to {@link MetricsWALEditsReplay},
 * and a running total of replayed edits is kept for {@link #getStats()}.
 */
@InterfaceAudience.Private
public class WALEditsReplaySink {
    private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
    /** Upper bound on the number of WAL entries sent in a single replay call. */
    private static final int MAX_BATCH_SIZE = 1024;
    private final Configuration conf;
    private final HConnection conn;
    private final TableName tableName;
    private final MetricsWALEditsReplay metrics;
    private final AtomicLong totalReplayedEdits = new AtomicLong();
    /** Value of {@code hbase.hregion.edits.replay.skip.errors} (default {@code false}). */
    private final boolean skipErrors;
    /** Value of {@code hbase.regionserver.logreplay.timeout} (default 60000); passed as the call timeout. */
    private final int replayTimeout;
    private final RpcControllerFactory rpcControllerFactory;

    /**
     * Creates a sink bound to one table and one connection.
     *
     * @param conf configuration supplying the skip-errors flag, the replay
     *        timeout and the RPC controller factory
     * @param tableName table whose edits are replayed
     * @param conn connection used to reach region servers and locate regions
     * @throws IOException declared by the signature; kept for caller compatibility
     */
    public WALEditsReplaySink(Configuration conf, TableName tableName, HConnection conn) throws IOException {
        this.conf = conf;
        this.metrics = new MetricsWALEditsReplay();
        this.conn = conn;
        this.tableName = tableName;
        this.skipErrors = conf.getBoolean("hbase.hregion.edits.replay.skip.errors", false);
        this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000);
        this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
    }

    /**
     * Replays the given entries, grouped by region and split into chunks of at
     * most {@link #MAX_BATCH_SIZE} entries.
     *
     * <p>Each region's chunks are sent to the location most recently supplied
     * for that region in {@code entries}.
     *
     * @param entries pairs of (target location, WAL entry); an empty list is a no-op
     * @throws IOException if a replay call fails and errors are not being skipped
     */
    public void replayEntries(List<Pair<HRegionLocation, WAL.Entry>> entries) throws IOException {
        if (entries.isEmpty()) {
            return;
        }
        int batchSize = entries.size();
        Map<HRegionInfo, List<WAL.Entry>> entriesByRegion = new HashMap<HRegionInfo, List<WAL.Entry>>();
        // Track the location per region so that a region is never replayed
        // against a location that was supplied for a different region.
        Map<HRegionInfo, HRegionLocation> locationByRegion = new HashMap<HRegionInfo, HRegionLocation>();
        for (Pair<HRegionLocation, WAL.Entry> pair : entries) {
            HRegionLocation loc = pair.getFirst();
            HRegionInfo region = loc.getRegionInfo();
            List<WAL.Entry> regionEntries = entriesByRegion.get(region);
            if (regionEntries == null) {
                regionEntries = new ArrayList<WAL.Entry>();
                entriesByRegion.put(region, regionEntries);
            }
            regionEntries.add(pair.getSecond());
            locationByRegion.put(region, loc);
        }

        long startTime = EnvironmentEdgeManager.currentTime();
        for (Map.Entry<HRegionInfo, List<WAL.Entry>> regionBatch : entriesByRegion.entrySet()) {
            HRegionInfo curRegion = regionBatch.getKey();
            HRegionLocation curLocation = locationByRegion.get(curRegion);
            List<WAL.Entry> allActions = regionBatch.getValue();
            int totalActions = allActions.size();
            int replayedActions = 0;
            // Send the region's edits in chunks of at most MAX_BATCH_SIZE.
            while (replayedActions < totalActions) {
                int curBatchSize = Math.min(MAX_BATCH_SIZE, totalActions - replayedActions);
                this.replayEdits(curLocation, curRegion,
                    allActions.subList(replayedActions, replayedActions + curBatchSize));
                replayedActions += curBatchSize;
            }
        }

        long elapsed = EnvironmentEdgeManager.currentTime() - startTime;
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("number of rows:" + entries.size() + " are sent by batch! spent " + elapsed + "(ms)!"));
        }
        this.metrics.updateReplayTime(elapsed);
        this.metrics.updateReplayBatchSize(batchSize);
        this.totalReplayedEdits.addAndGet(batchSize);
    }

    /**
     * Returns a short summary of the work done so far.
     *
     * @return an empty string while nothing has been replayed, otherwise a
     *         message containing the total number of replayed edits
     */
    public String getStats() {
        return this.totalReplayedEdits.get() == 0L ? "" : "Sink: total replayed edits: " + this.totalReplayedEdits;
    }

    /**
     * Replays one chunk of entries for a single region, using the retrying caller.
     *
     * @param regionLoc location the chunk is initially sent to
     * @param regionInfo region the entries belong to
     * @param entries chunk of WAL entries to replay
     * @throws IOException if the call fails and {@code skipErrors} is {@code false}
     */
    private void replayEdits(HRegionLocation regionLoc, HRegionInfo regionInfo, List<WAL.Entry> entries) throws IOException {
        try {
            RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(this.conf, null);
            ReplayServerCallable<AdminProtos.ReplicateWALEntryResponse> callable =
                new ReplayServerCallable<AdminProtos.ReplicateWALEntryResponse>(
                    this.conn, this.tableName, regionLoc, regionInfo, entries);
            factory.<AdminProtos.ReplicateWALEntryResponse>newCaller().callWithRetries(callable, this.replayTimeout);
        }
        catch (IOException ie) {
            if (this.skipErrors) {
                // Honor the skip-errors setting: log and carry on with the remaining chunks.
                LOG.warn((Object)("hbase.hregion.edits.replay.skip.errors=true so continuing replayEdits with error:" + ie.getMessage()));
            } else {
                throw ie;
            }
        }
    }

    /**
     * Callable that ships one chunk of WAL entries for one region to the server
     * at the callable's current location.
     *
     * @param <R> unused; retained so existing parameterized references keep compiling
     */
    class ReplayServerCallable<R>
    extends RegionServerCallable<AdminProtos.ReplicateWALEntryResponse> {
        private HRegionInfo regionInfo;
        private List<WAL.Entry> entries;

        /**
         * @param connection connection handed to the superclass
         * @param tableName table the entries belong to
         * @param regionLoc initial target location
         * @param regionInfo region the entries belong to
         * @param entries WAL entries to replay
         */
        ReplayServerCallable(HConnection connection, TableName tableName, HRegionLocation regionLoc, HRegionInfo regionInfo, List<WAL.Entry> entries) {
            super(connection, tableName, null);
            this.entries = entries;
            this.regionInfo = regionInfo;
            this.setLocation(regionLoc);
        }

        /**
         * Sends the entries to the server at the current location.
         *
         * @param callTimeout not used by this implementation
         * @return always {@code null}
         * @throws IOException if the remote call fails
         */
        @Override
        public AdminProtos.ReplicateWALEntryResponse call(int callTimeout) throws IOException {
            try {
                this.replayToServer(this.regionInfo, this.entries);
            }
            catch (ServiceException se) {
                throw ProtobufUtil.getRemoteException(se);
            }
            return null;
        }

        /**
         * Builds a replicate-WAL-entry request from {@code entries} and invokes
         * {@code replay} on the admin service of the current location's server.
         * Does nothing when {@code entries} is empty.
         */
        private void replayToServer(HRegionInfo regionInfo, List<WAL.Entry> entries) throws IOException, ServiceException {
            if (entries.isEmpty()) {
                return;
            }
            WAL.Entry[] entriesArray = entries.toArray(new WAL.Entry[entries.size()]);
            AdminProtos.AdminService.BlockingInterface remoteSvr =
                WALEditsReplaySink.this.conn.getAdmin(this.getLocation().getServerName());
            Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
                ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
            // The cells travel in the controller; the request is passed alongside it.
            PayloadCarryingRpcController controller =
                WALEditsReplaySink.this.rpcControllerFactory.newController(p.getSecond());
            try {
                remoteSvr.replay(controller, p.getFirst());
            }
            catch (ServiceException se) {
                throw ProtobufUtil.getRemoteException(se);
            }
        }

        /**
         * When {@code reload} is {@code true}, re-resolves the target location
         * from the row of the first cell found in the entries. Entries whose
         * edit has no cells are passed over.
         *
         * @param reload whether the location should be looked up again
         * @throws IOException if locating the region fails
         */
        @Override
        public void prepare(boolean reload) throws IOException {
            if (!reload) {
                return;
            }
            for (WAL.Entry entry : this.entries) {
                WALEdit edit = entry.getEdit();
                List<Cell> cells = edit.getCells();
                if (!cells.isEmpty()) {
                    // One cell is enough: this callable is built with a single region's entries.
                    this.setLocation(WALEditsReplaySink.this.conn.locateRegion(this.tableName, cells.get(0).getRow()));
                    return;
                }
            }
        }
    }
}

