package org.apache.hudi.common.table.log;

import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
import org.apache.hudi.common.fs.TimedFSDataInputStream;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieCorruptBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.CorruptedLogFileException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/hudi/common/table/log/HoodieLogFileReader.class */
public class HoodieLogFileReader implements HoodieLogFormat.Reader {
    public static final int DEFAULT_BUFFER_SIZE = 16777216;
    private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1048576;
    private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
    private final FSDataInputStream inputStream;
    private final HoodieLogFile logFile;
    private final byte[] magicBuffer;
    private final Schema readerSchema;
    private boolean readBlockLazily;
    private long reverseLogFilePosition;
    private long lastReverseLogFilePosition;
    private boolean reverseReader;
    private boolean closed;
    private transient Thread shutdownThread;

    public HoodieLogFileReader(FileSystem fileSystem, HoodieLogFile hoodieLogFile, Schema schema, int i, boolean z, boolean z2) throws IOException {
        this.magicBuffer = new byte[6];
        this.closed = false;
        this.shutdownThread = null;
        FSDataInputStream open = fileSystem.open(hoodieLogFile.getPath(), i);
        this.logFile = hoodieLogFile;
        this.inputStream = getFSDataInputStream(open, fileSystem, i);
        this.readerSchema = schema;
        this.readBlockLazily = z;
        this.reverseReader = z2;
        if (this.reverseReader) {
            long len = fileSystem.getFileStatus(hoodieLogFile.getPath()).getLen();
            this.lastReverseLogFilePosition = len;
            this.reverseLogFilePosition = len;
        }
        addShutDownHook();
    }

    public HoodieLogFileReader(FileSystem fileSystem, HoodieLogFile hoodieLogFile, Schema schema, boolean z, boolean z2) throws IOException {
        this(fileSystem, hoodieLogFile, schema, 16777216, z, z2);
    }

    public HoodieLogFileReader(FileSystem fileSystem, HoodieLogFile hoodieLogFile, Schema schema) throws IOException {
        this(fileSystem, hoodieLogFile, schema, 16777216, false, false);
    }

