/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.functional;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.log.HoodieLogFileReader;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
import org.apache.hudi.exception.CorruptedLogFileException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;

public class TestHoodieLogFormat
extends HoodieCommonTestHarness {
    private static String BASE_OUTPUT_PATH = "/tmp/";
    private FileSystem fs;
    private Path partitionPath;
    private int bufferSize = 4096;
    private HoodieLogBlock.HoodieLogBlockType dataBlockType = HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK;

    @BeforeAll
    public static void setUpClass() throws IOException, InterruptedException {
        MiniClusterUtil.setUp();
    }

    @AfterAll
    public static void tearDownClass() {
        MiniClusterUtil.shutdown();
    }

    @BeforeEach
    public void setUp() throws IOException, InterruptedException {
        this.fs = MiniClusterUtil.fileSystem;
        Assertions.assertTrue((boolean)this.fs.mkdirs(new Path(this.tempDir.toAbsolutePath().toString())));
        this.partitionPath = new Path(this.tempDir.toAbsolutePath().toString());
        this.basePath = this.tempDir.getParent().toString();
        HoodieTestUtils.init(MiniClusterUtil.configuration, this.basePath, HoodieTableType.MERGE_ON_READ);
    }

    @AfterEach
    public void tearDown() throws IOException {
        this.fs.delete(this.partitionPath, true);
    }

    @Test
    public void testEmptyLog() throws IOException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        Assertions.assertEquals((long)0L, (long)writer.getCurrentSize(), (String)"Just created this log, size should be 0");
        Assertions.assertTrue((boolean)writer.getLogFile().getFileName().startsWith("."), (String)"Check all log files should start with a .");
        Assertions.assertEquals((int)1, (int)writer.getLogFile().getLogVersion(), (String)"Version should be 1 for new log created");
    }

    @ParameterizedTest
    @EnumSource(names={"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK"})
    public void testBasicAppend(HoodieLogBlock.HoodieLogBlockType dataBlockType) throws IOException, InterruptedException, URISyntaxException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        HoodieDataBlock dataBlock = this.getDataBlock(dataBlockType, records, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        long size = writer.getCurrentSize();
        Assertions.assertTrue((size > 0L ? 1 : 0) != 0, (String)"We just wrote a block - size should be > 0");
        Assertions.assertEquals((long)size, (long)this.fs.getFileStatus(writer.getLogFile().getPath()).getLen(), (String)"Write should be auto-flushed. The size reported by FileStatus and the writer should match");
        writer.close();
    }

    @ParameterizedTest
    @EnumSource(names={"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK"})
    public void testRollover() throws IOException, InterruptedException, URISyntaxException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        long size = writer.getCurrentSize();
        writer.close();
        writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).withSizeThreshold(size - 1L).build();
        records = SchemaTestUtil.generateTestRecords(0, 100);
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        dataBlock = this.getDataBlock(records, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        Assertions.assertEquals((long)0L, (long)writer.getCurrentSize(), (String)"This should be a new log file and hence size should be 0");
        Assertions.assertEquals((int)2, (int)writer.getLogFile().getLogVersion(), (String)"Version should be rolled to 2");
        Path logFilePath = writer.getLogFile().getPath();
        Assertions.assertFalse((boolean)this.fs.exists(logFilePath), (String)("Path (" + logFilePath + ") must not exist"));
        writer.close();
    }

    @Test
    public void testConcurrentAppendOnExistingLogFileWithoutWriteToken() throws Exception {
        this.testConcurrentAppend(true, false);
    }

    @Test
    public void testConcurrentAppendOnExistingLogFileWithWriteToken() throws Exception {
        this.testConcurrentAppend(true, true);
    }

    @Test
    public void testConcurrentAppendOnFirstLogFileVersion() throws Exception {
        this.testConcurrentAppend(false, true);
    }

    private void testConcurrentAppend(boolean logFileExists, boolean newLogFileFormat) throws Exception {
        HoodieLogFormat.WriterBuilder builder1 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs);
        HoodieLogFormat.WriterBuilder builder2 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs);
        if (newLogFileFormat && logFileExists) {
            builder1 = builder1.withLogVersion(1).withLogWriteToken("1-0-1").withRolloverLogWriteToken("1-0-1");
            builder2 = builder2.withLogVersion(1).withLogWriteToken("1-0-1").withRolloverLogWriteToken("1-0-1");
        } else if (newLogFileFormat) {
            builder1 = builder1.withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION.intValue()).withLogWriteToken("1-0-1").withRolloverLogWriteToken("1-0-1");
            builder2 = builder2.withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION.intValue()).withLogWriteToken("1-0-1").withRolloverLogWriteToken("1-0-1");
        } else {
            builder1 = builder1.withLogVersion(1).withRolloverLogWriteToken("1-0-1");
        }
        HoodieLogFormat.Writer writer = builder1.build();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        HoodieLogFormat.Writer writer2 = builder2.build();
        writer2 = writer2.appendBlock((HoodieLogBlock)dataBlock);
        HoodieLogFile logFile1 = writer.getLogFile();
        HoodieLogFile logFile2 = writer2.getLogFile();
        writer.close();
        writer2.close();
        Assertions.assertNotNull((Object)logFile1.getLogWriteToken());
        Assertions.assertEquals((int)logFile1.getLogVersion(), (int)(logFile2.getLogVersion() - 1), (String)"Log Files must have different versions");
    }

    @Test
    public void testMultipleAppend() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        long size1 = writer.getCurrentSize();
        writer.close();
        writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        records = SchemaTestUtil.generateTestRecords(0, 100);
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        dataBlock = this.getDataBlock(records, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        long size2 = writer.getCurrentSize();
        Assertions.assertTrue((size2 > size1 ? 1 : 0) != 0, (String)"We just wrote a new block - size2 should be > size1");
        Assertions.assertEquals((long)size2, (long)this.fs.getFileStatus(writer.getLogFile().getPath()).getLen(), (String)"Write should be auto-flushed. The size reported by FileStatus and the writer should match");
        writer.close();
        writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        records = SchemaTestUtil.generateTestRecords(0, 100);
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        dataBlock = this.getDataBlock(records, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        long size3 = writer.getCurrentSize();
        Assertions.assertTrue((size3 > size2 ? 1 : 0) != 0, (String)"We just wrote a new block - size3 should be > size2");
        Assertions.assertEquals((long)size3, (long)this.fs.getFileStatus(writer.getLogFile().getPath()).getLen(), (String)"Write should be auto-flushed. The size reported by FileStatus and the writer should match");
        writer.close();
        HoodieLogFormat.Writer closedWriter = writer;
        Assertions.assertThrows(IllegalStateException.class, () -> closedWriter.getCurrentSize(), (String)"getCurrentSize should fail after the logAppender is closed");
    }

    @Test
    public void testAppendNotSupported() throws IOException, URISyntaxException, InterruptedException {
        Path localPartitionPath = new Path("file://" + this.partitionPath);
        FileSystem localFs = FSUtils.getFs((String)localPartitionPath.toString(), (Configuration)HoodieTestUtils.getDefaultHadoopConf());
        Path testPath = new Path(localPartitionPath, "append_test");
        localFs.mkdirs(testPath);
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records, header);
        for (int i = 0; i < 2; ++i) {
            HoodieLogFormat.newWriterBuilder().onParentPath(testPath).withFileExtension(".archive").withFileId("commits.archive").overBaseCommit("").withFs(localFs).build().appendBlock((HoodieLogBlock)dataBlock).close();
        }
        FileStatus[] statuses = localFs.listStatus(testPath);
        Assertions.assertEquals((int)2, (int)statuses.length);
    }

    @ParameterizedTest
    @EnumSource(names={"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK"})
    public void testBasicWriteAndScan() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        Schema schema = SchemaTestUtil.getSimpleSchema();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        List copyOfRecords = records.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        HoodieLogFormat.Reader reader = HoodieLogFormat.newReader((FileSystem)this.fs, (HoodieLogFile)writer.getLogFile(), (Schema)SchemaTestUtil.getSimpleSchema());
        Assertions.assertTrue((boolean)reader.hasNext(), (String)"We wrote a block, we should be able to read it");
        HoodieLogBlock nextBlock = (HoodieLogBlock)reader.next();
        Assertions.assertEquals((Object)this.dataBlockType, (Object)nextBlock.getBlockType(), (String)"The next block should be a data block");
        HoodieDataBlock dataBlockRead = (HoodieDataBlock)nextBlock;
        Assertions.assertEquals((int)copyOfRecords.size(), (int)dataBlockRead.getRecords().size(), (String)"Read records size should be equal to the written records size");
        Assertions.assertEquals(copyOfRecords, (Object)dataBlockRead.getRecords(), (String)"Both records lists should be the same. (ordering guaranteed)");
        reader.close();
    }

    @ParameterizedTest
    @EnumSource(names={"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK"})
    public void testBasicAppendAndRead() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records1 = SchemaTestUtil.generateTestRecords(0, 100);
        Schema schema = SchemaTestUtil.getSimpleSchema();
        List copyOfRecords1 = records1.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records1, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records2 = SchemaTestUtil.generateTestRecords(0, 100);
        List copyOfRecords2 = records2.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        dataBlock = this.getDataBlock(records2, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records3 = SchemaTestUtil.generateTestRecords(0, 100);
        List copyOfRecords3 = records3.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        dataBlock = this.getDataBlock(records3, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        HoodieLogFormat.Reader reader = HoodieLogFormat.newReader((FileSystem)this.fs, (HoodieLogFile)writer.getLogFile(), (Schema)SchemaTestUtil.getSimpleSchema());
        Assertions.assertTrue((boolean)reader.hasNext(), (String)"First block should be available");
        HoodieLogBlock nextBlock = (HoodieLogBlock)reader.next();
        HoodieDataBlock dataBlockRead = (HoodieDataBlock)nextBlock;
        Assertions.assertEquals((int)copyOfRecords1.size(), (int)dataBlockRead.getRecords().size(), (String)"Read records size should be equal to the written records size");
        Assertions.assertEquals(copyOfRecords1, (Object)dataBlockRead.getRecords(), (String)"Both records lists should be the same. (ordering guaranteed)");
        Assertions.assertEquals((Object)dataBlockRead.getSchema(), (Object)SchemaTestUtil.getSimpleSchema());
        reader.hasNext();
        nextBlock = (HoodieLogBlock)reader.next();
        dataBlockRead = (HoodieDataBlock)nextBlock;
        Assertions.assertEquals((int)copyOfRecords2.size(), (int)dataBlockRead.getRecords().size(), (String)"Read records size should be equal to the written records size");
        Assertions.assertEquals(copyOfRecords2, (Object)dataBlockRead.getRecords(), (String)"Both records lists should be the same. (ordering guaranteed)");
        reader.hasNext();
        nextBlock = (HoodieLogBlock)reader.next();
        dataBlockRead = (HoodieDataBlock)nextBlock;
        Assertions.assertEquals((int)copyOfRecords3.size(), (int)dataBlockRead.getRecords().size(), (String)"Read records size should be equal to the written records size");
        Assertions.assertEquals(copyOfRecords3, (Object)dataBlockRead.getRecords(), (String)"Both records lists should be the same. (ordering guaranteed)");
        reader.close();
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testBasicAppendAndScanMultipleFiles(boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withSizeThreshold(1024L).withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        Schema schema = HoodieAvroUtils.addMetadataFields((Schema)SchemaTestUtil.getSimpleSchema());
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        HashSet<HoodieLogFile> logFiles = new HashSet<HoodieLogFile>();
        ArrayList allRecords = new ArrayList();
        while (writer.getLogFile().getLogVersion() != 4) {
            logFiles.add(writer.getLogFile());
            List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
            List copyOfRecords1 = records1.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
            allRecords.add(copyOfRecords1);
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            HoodieDataBlock dataBlock = this.getDataBlock(records1, header);
            writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        }
        writer.close();
        HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(this.fs, this.basePath, logFiles.stream().map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()), schema, "100", Long.valueOf(10240L), readBlocksLazily, false, this.bufferSize, BASE_OUTPUT_PATH);
        ArrayList<IndexedRecord> scannedRecords = new ArrayList<IndexedRecord>();
        for (HoodieRecord record2 : scanner) {
            scannedRecords.add((IndexedRecord)record2.getData().getInsertValue(schema).get());
        }
        Assertions.assertEquals((long)scannedRecords.size(), (long)allRecords.stream().mapToLong(Collection::size).sum(), (String)"Scanner records count should be the same as appended records");
    }

    @Test
    public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        this.fs = FSUtils.getFs((String)this.fs.getUri().toString(), (Configuration)this.fs.getConf());
        FSDataOutputStream outputStream = this.fs.append(writer.getLogFile().getPath());
        outputStream.write(HoodieLogFormat.MAGIC);
        outputStream.writeLong(474L);
        outputStream.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
        outputStream.writeInt(1);
        outputStream.writeLong(400L);
        outputStream.write("something-random".getBytes());
        outputStream.flush();
        outputStream.close();
        writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        records = SchemaTestUtil.generateTestRecords(0, 10);
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        dataBlock = this.getDataBlock(records, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        HoodieLogFormat.Reader reader = HoodieLogFormat.newReader((FileSystem)this.fs, (HoodieLogFile)writer.getLogFile(), (Schema)SchemaTestUtil.getSimpleSchema());
        Assertions.assertTrue((boolean)reader.hasNext(), (String)"First block should be available");
        reader.next();
        Assertions.assertTrue((boolean)reader.hasNext(), (String)"We should have corrupted block next");
        HoodieLogBlock block = (HoodieLogBlock)reader.next();
        Assertions.assertEquals((Object)HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK, (Object)block.getBlockType(), (String)"The read block should be a corrupt block");
        Assertions.assertTrue((boolean)reader.hasNext(), (String)"Third block should be available");
        reader.next();
        Assertions.assertFalse((boolean)reader.hasNext(), (String)"There should be no more block left");
        reader.close();
        outputStream = this.fs.append(writer.getLogFile().getPath());
        outputStream.write(HoodieLogFormat.MAGIC);
        outputStream.writeLong(1000L);
        outputStream.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
        outputStream.writeInt(1);
        outputStream.writeLong(500L);
        outputStream.write("something-else-random".getBytes());
        outputStream.flush();
        outputStream.close();
        writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        records = SchemaTestUtil.generateTestRecords(0, 100);
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        dataBlock = this.getDataBlock(records, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        reader = HoodieLogFormat.newReader((FileSystem)this.fs, (HoodieLogFile)writer.getLogFile(), (Schema)SchemaTestUtil.getSimpleSchema());
        Assertions.assertTrue((boolean)reader.hasNext(), (String)"First block should be available");
        reader.next();
        Assertions.assertTrue((boolean)reader.hasNext(), (String)"We should get the 1st corrupted block next");
        reader.next();
        Assertions.assertTrue((boolean)reader.hasNext(), (String)"Third block should be available");
        reader.next();
        Assertions.assertTrue((boolean)reader.hasNext(), (String)"We should get the 2nd corrupted block next");
        block = (HoodieLogBlock)reader.next();
        Assertions.assertEquals((Object)HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK, (Object)block.getBlockType(), (String)"The read block should be a corrupt block");
        Assertions.assertTrue((boolean)reader.hasNext(), (String)"We should get the last block next");
        reader.next();
        Assertions.assertFalse((boolean)reader.hasNext(), (String)"We should have no more blocks left");
        reader.close();
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAvroLogRecordReaderBasic(boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields((Schema)SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).withSizeThreshold(500L).build();
        List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List copyOfRecords1 = records1.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records1, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List copyOfRecords2 = records2.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        dataBlock = this.getDataBlock(records2, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        List allLogFiles = FSUtils.getAllLogFiles((FileSystem)this.fs, (Path)this.partitionPath, (String)"test-fileid1", (String)".log", (String)"100").map(s -> s.getPath().toString()).collect(Collectors.toList());
        HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(this.fs, this.basePath, allLogFiles, schema, "100", Long.valueOf(10240L), readBlocksLazily, false, this.bufferSize, BASE_OUTPUT_PATH);
        Assertions.assertEquals((long)200L, (long)scanner.getTotalLogRecords());
        HashSet readKeys = new HashSet(200);
        scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
        Assertions.assertEquals((int)200, (int)readKeys.size(), (String)"Stream collect should return all 200 records");
        copyOfRecords1.addAll(copyOfRecords2);
        Set originalKeys = copyOfRecords1.stream().map(s -> ((GenericRecord)s).get("_hoodie_record_key").toString()).collect(Collectors.toSet());
        Assertions.assertEquals(originalKeys, readKeys, (String)"CompositeAvroLogReader should return 200 records from 2 versions");
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAvroLogRecordReaderWithRollbackTombstone(boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields((Schema)SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List copyOfRecords1 = records1.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records1, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        dataBlock = this.getDataBlock(records2, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
        writer = writer.appendBlock((HoodieLogBlock)commandBlock);
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
        List<IndexedRecord> records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List copyOfRecords3 = records3.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        dataBlock = this.getDataBlock(records3, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        List allLogFiles = FSUtils.getAllLogFiles((FileSystem)this.fs, (Path)this.partitionPath, (String)"test-fileid1", (String)".log", (String)"100").map(s -> s.getPath().toString()).collect(Collectors.toList());
        HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(this.fs, this.basePath, allLogFiles, schema, "102", Long.valueOf(10240L), readBlocksLazily, false, this.bufferSize, BASE_OUTPUT_PATH);
        Assertions.assertEquals((long)200L, (long)scanner.getTotalLogRecords(), (String)"We read 200 records from 2 write batches");
        HashSet readKeys = new HashSet(200);
        scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
        Assertions.assertEquals((int)200, (int)readKeys.size(), (String)"Stream collect should return all 200 records");
        copyOfRecords1.addAll(copyOfRecords3);
        Set originalKeys = copyOfRecords1.stream().map(s -> ((GenericRecord)s).get("_hoodie_record_key").toString()).collect(Collectors.toSet());
        Assertions.assertEquals(originalKeys, readKeys, (String)"CompositeAvroLogReader should return 200 records from 2 versions");
    }

    @Test
    public void testAvroLogRecordReaderWithRollbackPartialBlock() throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields((Schema)SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List copyOfRecords1 = records1.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records1, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        this.fs = FSUtils.getFs((String)this.fs.getUri().toString(), (Configuration)this.fs.getConf());
        FSDataOutputStream outputStream = this.fs.append(writer.getLogFile().getPath());
        outputStream.write(HoodieLogFormat.MAGIC);
        outputStream.writeLong(1000L);
        outputStream.writeInt(1);
        outputStream.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
        outputStream.write(HoodieLogBlock.getLogMetadataBytes(header));
        outputStream.writeLong((long)"something-random".getBytes().length);
        outputStream.write("something-random".getBytes());
        outputStream.flush();
        outputStream.close();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
        writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        writer = writer.appendBlock((HoodieLogBlock)commandBlock);
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
        List<IndexedRecord> records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List copyOfRecords3 = records3.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        dataBlock = this.getDataBlock(records3, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        List allLogFiles = FSUtils.getAllLogFiles((FileSystem)this.fs, (Path)this.partitionPath, (String)"test-fileid1", (String)".log", (String)"100").map(s -> s.getPath().toString()).collect(Collectors.toList());
        HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(this.fs, this.basePath, allLogFiles, schema, "103", Long.valueOf(10240L), true, false, this.bufferSize, BASE_OUTPUT_PATH);
        Assertions.assertEquals((long)200L, (long)scanner.getTotalLogRecords(), (String)"We would read 200 records");
        HashSet readKeys = new HashSet(200);
        scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
        Assertions.assertEquals((int)200, (int)readKeys.size(), (String)"Stream collect should return all 200 records");
        copyOfRecords1.addAll(copyOfRecords3);
        Set originalKeys = copyOfRecords1.stream().map(s -> ((GenericRecord)s).get("_hoodie_record_key").toString()).collect(Collectors.toSet());
        Assertions.assertEquals(originalKeys, readKeys, (String)"CompositeAvroLogReader should return 200 records from 2 versions");
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAvroLogRecordReaderWithDeleteAndRollback(boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields((Schema)SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List copyOfRecords1 = records1.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records1, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List copyOfRecords2 = records2.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        dataBlock = this.getDataBlock(records2, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        copyOfRecords1.addAll(copyOfRecords2);
        List originalKeys = copyOfRecords1.stream().map(s -> ((GenericRecord)s).get("_hoodie_record_key").toString()).collect(Collectors.toList());
        List<HoodieKey> deletedKeys = copyOfRecords1.stream().map(s -> new HoodieKey(((GenericRecord)s).get("_hoodie_record_key").toString(), ((GenericRecord)s).get("_hoodie_partition_path").toString())).collect(Collectors.toList()).subList(0, 50);
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
        HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header);
        writer = writer.appendBlock((HoodieLogBlock)deleteBlock);
        List allLogFiles = FSUtils.getAllLogFiles((FileSystem)this.fs, (Path)this.partitionPath, (String)"test-fileid1", (String)".log", (String)"100").map(s -> s.getPath().toString()).collect(Collectors.toList());
        HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(this.fs, this.basePath, allLogFiles, schema, "102", Long.valueOf(10240L), readBlocksLazily, false, this.bufferSize, BASE_OUTPUT_PATH);
        Assertions.assertEquals((long)200L, (long)scanner.getTotalLogRecords(), (String)"We still would read 200 records");
        ArrayList readKeys = new ArrayList(200);
        ArrayList emptyPayloads = new ArrayList();
        scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
        scanner.forEach(s -> {
            try {
                if (!s.getData().getInsertValue(schema).isPresent()) {
                    emptyPayloads.add(true);
                }
            }
            catch (IOException io) {
                throw new UncheckedIOException(io);
            }
        });
        Assertions.assertEquals((int)200, (int)readKeys.size(), (String)"Stream collect should return all 200 records");
        Assertions.assertEquals((int)50, (int)emptyPayloads.size(), (String)"Stream collect should return all 50 records with empty payloads");
        originalKeys.removeAll(deletedKeys);
        Collections.sort(originalKeys);
        Collections.sort(readKeys);
        Assertions.assertEquals(originalKeys, readKeys, (String)"CompositeAvroLogReader should return 150 records from 2 versions");
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "102");
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
        writer.appendBlock((HoodieLogBlock)commandBlock);
        readKeys.clear();
        scanner = new HoodieMergedLogRecordScanner(this.fs, this.basePath, allLogFiles, schema, "101", Long.valueOf(10240L), readBlocksLazily, false, this.bufferSize, BASE_OUTPUT_PATH);
        scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
        Assertions.assertEquals((int)200, (int)readKeys.size(), (String)"Stream collect should return all 200 records after rollback of delete");
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAvroLogRecordReaderWithFailedRollbacks(boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields((Schema)SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List copyOfRecords1 = records1.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records1, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        dataBlock = this.getDataBlock(records2, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        List<HoodieKey> deletedKeys = copyOfRecords1.stream().map(s -> new HoodieKey(((GenericRecord)s).get("_hoodie_record_key").toString(), ((GenericRecord)s).get("_hoodie_partition_path").toString())).collect(Collectors.toList()).subList(0, 50);
        HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header);
        writer = writer.appendBlock((HoodieLogBlock)deleteBlock);
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
        try {
            writer = writer.appendBlock((HoodieLogBlock)commandBlock);
            throw new Exception("simulating failure");
        }
        catch (Exception exception) {
            writer.appendBlock((HoodieLogBlock)commandBlock);
            List allLogFiles = FSUtils.getAllLogFiles((FileSystem)this.fs, (Path)this.partitionPath, (String)"test-fileid1", (String)".log", (String)"100").map(s -> s.getPath().toString()).collect(Collectors.toList());
            HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(this.fs, this.basePath, allLogFiles, schema, "100", Long.valueOf(10240L), readBlocksLazily, false, this.bufferSize, BASE_OUTPUT_PATH);
            Assertions.assertEquals((long)0L, (long)scanner.getTotalLogRecords(), (String)"We would have scanned 0 records because of rollback");
            ArrayList readKeys = new ArrayList();
            scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
            Assertions.assertEquals((int)0, (int)readKeys.size(), (String)"Stream collect should return all 0 records");
            return;
        }
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAvroLogRecordReaderWithInsertDeleteAndRollback(boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields((Schema)SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List copyOfRecords1 = records1.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records1, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        List<HoodieKey> deletedKeys = copyOfRecords1.stream().map(s -> new HoodieKey(((GenericRecord)s).get("_hoodie_record_key").toString(), ((GenericRecord)s).get("_hoodie_partition_path").toString())).collect(Collectors.toList()).subList(0, 50);
        HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header);
        writer = writer.appendBlock((HoodieLogBlock)deleteBlock);
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
        writer = writer.appendBlock((HoodieLogBlock)commandBlock);
        writer.appendBlock((HoodieLogBlock)commandBlock);
        List allLogFiles = FSUtils.getAllLogFiles((FileSystem)this.fs, (Path)this.partitionPath, (String)"test-fileid1", (String)".log", (String)"100").map(s -> s.getPath().toString()).collect(Collectors.toList());
        HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(this.fs, this.basePath, allLogFiles, schema, "100", Long.valueOf(10240L), readBlocksLazily, false, this.bufferSize, BASE_OUTPUT_PATH);
        Assertions.assertEquals((long)0L, (long)scanner.getTotalLogRecords(), (String)"We would read 0 records");
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAvroLogRecordReaderWithInvalidRollback(boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields((Schema)SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records1, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
        writer.appendBlock((HoodieLogBlock)commandBlock);
        List allLogFiles = FSUtils.getAllLogFiles((FileSystem)this.fs, (Path)this.partitionPath, (String)"test-fileid1", (String)".log", (String)"100").map(s -> s.getPath().toString()).collect(Collectors.toList());
        HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(this.fs, this.basePath, allLogFiles, schema, "100", Long.valueOf(10240L), readBlocksLazily, false, this.bufferSize, BASE_OUTPUT_PATH);
        Assertions.assertEquals((long)100L, (long)scanner.getTotalLogRecords(), (String)"We still would read 100 records");
        ArrayList readKeys = new ArrayList(100);
        scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
        Assertions.assertEquals((int)100, (int)readKeys.size(), (String)"Stream collect should return all 150 records");
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields((Schema)SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List copyOfRecords1 = records1.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records1, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        List<HoodieKey> deletedKeys = copyOfRecords1.stream().map(s -> new HoodieKey(((GenericRecord)s).get("_hoodie_record_key").toString(), ((GenericRecord)s).get("_hoodie_partition_path").toString())).collect(Collectors.toList()).subList(0, 50);
        HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header);
        writer = writer.appendBlock((HoodieLogBlock)deleteBlock);
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
        writer.appendBlock((HoodieLogBlock)commandBlock);
        List allLogFiles = FSUtils.getAllLogFiles((FileSystem)this.fs, (Path)this.partitionPath, (String)"test-fileid1", (String)".log", (String)"100").map(s -> s.getPath().toString()).collect(Collectors.toList());
        HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(this.fs, this.basePath, allLogFiles, schema, "101", Long.valueOf(10240L), readBlocksLazily, false, this.bufferSize, BASE_OUTPUT_PATH);
        Assertions.assertEquals((long)0L, (long)scanner.getTotalLogRecords(), (String)"We would read 0 records");
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields((Schema)SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records1, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        this.fs = FSUtils.getFs((String)this.fs.getUri().toString(), (Configuration)this.fs.getConf());
        FSDataOutputStream outputStream = this.fs.append(writer.getLogFile().getPath());
        outputStream.write(HoodieLogFormat.MAGIC);
        outputStream.writeLong(1000L);
        outputStream.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
        outputStream.writeInt(1);
        outputStream.writeLong(100L);
        outputStream.flush();
        outputStream.close();
        this.fs = FSUtils.getFs((String)this.fs.getUri().toString(), (Configuration)this.fs.getConf());
        outputStream = this.fs.append(writer.getLogFile().getPath());
        outputStream.write(HoodieLogFormat.MAGIC);
        outputStream.writeLong(1000L);
        outputStream.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
        outputStream.writeInt(1);
        outputStream.writeLong(100L);
        outputStream.flush();
        outputStream.close();
        writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        this.fs = FSUtils.getFs((String)this.fs.getUri().toString(), (Configuration)this.fs.getConf());
        outputStream = this.fs.append(writer.getLogFile().getPath());
        outputStream.write(HoodieLogFormat.MAGIC);
        outputStream.writeLong(1000L);
        outputStream.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
        outputStream.writeInt(1);
        outputStream.writeLong(100L);
        outputStream.flush();
        outputStream.close();
        writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
        writer = writer.appendBlock((HoodieLogBlock)commandBlock);
        writer.close();
        List allLogFiles = FSUtils.getAllLogFiles((FileSystem)this.fs, (Path)this.partitionPath, (String)"test-fileid1", (String)".log", (String)"100").map(s -> s.getPath().toString()).collect(Collectors.toList());
        HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(this.fs, this.basePath, allLogFiles, schema, "101", Long.valueOf(10240L), readBlocksLazily, false, this.bufferSize, BASE_OUTPUT_PATH);
        Assertions.assertEquals((long)0L, (long)scanner.getTotalLogRecords(), (String)"We would read 0 records");
    }

    private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1, int numRecordsInLog2, boolean readBlocksLazily) {
        try {
            Schema schema = HoodieAvroUtils.addMetadataFields((Schema)SchemaTestUtil.getSimpleSchema());
            List<IndexedRecord> records = SchemaTestUtil.generateHoodieTestRecords(0, 101);
            ArrayList<IndexedRecord> records2 = new ArrayList<IndexedRecord>(records);
            HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
            HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            HoodieDataBlock dataBlock = this.getDataBlock(records.subList(0, numRecordsInLog1), header);
            writer = writer.appendBlock((HoodieLogBlock)dataBlock);
            long size = writer.getCurrentSize();
            writer.close();
            HoodieLogFormat.Writer writer2 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).withSizeThreshold(size - 1L).build();
            HashMap<HoodieLogBlock.HeaderMetadataType, String> header2 = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
            header2.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header2.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            HoodieDataBlock dataBlock2 = this.getDataBlock(records2.subList(0, numRecordsInLog2), header2);
            writer2 = writer2.appendBlock((HoodieLogBlock)dataBlock2);
            writer2.close();
            List allLogFiles = FSUtils.getAllLogFiles((FileSystem)this.fs, (Path)this.partitionPath, (String)"test-fileid1", (String)".log", (String)"100").map(s -> s.getPath().toString()).collect(Collectors.toList());
            HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(this.fs, this.basePath, allLogFiles, schema, "100", Long.valueOf(10240L), readBlocksLazily, false, this.bufferSize, BASE_OUTPUT_PATH);
            Assertions.assertEquals((long)Math.max(numRecordsInLog1, numRecordsInLog2), (long)scanner.getNumMergedRecordsInLog(), (String)"We would read 100 records");
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt(boolean readBlocksLazily) {
        this.testAvroLogRecordReaderMergingMultipleLogFiles(77, 100, readBlocksLazily);
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt(boolean readBlocksLazily) {
        this.testAvroLogRecordReaderMergingMultipleLogFiles(100, 66, readBlocksLazily);
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts(boolean readBlocksLazily) {
        this.testAvroLogRecordReaderMergingMultipleLogFiles(100, 100, readBlocksLazily);
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testBasicAppendAndReadInReverse(boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        Schema schema = SchemaTestUtil.getSimpleSchema();
        List<IndexedRecord> records1 = SchemaTestUtil.generateTestRecords(0, 100);
        List copyOfRecords1 = records1.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records1, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records2 = SchemaTestUtil.generateTestRecords(0, 100);
        List copyOfRecords2 = records2.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        dataBlock = this.getDataBlock(records2, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records3 = SchemaTestUtil.generateTestRecords(0, 100);
        List copyOfRecords3 = records3.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        dataBlock = this.getDataBlock(records3, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        HoodieLogFileReader reader = new HoodieLogFileReader(this.fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), this.bufferSize, readBlocksLazily, true);
        Assertions.assertTrue((boolean)reader.hasPrev(), (String)"Last block should be available");
        HoodieLogBlock prevBlock = reader.prev();
        HoodieDataBlock dataBlockRead = (HoodieDataBlock)prevBlock;
        Assertions.assertEquals((int)copyOfRecords3.size(), (int)dataBlockRead.getRecords().size(), (String)"Third records size should be equal to the written records size");
        Assertions.assertEquals(copyOfRecords3, (Object)dataBlockRead.getRecords(), (String)"Both records lists should be the same. (ordering guaranteed)");
        Assertions.assertTrue((boolean)reader.hasPrev(), (String)"Second block should be available");
        prevBlock = reader.prev();
        dataBlockRead = (HoodieDataBlock)prevBlock;
        Assertions.assertEquals((int)copyOfRecords2.size(), (int)dataBlockRead.getRecords().size(), (String)"Read records size should be equal to the written records size");
        Assertions.assertEquals(copyOfRecords2, (Object)dataBlockRead.getRecords(), (String)"Both records lists should be the same. (ordering guaranteed)");
        Assertions.assertTrue((boolean)reader.hasPrev(), (String)"First block should be available");
        prevBlock = reader.prev();
        dataBlockRead = (HoodieDataBlock)prevBlock;
        Assertions.assertEquals((int)copyOfRecords1.size(), (int)dataBlockRead.getRecords().size(), (String)"Read records size should be equal to the written records size");
        Assertions.assertEquals(copyOfRecords1, (Object)dataBlockRead.getRecords(), (String)"Both records lists should be the same. (ordering guaranteed)");
        Assertions.assertFalse((boolean)reader.hasPrev());
        reader.close();
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAppendAndReadOnCorruptedLogInReverse(boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        Schema schema = SchemaTestUtil.getSimpleSchema();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        this.fs = FSUtils.getFs((String)this.fs.getUri().toString(), (Configuration)this.fs.getConf());
        FSDataOutputStream outputStream = this.fs.append(writer.getLogFile().getPath());
        outputStream.write(HoodieLogFormat.MAGIC);
        outputStream.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
        outputStream.writeInt(1000);
        outputStream.writeInt(1);
        outputStream.write(HoodieLogBlock.getLogMetadataBytes(header));
        outputStream.write("something-random".getBytes());
        outputStream.flush();
        outputStream.close();
        writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        records = SchemaTestUtil.generateTestRecords(0, 100);
        dataBlock = this.getDataBlock(records, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        HoodieLogFileReader reader = new HoodieLogFileReader(this.fs, writer.getLogFile(), schema, this.bufferSize, readBlocksLazily, true);
        Assertions.assertTrue((boolean)reader.hasPrev(), (String)"Last block should be available");
        HoodieLogBlock block = reader.prev();
        Assertions.assertTrue((boolean)(block instanceof HoodieDataBlock), (String)"Last block should be datablock");
        Assertions.assertTrue((boolean)reader.hasPrev(), (String)"Last block should be available");
        Assertions.assertThrows(CorruptedLogFileException.class, () -> reader.prev());
        reader.close();
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testBasicAppendAndTraverseInReverse(boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        Schema schema = SchemaTestUtil.getSimpleSchema();
        List<IndexedRecord> records1 = SchemaTestUtil.generateTestRecords(0, 100);
        List copyOfRecords1 = records1.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = this.getDataBlock(records1, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records2 = SchemaTestUtil.generateTestRecords(0, 100);
        dataBlock = this.getDataBlock(records2, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> records3 = SchemaTestUtil.generateTestRecords(0, 100);
        dataBlock = this.getDataBlock(records3, header);
        writer = writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        HoodieLogFileReader reader = new HoodieLogFileReader(this.fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), this.bufferSize, readBlocksLazily, true);
        Assertions.assertTrue((boolean)reader.hasPrev(), (String)"Third block should be available");
        reader.moveToPrev();
        Assertions.assertTrue((boolean)reader.hasPrev(), (String)"Second block should be available");
        reader.moveToPrev();
        Assertions.assertTrue((boolean)reader.hasPrev(), (String)"First block should be available");
        HoodieLogBlock prevBlock = reader.prev();
        HoodieDataBlock dataBlockRead = (HoodieDataBlock)prevBlock;
        Assertions.assertEquals((int)copyOfRecords1.size(), (int)dataBlockRead.getRecords().size(), (String)"Read records size should be equal to the written records size");
        Assertions.assertEquals(copyOfRecords1, (Object)dataBlockRead.getRecords(), (String)"Both records lists should be the same. (ordering guaranteed)");
        Assertions.assertFalse((boolean)reader.hasPrev());
        reader.close();
    }

    @Test
    public void testV0Format() throws IOException, URISyntaxException {
        int i;
        Schema schema = SchemaTestUtil.getSimpleSchema();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        ArrayList<IndexedRecord> recordsCopy = new ArrayList<IndexedRecord>(records);
        Assertions.assertEquals((int)100, (int)records.size());
        Assertions.assertEquals((int)100, (int)recordsCopy.size());
        HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, schema);
        byte[] content = dataBlock.getBytes(schema);
        Assertions.assertTrue((content.length > 0 ? 1 : 0) != 0);
        HoodieAvroDataBlock logBlock = HoodieAvroDataBlock.getBlock((byte[])content, (Schema)schema);
        Assertions.assertEquals((Object)HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK, (Object)logBlock.getBlockType());
        List readRecords = logBlock.getRecords();
        Assertions.assertEquals((int)readRecords.size(), (int)recordsCopy.size());
        for (i = 0; i < recordsCopy.size(); ++i) {
            Assertions.assertEquals(recordsCopy.get(i), readRecords.get(i));
        }
        logBlock = HoodieAvroDataBlock.getBlock((byte[])content, null);
        Assertions.assertEquals((Object)HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK, (Object)logBlock.getBlockType());
        readRecords = logBlock.getRecords();
        Assertions.assertEquals((int)readRecords.size(), (int)recordsCopy.size());
        for (i = 0; i < recordsCopy.size(); ++i) {
            Assertions.assertEquals(recordsCopy.get(i), readRecords.get(i));
        }
    }

    private HoodieDataBlock getDataBlock(List<IndexedRecord> records, Map<HoodieLogBlock.HeaderMetadataType, String> header) {
        return this.getDataBlock(this.dataBlockType, records, header);
    }

    private HoodieDataBlock getDataBlock(HoodieLogBlock.HoodieLogBlockType dataBlockType, List<IndexedRecord> records, Map<HoodieLogBlock.HeaderMetadataType, String> header) {
        switch (dataBlockType) {
            case AVRO_DATA_BLOCK: {
                return new HoodieAvroDataBlock(records, header);
            }
            case HFILE_DATA_BLOCK: {
                return new HoodieHFileDataBlock(records, header);
            }
        }
        throw new RuntimeException("Unknown data block type " + dataBlockType);
    }
}

