/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import io.hops.hudi.org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import io.hops.hudi.org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.hops.hudi.org.apache.hbase.thirdparty.io.netty.channel.Channel;
import io.hops.hudi.org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import io.hops.hudi.org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import io.hops.hudi.org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;
import io.hops.hudi.org.apache.htrace.core.TraceScope;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.FSWALEntry;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.RingBufferTruck;
import org.apache.hadoop.hbase.regionserver.wal.SyncFuture;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.LimitedPrivate(value={"Configuration"})
public class AsyncFSWAL
extends AbstractFSWAL<WALProvider.AsyncWriter> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);
    // Orders sync futures by txid, falling back to the identity hash code when txids are equal.
    private static final Comparator<SyncFuture> SEQ_COMPARATOR = Comparator.comparingLong(SyncFuture::getTxid).thenComparingInt(System::identityHashCode);
    /** Configuration key for the threshold of unsynced writer length that makes appendAndSync() issue a sync. */
    public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
    /** Default for {@link #WAL_BATCH_SIZE}; compared against writer length deltas (presumably bytes). */
    public static final long DEFAULT_WAL_BATCH_SIZE = 65536L;
    /** Configuration key: when true the consumer task runs on an executor taken from the event loop group. */
    public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP = "hbase.wal.async.use-shared-event-loop";
    /** Default for {@link #ASYNC_WAL_USE_SHARED_EVENT_LOOP}. */
    public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;
    /** Configuration key for how long doShutdown() waits for pending writer closes to finish. */
    public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = "hbase.wal.async.wait.on.shutdown.seconds";
    /** Default for {@link #ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS}. */
    public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;
    // Event loop group and channel class handed to AsyncFSWALProvider when a writer is created.
    private final EventLoopGroup eventLoopGroup;
    // Executor that runs the consumer task and the sync completion callbacks.
    private final ExecutorService consumeExecutor;
    private final Class<? extends Channel> channelClass;
    // Held whenever epochAndState is written or readyForRolling is read or written.
    private final Lock consumeLock = new ReentrantLock();
    // The single Runnable instance submitted to consumeExecutor; hasConsumerTask compares against it by identity.
    private final Runnable consumer = this::consume;
    // Reports whether the head of the executor's task queue is the consumer task (always false when unknown).
    private final Supplier<Boolean> hasConsumerTask;
    // Largest epoch value; doReplaceWriter() wraps the epoch back to 0 after reaching it.
    private static final int MAX_EPOCH = 0x3FFFFFFF;
    // Packed state: bit 0 = waiting for roll, bit 1 = writer broken, remaining upper bits = writer epoch.
    private volatile int epochAndState;
    // Set once the consumer has reached a point where the writer can be replaced; waited on in waitForSafePoint().
    private boolean readyForRolling;
    private final Condition readyForRollingCond = this.consumeLock.newCondition();
    // Ring buffer into which append and sync requests are published before the consumer drains them.
    private final RingBuffer<RingBufferTruck> waitingConsumePayloads;
    // Position up to which the consumer has drained waitingConsumePayloads.
    private final Sequence waitingConsumePayloadsGatingSequence;
    // True while a consumer task is scheduled (or deliberately held off during a roll).
    private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);
    private final long batchSize;
    // Runs writer.close() off the calling thread; see closeWriter().
    private final ExecutorService closeExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
    // Output of the current writer when it is an AsyncProtobufLogWriter; used for pipeline and broken checks.
    private volatile AsyncFSOutput fsOut;
    // Entries drained from the ring buffer that have not been handed to the writer yet.
    private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<FSWALEntry>();
    // Entries handed to the writer whose sync has not completed yet.
    private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<FSWALEntry>();
    // Pending sync requests, ordered by SEQ_COMPARATOR.
    private final SortedSet<SyncFuture> syncFutures = new TreeSet<SyncFuture>(SEQ_COMPARATOR);
    // Txid of the last entry processed by appendAndSync().
    private long highestProcessedAppendTxid;
    // Writer length recorded when the last sync was issued (or when the writer was replaced).
    private long fileLengthAtLastSync;
    // Value of highestProcessedAppendTxid when the last sync was issued; reset to 0 on writer replacement.
    private long highestProcessedAppendTxidAtLastSync;
    private final int waitOnShutdownInSeconds;

    /**
     * Convenience constructor that delegates to the full constructor with a {@code null}
     * {@link Abortable}.
     *
     * @throws FailedLogCloseException propagated from the delegated constructor
     * @throws IOException propagated from the delegated constructor
     */
    public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
        this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, eventLoopGroup, channelClass);
    }

    /**
     * Creates the WAL, picks the executor that will run the consumer task and sets up the ring
     * buffer that carries append and sync requests to it.
     *
     * <p>When {@link #ASYNC_WAL_USE_SHARED_EVENT_LOOP} is enabled the consumer runs on an executor
     * obtained from {@code eventLoopGroup}; otherwise a dedicated single-thread daemon pool is
     * created. In both cases a best-effort probe is installed that tells whether the consumer task
     * is at the head of the executor's queue; when the queue cannot be inspected the probe always
     * answers {@code false}.
     *
     * @param rootDir used (together with {@code prefix}) to name the dedicated consumer thread
     * @param prefix WAL file prefix; {@code null} is shown as "default" in the thread name
     * @param eventLoopGroup event loop group used for writers and, optionally, for the consumer
     * @param channelClass channel class passed on when writers are created
     * @throws FailedLogCloseException propagated from the superclass constructor
     * @throws IOException propagated from the superclass constructor
     */
    public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
        super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
        this.eventLoopGroup = eventLoopGroup;
        this.channelClass = channelClass;
        // Local deliberately named differently from the field so nothing is shadowed.
        Supplier<Boolean> consumerTaskProbe;
        if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
            this.consumeExecutor = eventLoopGroup.next();
            if (this.consumeExecutor instanceof SingleThreadEventExecutor) {
                try {
                    // The task queue is not exposed, so it is read reflectively; failure is tolerated.
                    Field taskQueueField = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
                    taskQueueField.setAccessible(true);
                    Queue<?> queue = (Queue<?>)taskQueueField.get(this.consumeExecutor);
                    consumerTaskProbe = () -> queue.peek() == this.consumer;
                }
                catch (Exception e) {
                    LOG.warn("Can not get task queue of " + this.consumeExecutor + ", this is not necessary, just give up", (Throwable)e);
                    consumerTaskProbe = () -> false;
                }
            } else {
                consumerTaskProbe = () -> false;
            }
        } else {
            // The thread name is a format string: escape '%' in BOTH rootDir and prefix so that only
            // the leading "%d" is interpreted. Previously a '%' in rootDir was left unescaped.
            String walDescription = rootDir.toString() + "-prefix:" + (prefix == null ? "default" : prefix);
            String threadNameFormat = "AsyncFSWAL-%d-" + walDescription.replace("%", "%%");
            ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setNameFormat(threadNameFormat).setDaemon(true).build());
            consumerTaskProbe = () -> threadPool.getQueue().peek() == this.consumer;
            this.consumeExecutor = threadPool;
        }
        this.hasConsumerTask = consumerTaskProbe;
        int preallocatedEventCount = conf.getInt("hbase.regionserver.wal.disruptor.event.count", 16384);
        this.waitingConsumePayloads = RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
        this.waitingConsumePayloadsGatingSequence = new Sequence(-1L);
        this.waitingConsumePayloads.addGatingSequences(this.waitingConsumePayloadsGatingSequence);
        // Publish and immediately consume one slot so the first real request gets sequence 1.
        this.waitingConsumePayloads.publish(this.waitingConsumePayloads.next());
        this.waitingConsumePayloadsGatingSequence.set(this.waitingConsumePayloads.getCursor());
        this.batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
        this.waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS, DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
    }

    /**
     * Completes {@code future} with the given txid and error (may be {@code null}) and then
     * offers it to {@code syncFutureCache}.
     */
    private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
        future.done(txid, t);
        this.syncFutureCache.offer(future);
    }

    /**
     * Tells whether the "waiting for roll" flag (lowest bit) is set in the packed state.
     *
     * @param epochAndState packed epoch/state value
     * @return {@code true} if bit 0 is set
     */
    private static boolean waitingRoll(int epochAndState) {
        int rollBit = epochAndState & 1;
        return rollBit == 1;
    }

    /**
     * Tells whether the "writer broken" flag (second lowest bit) is set in the packed state.
     *
     * @param epochAndState packed epoch/state value
     * @return {@code true} if bit 1 is set
     */
    private static boolean writerBroken(int epochAndState) {
        int brokenBit = epochAndState & 2;
        return brokenBit != 0;
    }

    /**
     * Extracts the epoch from the packed state by dropping the two low flag bits with an
     * unsigned shift, so the result is never negative.
     */
    private static int epoch(int epochAndState) {
        return epochAndState >>> 2;
    }

    /**
     * Marks the WAL as ready for rolling if a roll is pending and every append has been acked.
     *
     * <p>The first check is done without the lock; the roll flag is checked again once the lock
     * is held before the waiting roller is signalled.
     *
     * @return {@code true} if the ready flag was set and waiters were signalled
     */
    private boolean trySetReadyForRolling() {
        boolean rollPending = AsyncFSWAL.waitingRoll(this.epochAndState);
        if (!(rollPending && this.unackedAppends.isEmpty())) {
            return false;
        }
        this.consumeLock.lock();
        try {
            boolean stillWaiting = AsyncFSWAL.waitingRoll(this.epochAndState);
            if (stillWaiting) {
                this.readyForRolling = true;
                this.readyForRollingCond.signalAll();
            }
            return stillWaiting;
        }
        finally {
            this.consumeLock.unlock();
        }
    }

    /**
     * Handles a failed sync: marks the writer broken, re-queues every unacked append for writing
     * and requests a log roll unless a roller is already waiting.
     *
     * <p>Nothing is done when the failure belongs to a different epoch or the writer has already
     * been marked broken.
     *
     * @param epochWhenSync epoch that was current when the sync was issued
     * @param error cause of the failure
     */
    private void syncFailed(long epochWhenSync, Throwable error) {
        LOG.warn("sync failed", error);
        boolean rollerAlreadyWaiting;
        this.consumeLock.lock();
        try {
            int snapshot = this.epochAndState;
            boolean staleEpoch = (long)AsyncFSWAL.epoch(snapshot) != epochWhenSync;
            if (staleEpoch || AsyncFSWAL.writerBroken(snapshot)) {
                return;
            }
            // Raise the "writer broken" bit.
            this.epochAndState = snapshot | 2;
            rollerAlreadyWaiting = AsyncFSWAL.waitingRoll(snapshot);
            if (rollerAlreadyWaiting) {
                // The roller is blocked in waitForSafePoint(); wake it up, it will replace the writer.
                this.readyForRolling = true;
                this.readyForRollingCond.signalAll();
            }
        }
        finally {
            this.consumeLock.unlock();
        }
        // Walk newest-to-oldest and push to the front so the original order is preserved.
        for (Iterator<FSWALEntry> newestFirst = this.unackedAppends.descendingIterator(); newestFirst.hasNext(); ) {
            this.toWriteAppends.addFirst(newestFirst.next());
        }
        this.highestUnsyncedTxid = this.highestSyncedTxid.get();
        if (!rollerAlreadyWaiting) {
            this.requestLogRoll(WALActionsListener.RollRequestReason.ERROR);
        }
    }

    /**
     * Handles a successful sync: advances the highest synced txid, releases the acked appends,
     * completes the covered sync futures and requests a size-based roll if needed.
     *
     * @param epochWhenSync epoch that was current when the sync was issued
     * @param writer writer the sync was issued on
     * @param processedTxid highest append txid covered by this sync
     * @param startTimeNs {@link System#nanoTime()} value taken when the sync was issued
     */
    private void syncCompleted(long epochWhenSync, WALProvider.AsyncWriter writer, long processedTxid, long startTimeNs) {
        int snapshot = this.epochAndState;
        boolean staleEpoch = (long)AsyncFSWAL.epoch(snapshot) != epochWhenSync;
        if (staleEpoch || AsyncFSWAL.writerBroken(snapshot)) {
            LOG.warn("Got a sync complete call after the writer is broken, skip");
            return;
        }
        this.highestSyncedTxid.set(processedTxid);
        Iterator<FSWALEntry> pending = this.unackedAppends.iterator();
        while (pending.hasNext()) {
            FSWALEntry acked = pending.next();
            if (acked.getTxid() > processedTxid) {
                break;
            }
            acked.release();
            pending.remove();
        }
        long elapsedNs = System.nanoTime() - startTimeNs;
        int finishedSyncs = this.finishSync(true);
        this.postSync(elapsedNs, finishedSyncs);
        if (this.trySetReadyForRolling()) {
            return;
        }
        if (this.isLogRollRequested() || writer.getLength() <= this.logrollsize) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Requesting log roll because of file size threshold; length=" + writer.getLength() + ", logrollsize=" + this.logrollsize);
        }
        this.requestLogRoll(WALActionsListener.RollRequestReason.SIZE);
    }

    /**
     * Decides whether the next sync should be a forced sync, based on the pending sync futures
     * that fall between two probe futures built for {@code beginTxid} and {@code endTxid + 1}.
     *
     * @return the configured {@code useHsync} when no pending future is in range; otherwise
     *         {@code true} only if at least one future in range asked for a forced sync
     */
    private boolean isHsync(long beginTxid, long endTxid) {
        SyncFuture lowerProbe = new SyncFuture().reset(beginTxid, false);
        SyncFuture upperProbe = new SyncFuture().reset(endTxid + 1L, false);
        SortedSet<SyncFuture> inRange = this.syncFutures.subSet(lowerProbe, upperProbe);
        if (inRange.isEmpty()) {
            return this.useHsync;
        }
        return inRange.stream().anyMatch(SyncFuture::isForceSync);
    }

    /**
     * Issues an asynchronous sync on {@code writer} and routes its outcome to
     * {@link #syncCompleted} or {@link #syncFailed} on the consume executor.
     *
     * <p>Before the sync is issued the current writer length and highest processed append txid
     * are recorded as the "at last sync" markers. The epoch captured here is later compared with
     * the then-current epoch so that callbacks belonging to a replaced writer are ignored.
     *
     * @param writer writer to sync
     */
    private void sync(WALProvider.AsyncWriter writer) {
        this.fileLengthAtLastSync = writer.getLength();
        long currentHighestProcessedAppendTxid = this.highestProcessedAppendTxid;
        boolean shouldUseHsync = this.isHsync(this.highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
        this.highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
        long startTimeNs = System.nanoTime();
        // Extract the epoch with the same int-based unsigned shift the callbacks compare against.
        // Widening the packed int to long BEFORE shifting would sign-extend it, so once the top
        // bit of epochAndState is set the captured value could never equal epoch(epochAndState).
        long epochWhenSync = AsyncFSWAL.epoch(this.epochAndState);
        FutureUtils.addListener(writer.sync(shouldUseHsync), (result, error) -> {
            if (error != null) {
                this.syncFailed(epochWhenSync, (Throwable)error);
            } else {
                this.syncCompleted(epochWhenSync, writer, currentHighestProcessedAppendTxid, startTimeNs);
            }
        }, this.consumeExecutor);
    }

    /**
     * Adds {@code annotation} as a timeline annotation through {@link TraceUtil}.
     * The {@code future} argument is not used by this implementation.
     */
    private void addTimeAnnotation(SyncFuture future, String annotation) {
        TraceUtil.addTimelineAnnotation(annotation);
    }

    /**
     * Completes and removes, in set order, the leading pending sync futures whose txid is not
     * greater than {@code txid}; stops at the first future with a larger txid.
     *
     * @param txid txid reported to every completed future
     * @param addSyncTrace whether to add a "writer synced" trace annotation per completed future
     * @return number of futures completed
     */
    private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
        int completed = 0;
        Iterator<SyncFuture> pending = this.syncFutures.iterator();
        while (pending.hasNext()) {
            SyncFuture candidate = pending.next();
            if (candidate.getTxid() > txid) {
                break;
            }
            this.markFutureDoneAndOffer(candidate, txid, null);
            pending.remove();
            completed++;
            if (addSyncTrace) {
                this.addTimeAnnotation(candidate, "writer synced");
            }
        }
        return completed;
    }

    /**
     * Completes every pending sync future that is already covered by synced data and advances
     * {@code highestSyncedTxid} accordingly.
     *
     * <ul>
     *   <li>Unacked appends exist: everything below the oldest unacked txid is done.</li>
     *   <li>No unacked appends but entries still to write: everything below the oldest
     *       unwritten txid is done.</li>
     *   <li>Nothing outstanding at all: every pending future is done.</li>
     * </ul>
     *
     * @param addSyncTrace whether to add a "writer synced" trace annotation per completed future
     * @return number of futures completed
     */
    private int finishSync(boolean addSyncTrace) {
        if (!this.unackedAppends.isEmpty()) {
            long oldestUnackedTxid = this.unackedAppends.peek().getTxid();
            long doneTxid = Math.max(oldestUnackedTxid - 1L, this.highestSyncedTxid.get());
            this.highestSyncedTxid.set(doneTxid);
            return this.finishSyncLowerThanTxid(doneTxid, addSyncTrace);
        }
        if (!this.toWriteAppends.isEmpty()) {
            long oldestUnwrittenTxid = this.toWriteAppends.peek().getTxid();
            assert (oldestUnwrittenTxid > this.highestProcessedAppendTxid);
            long doneTxid = oldestUnwrittenTxid - 1L;
            this.highestSyncedTxid.set(doneTxid);
            return this.finishSyncLowerThanTxid(doneTxid, addSyncTrace);
        }
        // Nothing is outstanding: every waiting future can be released.
        long runningMaxTxid = this.highestSyncedTxid.get();
        for (SyncFuture waiting : this.syncFutures) {
            runningMaxTxid = Math.max(runningMaxTxid, waiting.getTxid());
            this.markFutureDoneAndOffer(waiting, runningMaxTxid, null);
            if (addSyncTrace) {
                this.addTimeAnnotation(waiting, "writer synced");
            }
        }
        this.highestSyncedTxid.set(runningMaxTxid);
        int completed = this.syncFutures.size();
        this.syncFutures.clear();
        return completed;
    }

    /**
     * Returns the txid of the last entry in {@code queue}.
     * The queue must not be empty: {@code peekLast()} returns {@code null} for an empty deque.
     */
    private static long getLastTxid(Deque<FSWALEntry> queue) {
        return queue.peekLast().getTxid();
    }

    /**
     * Hands queued entries to the writer and issues a sync once enough unsynced data has
     * accumulated.
     *
     * <p>If the method returns without syncing while unsynced data exists below the batch size,
     * no sync is issued here; consume() makes that decision after this method returns.
     */
    private void appendAndSync() {
        WALProvider.AsyncWriter writer = (WALProvider.AsyncWriter)this.writer;
        // Release sync futures that are already covered before writing more data.
        this.finishSync(false);
        long newHighestProcessedAppendTxid = -1L;
        boolean addedToUnackedAppends = false;
        Iterator<FSWALEntry> iter = this.toWriteAppends.iterator();
        while (iter.hasNext()) {
            boolean appended;
            FSWALEntry entry = iter.next();
            try {
                // NOTE(review): the boolean presumably tells whether the entry produced output - confirm in AbstractFSWAL.
                appended = this.appendEntry(writer, entry);
            }
            catch (IOException e) {
                throw new AssertionError("should not happen", e);
            }
            newHighestProcessedAppendTxid = entry.getTxid();
            iter.remove();
            if (!appended) continue;
            // syncFailed() re-queues unacked entries without removing them from unackedAppends, so only
            // add an entry here when it is newer than the last one already tracked.
            if (addedToUnackedAppends || this.unackedAppends.isEmpty() || AsyncFSWAL.getLastTxid(this.unackedAppends) < entry.getTxid()) {
                this.unackedAppends.addLast(entry);
                addedToUnackedAppends = true;
            }
            // Keep going while below the batch size, or while still replaying entries that are older than
            // the last tracked unacked entry; otherwise stop and let the sync logic below run.
            if (writer.getLength() - this.fileLengthAtLastSync < this.batchSize || !addedToUnackedAppends && entry.getTxid() < AsyncFSWAL.getLastTxid(this.unackedAppends)) continue;
            break;
        }
        // -1 (nothing processed in this pass) fails the > 0 test, so the previous value is kept.
        if (newHighestProcessedAppendTxid > 0L) {
            this.highestProcessedAppendTxid = newHighestProcessedAppendTxid;
        } else {
            newHighestProcessedAppendTxid = this.highestProcessedAppendTxid;
        }
        if (writer.getLength() - this.fileLengthAtLastSync >= this.batchSize) {
            this.sync(writer);
            return;
        }
        if (writer.getLength() == this.fileLengthAtLastSync) {
            // Nothing has been written since the last sync; if nothing is unacked either, everything
            // processed so far can be treated as synced.
            if (this.unackedAppends.isEmpty()) {
                this.highestSyncedTxid.set(this.highestProcessedAppendTxid);
                this.finishSync(false);
                this.trySetReadyForRolling();
            }
            return;
        }
    }

    /**
     * Consumer task body: drains published requests from the ring buffer, writes and syncs them
     * via appendAndSync(), and then either reschedules itself or releases the scheduled flag.
     */
    private void consume() {
        this.consumeLock.lock();
        try {
            int currentEpochAndState = this.epochAndState;
            // A broken writer can accept nothing; wait for it to be replaced.
            if (AsyncFSWAL.writerBroken(currentEpochAndState)) {
                return;
            }
            if (AsyncFSWAL.waitingRoll(currentEpochAndState)) {
                // A roll is pending: flush what was written but not synced, or report readiness when
                // nothing is unacked. No new requests are drained while the roll is pending.
                if (((WALProvider.AsyncWriter)this.writer).getLength() > this.fileLengthAtLastSync) {
                    this.sync((WALProvider.AsyncWriter)this.writer);
                } else if (this.unackedAppends.isEmpty()) {
                    this.readyForRolling = true;
                    this.readyForRollingCond.signalAll();
                }
                return;
            }
        }
        finally {
            this.consumeLock.unlock();
        }
        // Drain every published slot between the gating sequence and the cursor snapshot.
        long cursorBound = this.waitingConsumePayloads.getCursor();
        for (long nextCursor = this.waitingConsumePayloadsGatingSequence.get() + 1L; nextCursor <= cursorBound && this.waitingConsumePayloads.isPublished(nextCursor); ++nextCursor) {
            RingBufferTruck truck = this.waitingConsumePayloads.get(nextCursor);
            switch (truck.type()) {
                case APPEND: {
                    this.toWriteAppends.addLast(truck.unloadAppend());
                    break;
                }
                case SYNC: {
                    this.syncFutures.add(truck.unloadSync());
                    break;
                }
                default: {
                    LOG.warn("RingBufferTruck with unexpected type: " + (Object)((Object)truck.type()));
                }
            }
            // Advance the gating sequence so producers may reuse the slot.
            this.waitingConsumePayloadsGatingSequence.set(nextCursor);
        }
        this.appendAndSync();
        // Another consumer task is already at the head of the executor queue; it will continue the work.
        if (this.hasConsumerTask.get().booleanValue()) {
            return;
        }
        if (this.toWriteAppends.isEmpty() && this.waitingConsumePayloadsGatingSequence.get() == this.waitingConsumePayloads.getCursor()) {
            // Nothing left to do: release the scheduled flag, then re-check the cursor to catch a
            // producer that published between the check above and the flag being cleared.
            this.consumerScheduled.set(false);
            if (this.waitingConsumePayloadsGatingSequence.get() == this.waitingConsumePayloads.getCursor()) {
                // Still idle. Issue a sync if there is unsynced data and a pending sync request that
                // is newer than what the last sync covered.
                if (((WALProvider.AsyncWriter)this.writer).getLength() > this.fileLengthAtLastSync && !this.syncFutures.isEmpty() && this.syncFutures.last().getTxid() > this.highestProcessedAppendTxidAtLastSync) {
                    this.sync((WALProvider.AsyncWriter)this.writer);
                }
                return;
            }
            // New work arrived; try to take the flag back. Losing the race means a producer has
            // already scheduled the consumer.
            if (!this.consumerScheduled.compareAndSet(false, true)) {
                return;
            }
        }
        this.consumeExecutor.execute(this.consumer);
    }

    /**
     * Decides whether the caller must submit the consumer task.
     *
     * @return {@code false} while the writer is broken or a roll is pending; otherwise
     *         {@code true} only for the caller that flips {@code consumerScheduled} from false
     *         to true
     */
    private boolean shouldScheduleConsumer() {
        int snapshot = this.epochAndState;
        boolean consumerOnHold = AsyncFSWAL.writerBroken(snapshot) || AsyncFSWAL.waitingRoll(snapshot);
        return !consumerOnHold && this.consumerScheduled.compareAndSet(false, true);
    }

    /**
     * Stamps and publishes the edit to the ring buffer, schedules the consumer task if this
     * caller is the one that should do so, and returns the txid of the published entry.
     */
    @Override
    protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException {
        long txid = this.stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, this.waitingConsumePayloads);
        if (this.shouldScheduleConsumer()) {
            this.consumeExecutor.execute(this.consumer);
        }
        return txid;
    }

    /** Syncs using the configured {@code useHsync} setting; see {@link #sync(boolean)}. */
    @Override
    public void sync() throws IOException {
        this.sync(this.useHsync);
    }

    /** Syncs up to {@code txid} using the configured {@code useHsync} setting; see {@link #sync(long, boolean)}. */
    @Override
    public void sync(long txid) throws IOException {
        this.sync(txid, this.useHsync);
    }

    /**
     * Publishes a sync request to the ring buffer and blocks until it completes.
     *
     * <p>The ring buffer sequence claimed for the request is also used as the txid of the sync
     * future. The claimed slot is always published, even if building the future fails.
     *
     * @param forceSync whether a forced sync is requested
     * @throws IOException if the sync fails
     */
    @Override
    public void sync(boolean forceSync) throws IOException {
        try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
            final long sequence = this.waitingConsumePayloads.next();
            SyncFuture future;
            try {
                future = this.getSyncFuture(sequence, forceSync);
                this.waitingConsumePayloads.get(sequence).load(future);
            }
            finally {
                this.waitingConsumePayloads.publish(sequence);
            }
            if (this.shouldScheduleConsumer()) {
                this.consumeExecutor.execute(this.consumer);
            }
            this.blockOnSync(future);
        }
    }

    /**
     * Blocks until everything up to {@code txid} has been synced; returns immediately when
     * {@code highestSyncedTxid} already covers it.
     *
     * <p>The claimed ring buffer slot is always published, even if building the future fails.
     *
     * @param txid txid that must be synced
     * @param forceSync whether a forced sync is requested
     * @throws IOException if the sync fails
     */
    @Override
    public void sync(long txid, boolean forceSync) throws IOException {
        boolean alreadySynced = this.highestSyncedTxid.get() >= txid;
        if (alreadySynced) {
            return;
        }
        try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
            final long slot = this.waitingConsumePayloads.next();
            SyncFuture future;
            try {
                future = this.getSyncFuture(txid, forceSync);
                this.waitingConsumePayloads.get(slot).load(future);
            }
            finally {
                this.waitingConsumePayloads.publish(slot);
            }
            if (this.shouldScheduleConsumer()) {
                this.consumeExecutor.execute(this.consumer);
            }
            this.blockOnSync(future);
        }
    }

    /**
     * Creates an async writer for {@code path} through {@link AsyncFSWALProvider}, passing this
     * WAL's configuration, file system, block size, event loop group and channel class.
     */
    @Override
    protected WALProvider.AsyncWriter createWriterInstance(Path path) throws IOException {
        return AsyncFSWALProvider.createAsyncWriter(this.conf, this.fs, path, false, this.blocksize, this.eventLoopGroup, this.channelClass);
    }

    /**
     * Blocks until the consumer reports that the writer can be replaced safely.
     * Returns immediately when the writer is already broken or there is no writer.
     */
    private void waitForSafePoint() {
        this.consumeLock.lock();
        try {
            int currentEpochAndState = this.epochAndState;
            if (AsyncFSWAL.writerBroken(currentEpochAndState) || this.writer == null) {
                return;
            }
            // Claim the scheduled flag so producers do not submit the consumer while the roll is pending.
            this.consumerScheduled.set(true);
            // Raise the "waiting for roll" bit.
            this.epochAndState = currentEpochAndState | 1;
            this.readyForRolling = false;
            // Run the consumer once so it can sync outstanding data or signal readiness.
            this.consumeExecutor.execute(this.consumer);
            // Loop guards against spurious wakeups; the wait ignores interrupts.
            while (!this.readyForRolling) {
                this.readyForRollingCond.awaitUninterruptibly();
            }
        }
        finally {
            this.consumeLock.unlock();
        }
    }

    /**
     * Closes {@code writer} asynchronously on the close executor and tracks it in
     * {@code inflightWALClosures} until the close attempt has finished.
     *
     * @param writer writer to close; may be {@code null}
     * @param path path of the file behind {@code writer}; its name keys the in-flight entry
     * @return the writer's length read before the close was submitted, or 0 for a {@code null}
     *         writer
     */
    protected final long closeWriter(WALProvider.AsyncWriter writer, Path path) {
        if (writer == null) {
            return 0L;
        }
        this.inflightWALClosures.put(path.getName(), writer);
        long lengthBeforeClose = writer.getLength();
        Runnable closeTask = () -> {
            try {
                writer.close();
            }
            catch (IOException e) {
                LOG.warn("close old writer failed", (Throwable)e);
            }
            finally {
                this.inflightWALClosures.remove(path.getName());
            }
        };
        this.closeExecutor.execute(closeTask);
        return lengthBeforeClose;
    }

    /**
     * Replaces the current writer with {@code nextWriter}.
     *
     * <p>Waits for the consumer to reach a safe point, closes the old writer asynchronously,
     * installs the new writer and resets the sync markers. Then, under the consume lock, it
     * moves to the next epoch (which clears both state flags), clears the roll request and
     * restarts the consumer.
     *
     * @param oldPath path of the file being replaced
     * @param newPath path of the new file
     * @param nextWriter writer for the new file; must not be {@code null}
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void doReplaceWriter(Path oldPath, Path newPath, WALProvider.AsyncWriter nextWriter) throws IOException {
        Preconditions.checkNotNull(nextWriter);
        this.waitForSafePoint();
        WALProvider.AsyncWriter previousWriter = (WALProvider.AsyncWriter)this.writer;
        long previousFileLength = this.closeWriter(previousWriter, oldPath);
        this.logRollAndSetupWalProps(oldPath, newPath, previousFileLength);
        this.writer = nextWriter;
        if (nextWriter instanceof AsyncProtobufLogWriter) {
            AsyncProtobufLogWriter protobufWriter = (AsyncProtobufLogWriter)nextWriter;
            this.fsOut = protobufWriter.getOutput();
        }
        this.fileLengthAtLastSync = nextWriter.getLength();
        this.highestProcessedAppendTxidAtLastSync = 0L;
        this.consumeLock.lock();
        try {
            this.consumerScheduled.set(true);
            int finishedEpoch = AsyncFSWAL.epoch(this.epochAndState);
            // Wrap around instead of overflowing into the flag bits.
            int upcomingEpoch = finishedEpoch == MAX_EPOCH ? 0 : finishedEpoch + 1;
            this.epochAndState = upcomingEpoch << 2;
            this.rollRequested.set(false);
            this.consumeExecutor.execute(this.consumer);
        }
        finally {
            this.consumeLock.unlock();
        }
    }

    /**
     * Shuts the WAL down: waits for a safe point, closes the writer, waits a bounded time for
     * pending closes, fails every outstanding sync request and stops the dedicated consumer
     * executor (an {@link EventLoop} is left running).
     *
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void doShutdown() throws IOException {
        this.waitForSafePoint();
        this.closeWriter((WALProvider.AsyncWriter)this.writer, this.getOldPath());
        this.writer = null;
        this.closeExecutor.shutdown();
        try {
            boolean closesFinished = this.closeExecutor.awaitTermination(this.waitOnShutdownInSeconds, TimeUnit.SECONDS);
            if (!closesFinished) {
                LOG.error("We have waited " + this.waitOnShutdownInSeconds + " seconds but the close of async writer doesn't complete.Please check the status of underlying filesystem or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS + "\"");
            }
        }
        catch (InterruptedException e) {
            LOG.error("The wait for close of async writer is interrupted");
            Thread.currentThread().interrupt();
        }
        IOException error = new IOException("WAL has been closed");
        // Collect sync requests that were published but never drained by the consumer.
        long cursorBound = this.waitingConsumePayloads.getCursor();
        long nextCursor = this.waitingConsumePayloadsGatingSequence.get() + 1L;
        while (nextCursor <= cursorBound && this.waitingConsumePayloads.isPublished(nextCursor)) {
            RingBufferTruck truck = this.waitingConsumePayloads.get(nextCursor);
            switch (truck.type()) {
                case SYNC:
                    this.syncFutures.add(truck.unloadSync());
                    break;
                default:
                    break;
            }
            nextCursor++;
        }
        for (SyncFuture pending : this.syncFutures) {
            this.markFutureDoneAndOffer(pending, pending.getTxid(), error);
        }
        if (!(this.consumeExecutor instanceof EventLoop)) {
            this.consumeExecutor.shutdown();
        }
    }

    /** Hands {@code entry} to {@code writer}. */
    @Override
    protected void doAppend(WALProvider.AsyncWriter writer, FSWALEntry entry) {
        writer.append(entry);
    }

    /**
     * Returns the pipeline reported by the current output.
     *
     * @return the output's pipeline, or an empty array when no output has been recorded
     */
    @Override
    DatanodeInfo[] getPipeline() {
        // Read the volatile field once so the null check and the call see the same object.
        AsyncFSOutput output = this.fsOut;
        if (output == null) {
            return new DatanodeInfo[0];
        }
        return output.getPipeline();
    }

    /** Returns the number of entries in the current pipeline (0 when no output is recorded). */
    @Override
    int getLogReplication() {
        return this.getPipeline().length;
    }

    /**
     * Reports whether the current output is broken.
     *
     * @return {@code false} when no output has been recorded, otherwise the output's
     *         {@code isBroken()} result
     */
    @Override
    protected boolean doCheckLogLowReplication() {
        // Read the volatile field once so the null check and the call see the same object.
        AsyncFSOutput output = this.fsOut;
        if (output == null) {
            return false;
        }
        return output.isBroken();
    }
}

