package io.hops.hudi.org.apache.hadoop.hbase.replication.regionserver;

import io.hops.hudi.org.apache.hadoop.hbase.ServerName;
import io.hops.hudi.org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import io.hops.hudi.org.apache.hadoop.hbase.util.CancelableProgressable;
import io.hops.hudi.org.apache.hadoop.hbase.util.CommonFSUtils;
import io.hops.hudi.org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import io.hops.hudi.org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import io.hops.hudi.org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import io.hops.hudi.org.apache.hadoop.hbase.wal.WAL;
import io.hops.hudi.org.apache.hadoop.hbase.wal.WALFactory;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@InterfaceAudience.Private
@InterfaceStability.Evolving
/* loaded from: input_file:io/hops/hudi/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.class */
public class WALEntryStream implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);
    private WAL.Reader reader;
    private Path currentPath;
    private WAL.Entry currentEntry;
    private long currentPositionOfEntry;
    private long currentPositionOfReader = 0;
    private final ReplicationSourceLogQueue logQueue;
    private final String walGroupId;
    private final FileSystem fs;
    private final Configuration conf;
    private final WALFileLengthProvider walFileLengthProvider;
    private final ServerName serverName;
    private final MetricsSource metrics;

    public WALEntryStream(ReplicationSourceLogQueue replicationSourceLogQueue, Configuration configuration, long j, WALFileLengthProvider wALFileLengthProvider, ServerName serverName, MetricsSource metricsSource, String str) throws IOException {
        this.currentPositionOfEntry = 0L;
        this.logQueue = replicationSourceLogQueue;
        this.fs = CommonFSUtils.getWALFileSystem(configuration);
        this.conf = configuration;
        this.currentPositionOfEntry = j;
        this.walFileLengthProvider = wALFileLengthProvider;
        this.serverName = serverName;
        this.metrics = metricsSource;
        this.walGroupId = str;
    }

    public boolean hasNext() throws IOException {
        if (this.currentEntry == null) {
            tryAdvanceEntry();
        }
        return this.currentEntry != null;
    }

    public WAL.Entry peek() throws IOException {
        if (hasNext()) {
            return this.currentEntry;
        }
        return null;
    }

    public WAL.Entry next() throws IOException {
        WAL.Entry peek = peek();
        this.currentPositionOfEntry = this.currentPositionOfReader;
        this.currentEntry = null;
        return peek;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        closeReader();
    }

    public long getPosition() {
        return this.currentPositionOfEntry;
    }

    public Path getCurrentPath() {
        return this.currentPath;
    }

    private String getCurrentPathStat() {
        StringBuilder sb = new StringBuilder();
        if (this.currentPath != null) {
            sb.append("currently replicating from: ").append(this.currentPath).append(" at position: ").append(this.currentPositionOfEntry).append("\n");
        } else {
            sb.append("no replication ongoing, waiting for new log");
        }
        return sb.toString();
    }

    public void reset() throws IOException {
        if (this.reader == null || this.currentPath == null) {
            return;
        }
        resetReader();
    }

    private void setPosition(long j) {
        this.currentPositionOfEntry = j;
    }

    private void setCurrentPath(Path path) {
        this.currentPath = path;
    }

    private void tryAdvanceEntry() throws IOException {
        if (checkReader()) {
            boolean readNextEntryAndRecordReaderPosition = readNextEntryAndRecordReaderPosition();
            LOG.trace("Reading WAL {}; currently open for write={}", this.currentPath, Boolean.valueOf(readNextEntryAndRecordReaderPosition));
            if (this.currentEntry != null || readNextEntryAndRecordReaderPosition) {
                return;
            }
            resetReader();
            readNextEntryAndRecordReaderPosition();
            if (this.currentEntry == null && checkAllBytesParsed()) {
                dequeueCurrentLog();
                if (openNextLog()) {
                    readNextEntryAndRecordReaderPosition();
                }
            }
        }
    }

    private boolean checkAllBytesParsed() throws IOException {
        long currentTrailerSize = currentTrailerSize();
        FileStatus fileStatus = null;
        try {
            fileStatus = this.fs.getFileStatus(this.currentPath);
        } catch (IOException e) {
            Logger logger = LOG;
            Object[] objArr = new Object[3];
            objArr[0] = this.currentPath;
            objArr[1] = currentTrailerSize < 0 ? "was not" : "was";
            objArr[2] = getCurrentPathStat();
            logger.warn("Couldn't get file length information about log {}, it {} closed cleanly {}", objArr);
            this.metrics.incrUnknownFileLengthForClosedWAL();
        }
        if (fileStatus != null) {
            if (currentTrailerSize < 0) {
                if (this.currentPositionOfReader < fileStatus.getLen()) {
                    long len = fileStatus.getLen() - this.currentPositionOfReader;
                    LOG.warn("Reached the end of WAL {}. It was not closed cleanly, so we did not parse {} bytes of data.", this.currentPath, Long.valueOf(len));
                    this.metrics.incrUncleanlyClosedWALs();
                    this.metrics.incrBytesSkippedInUncleanlyClosedWALs(len);
                }
            } else if (this.currentPositionOfReader + currentTrailerSize < fileStatus.getLen()) {
                LOG.warn("Processing end of WAL {} at position {}, which is too far away from reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}", new Object[]{this.currentPath, Long.valueOf(this.currentPositionOfReader), Long.valueOf(fileStatus.getLen()), getCurrentPathStat()});
                setPosition(0L);
                resetReader();
                this.metrics.incrRestartedWALReading();
                this.metrics.incrRepeatedFileBytes(this.currentPositionOfReader);
                return false;
            }
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Reached the end of " + this.currentPath + " and length of the file is " + (fileStatus == null ? "N/A" : Long.valueOf(fileStatus.getLen())));
        }
        this.metrics.incrCompletedWAL();
        return true;
    }

    private void dequeueCurrentLog() throws IOException {
        LOG.debug("EOF, closing {}", this.currentPath);
        closeReader();
        this.logQueue.remove(this.walGroupId);
        setCurrentPath(null);
        setPosition(0L);
    }

    private boolean readNextEntryAndRecordReaderPosition() throws IOException {
        WAL.Entry next = this.reader.next();
        long position = this.reader.getPosition();
        OptionalLong logFileSizeIfBeingWritten = this.walFileLengthProvider.getLogFileSizeIfBeingWritten(this.currentPath);
        if (logFileSizeIfBeingWritten.isPresent() && position > logFileSizeIfBeingWritten.getAsLong()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("The provider tells us the valid length for " + this.currentPath + " is " + logFileSizeIfBeingWritten.getAsLong() + ", but we have advanced to " + position);
            }
            resetReader();
            return true;
        }
        if (next != null) {
            LOG.trace("reading entry: {} ", next);
            this.metrics.incrLogEditsRead();
            this.metrics.incrLogReadInBytes(position - this.currentPositionOfEntry);
        }
        this.currentEntry = next;
        this.currentPositionOfReader = position;
        return logFileSizeIfBeingWritten.isPresent();
    }

    private void closeReader() throws IOException {
        if (this.reader != null) {
            this.reader.close();
            this.reader = null;
        }
    }

    private boolean checkReader() throws IOException {
        if (this.reader == null) {
            return openNextLog();
        }
        return true;
    }

    private boolean openNextLog() throws IOException {
        Path peek = this.logQueue.getQueue(this.walGroupId).peek();
        if (peek != null) {
            openReader(peek);
            return this.reader != null;
        }
        setCurrentPath(null);
        return false;
    }

    private void handleFileNotFound(Path path, FileNotFoundException fileNotFoundException) throws IOException {
        Path findArchivedLog = AbstractFSWALProvider.findArchivedLog(path, this.conf);
        if (findArchivedLog == null) {
            throw fileNotFoundException;
        }
        openReader(findArchivedLog);
    }

    private void openReader(Path path) throws IOException {
        try {
            if (this.reader == null || !getCurrentPath().equals(path)) {
                closeReader();
                this.reader = WALFactory.createReader(this.fs, path, this.conf);
                seek();
                setCurrentPath(path);
            } else {
                resetReader();
            }
        } catch (LeaseNotRecoveredException e) {
            LOG.warn("Try to recover the WAL lease " + path, e);
            recoverLease(this.conf, path);
            this.reader = null;
        } catch (FileNotFoundException e2) {
            handleFileNotFound(path, e2);
        } catch (RemoteException e3) {
            IOException unwrapRemoteException = e3.unwrapRemoteException(new Class[]{FileNotFoundException.class});
            if (!(unwrapRemoteException instanceof FileNotFoundException)) {
                throw unwrapRemoteException;
            }
            handleFileNotFound(path, (FileNotFoundException) unwrapRemoteException);
        } catch (NullPointerException e4) {
            LOG.warn("Got NPE opening reader, will retry.");
            this.reader = null;
        }
    }

    private void recoverLease(Configuration configuration, final Path path) {
        try {
            RecoverLeaseFSUtils.recoverFileLease(CommonFSUtils.getWALFileSystem(configuration), path, configuration, new CancelableProgressable() { // from class: io.hops.hudi.org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.1
                public boolean progress() {
                    WALEntryStream.LOG.debug("recover WAL lease: " + path);
                    return true;
                }
            });
        } catch (IOException e) {
            LOG.warn("unable to recover lease for WAL: " + path, e);
        }
    }

    private void resetReader() throws IOException {
        try {
            this.currentEntry = null;
            this.reader.reset();
            seek();
        } catch (FileNotFoundException e) {
            Path findArchivedLog = AbstractFSWALProvider.findArchivedLog(this.currentPath, this.conf);
            if (findArchivedLog == null) {
                throw e;
            }
            openReader(findArchivedLog);
        } catch (NullPointerException e2) {
            throw new IOException("NPE resetting reader, likely HDFS-4380", e2);
        }
    }

    private void seek() throws IOException {
        if (this.currentPositionOfEntry != 0) {
            this.reader.seek(this.currentPositionOfEntry);
        }
    }

    private long currentTrailerSize() {
        long j = -1;
        if (this.reader instanceof ProtobufLogReader) {
            j = ((ProtobufLogReader) this.reader).trailerSize();
        }
        return j;
    }
}