    private FSDataInputStream getFSDataInputStream(FSDataInputStream fSDataInputStream, FileSystem fileSystem, int i) {
        return FSUtils.isGCSFileSystem(fileSystem) ? new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fSDataInputStream, i), true) : fSDataInputStream.getWrappedStream() instanceof FSInputStream ? new TimedFSDataInputStream(this.logFile.getPath(), new FSDataInputStream(new BufferedFSInputStream(fSDataInputStream.getWrappedStream(), i))) : fSDataInputStream;
    }

    private FSDataInputStream getFSDataInputStreamForGCS(FSDataInputStream fSDataInputStream, int i) {
        if (fSDataInputStream.getWrappedStream() instanceof FSInputStream) {
            return new TimedFSDataInputStream(this.logFile.getPath(), new FSDataInputStream(new BufferedFSInputStream(fSDataInputStream.getWrappedStream(), i)));
        }
        if (!(fSDataInputStream.getWrappedStream() instanceof FSDataInputStream) || !(fSDataInputStream.getWrappedStream().getWrappedStream() instanceof FSInputStream)) {
            return fSDataInputStream;
        }
        return new TimedFSDataInputStream(this.logFile.getPath(), new FSDataInputStream(new BufferedFSInputStream(fSDataInputStream.getWrappedStream().getWrappedStream(), i)));
    }

    @Override // org.apache.hudi.common.table.log.HoodieLogFormat.Reader
    public HoodieLogFile getLogFile() {
        return this.logFile;
    }

    private void addShutDownHook() {
        this.shutdownThread = new Thread(() -> {
            try {
                close();
            } catch (Exception e) {
                LOG.warn("unable to close input stream for log file " + this.logFile, e);
            }
        });
        Runtime.getRuntime().addShutdownHook(this.shutdownThread);
    }

    private HoodieLogBlock readBlock() throws IOException {
        HoodieLogBlock.HoodieLogBlockType hoodieLogBlockType = null;
        Map<HoodieLogBlock.HeaderMetadataType, String> map = null;
        try {
            int readLong = (int) this.inputStream.readLong();
            if (isBlockCorrupt(readLong)) {
                return createCorruptBlock();
            }
            HoodieLogFormat.LogFormatVersion readVersion = readVersion();
            if (readVersion.getVersion() != 0) {
                int readInt = this.inputStream.readInt();
                ValidationUtils.checkArgument(readInt < HoodieLogBlock.HoodieLogBlockType.values().length, "Invalid block byte type found " + readInt);
                hoodieLogBlockType = HoodieLogBlock.HoodieLogBlockType.values()[readInt];
            }
            if (readVersion.hasHeader()) {
                map = HoodieLogBlock.getLogMetadata(this.inputStream);
            }
            int i = readLong;
            if (readVersion.getVersion() != 0) {
                i = (int) this.inputStream.readLong();
            }
            long pos = this.inputStream.getPos();
            byte[] readOrSkipContent = HoodieLogBlock.readOrSkipContent(this.inputStream, Integer.valueOf(i), this.readBlockLazily);
            Map<HoodieLogBlock.HeaderMetadataType, String> map2 = null;
            if (readVersion.hasFooter()) {
                map2 = HoodieLogBlock.getLogMetadata(this.inputStream);
            }
            if (readVersion.hasLogBlockLength()) {
                this.inputStream.readLong();
            }
            long pos2 = this.inputStream.getPos();
            switch ((HoodieLogBlock.HoodieLogBlockType) Objects.requireNonNull(hoodieLogBlockType)) {
                case AVRO_DATA_BLOCK:
                    return readVersion.getVersion() == 0 ? HoodieAvroDataBlock.getBlock(readOrSkipContent, this.readerSchema) : new HoodieAvroDataBlock(this.logFile, this.inputStream, Option.ofNullable(readOrSkipContent), this.readBlockLazily, pos, i, pos2, this.readerSchema, map, map2);
                case HFILE_DATA_BLOCK:
                    return new HoodieHFileDataBlock(this.logFile, this.inputStream, Option.ofNullable(readOrSkipContent), this.readBlockLazily, pos, i, pos2, this.readerSchema, map, map2);
                case DELETE_BLOCK:
                    return HoodieDeleteBlock.getBlock(this.logFile, this.inputStream, Option.ofNullable(readOrSkipContent), this.readBlockLazily, pos, i, pos2, map, map2);
                case COMMAND_BLOCK:
                    return HoodieCommandBlock.getBlock(this.logFile, this.inputStream, Option.ofNullable(readOrSkipContent), this.readBlockLazily, pos, i, pos2, map, map2);
                default:
                    throw new HoodieNotSupportedException("Unsupported Block " + hoodieLogBlockType);
            }
        } catch (EOFException | CorruptedLogFileException e) {
            return createCorruptBlock();
        }
    }

    private HoodieLogBlock createCorruptBlock() throws IOException {
        LOG.info("Log " + this.logFile + " has a corrupted block at " + this.inputStream.getPos());
        long pos = this.inputStream.getPos();
        long scanForNextAvailableBlockOffset = scanForNextAvailableBlockOffset();
        this.inputStream.seek(pos);
        LOG.info("Next available block in " + this.logFile + " starts at " + scanForNextAvailableBlockOffset);
        int i = (int) (scanForNextAvailableBlockOffset - pos);
        return HoodieCorruptBlock.getBlock(this.logFile, this.inputStream, Option.ofNullable(HoodieLogBlock.readOrSkipContent(this.inputStream, Integer.valueOf(i), this.readBlockLazily)), this.readBlockLazily, this.inputStream.getPos(), i, i, new HashMap(), new HashMap());
    }

    private boolean isBlockCorrupt(int i) throws IOException {
        long pos = this.inputStream.getPos();
        try {
            this.inputStream.seek(pos + i);
            this.inputStream.seek(this.inputStream.getPos() - 8);
            long readLong = this.inputStream.readLong() - this.magicBuffer.length;
            try {
                if (i != readLong) {
                    LOG.info("Found corrupted block in file " + this.logFile + ". Header block size(" + i + ") did not match the footer block size(" + readLong + ")");
                    this.inputStream.seek(pos);
                    return true;
                }
                try {
                    readMagic();
                    this.inputStream.seek(pos);
                    return false;
                } catch (CorruptedLogFileException e) {
                    LOG.info("Found corrupted block in file " + this.logFile + ". No magic hash found right after footer block size entry");
                    this.inputStream.seek(pos);
                    return true;
                }
            } catch (Throwable th) {
                this.inputStream.seek(pos);
                throw th;
            }
        } catch (EOFException e2) {
            LOG.info("Found corrupted block in file " + this.logFile + " with block size(" + i + ") running past EOF");
            this.inputStream.seek(pos);
            return true;
        }
    }

    private long scanForNextAvailableBlockOffset() throws IOException {
        byte[] bArr = new byte[1048576];
        boolean z = false;
        while (true) {
            long pos = this.inputStream.getPos();
            try {
                Arrays.fill(bArr, (byte) 0);
                this.inputStream.readFully(bArr, 0, bArr.length);
            } catch (EOFException e) {
                z = true;
            }
            long indexOf = Bytes.indexOf(bArr, HoodieLogFormat.MAGIC);
            if (indexOf >= 0) {
                return pos + indexOf;
            }
            if (z) {
                return this.inputStream.getPos();
            }
            this.inputStream.seek((pos + bArr.length) - HoodieLogFormat.MAGIC.length);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        this.inputStream.close();
        if (null != this.shutdownThread) {
            Runtime.getRuntime().removeShutdownHook(this.shutdownThread);
        }
        this.closed = true;
    }

    @Override // java.util.Iterator
    public boolean hasNext() {
        try {
            return readMagic();
        } catch (IOException e) {
            throw new HoodieIOException("IOException when reading logfile " + this.logFile, e);
        }
    }

    private HoodieLogFormat.LogFormatVersion readVersion() throws IOException {
        return new HoodieLogFormatVersion(this.inputStream.readInt());
    }

    private boolean readMagic() throws IOException {
        try {
            boolean hasNextMagic = hasNextMagic();
            if (hasNextMagic) {
                return hasNextMagic;
            }
            throw new CorruptedLogFileException(this.logFile + " could not be read. Did not find the magic bytes at the start of the block");
        } catch (EOFException e) {
            return false;
        }
    }

    private boolean hasNextMagic() throws IOException {
        this.inputStream.readFully(this.magicBuffer, 0, 6);
        return Arrays.equals(this.magicBuffer, HoodieLogFormat.MAGIC);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.Iterator
    public HoodieLogBlock next() {
        try {
            return readBlock();
        } catch (IOException e) {
            throw new HoodieIOException("IOException when reading logblock from log file " + this.logFile, e);
        }
    }

    @Override // org.apache.hudi.common.table.log.HoodieLogFormat.Reader
    public boolean hasPrev() {
        try {
            if (!this.reverseReader) {
                throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
            }
            this.reverseLogFilePosition = this.lastReverseLogFilePosition;
            this.reverseLogFilePosition -= 8;
            this.lastReverseLogFilePosition = this.reverseLogFilePosition;
            this.inputStream.seek(this.reverseLogFilePosition);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    @Override // org.apache.hudi.common.table.log.HoodieLogFormat.Reader
    public HoodieLogBlock prev() throws IOException {
        if (!this.reverseReader) {
            throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
        }
        long readLong = this.inputStream.readLong();
        long pos = this.inputStream.getPos();
        try {
            this.inputStream.seek(this.reverseLogFilePosition - readLong);
            hasNext();
            this.reverseLogFilePosition -= readLong;
            this.lastReverseLogFilePosition = this.reverseLogFilePosition;
            return next();
        } catch (Exception e) {
            this.inputStream.seek(pos);
            throw new CorruptedLogFileException("Found possible corrupted block, cannot read log file in reverse, fallback to forward reading of logfile");
        }
    }

    public long moveToPrev() throws IOException {
        if (!this.reverseReader) {
            throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
        }
        this.inputStream.seek(this.lastReverseLogFilePosition);
        long readLong = this.inputStream.readLong();
        this.inputStream.seek(this.reverseLogFilePosition - readLong);
        this.reverseLogFilePosition -= readLong;
        this.lastReverseLogFilePosition = this.reverseLogFilePosition;
        return this.reverseLogFilePosition;
    }

    @Override // java.util.Iterator
    public void remove() {
        throw new UnsupportedOperationException("Remove not supported for HoodieLogFileReader");
    }
}
