package org.apache.hudi.testutils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.JavaHoodieIndexFactory;
import org.apache.hudi.io.storage.HoodieHFileUtils;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.JavaHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieJavaTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.utils.HoodieWriterClientTestHarness;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/testutils/HoodieJavaClientTestHarness.class */
public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTestHarness {
    // Logger shared by all instances of this harness.
    private static final Logger LOG = LoggerFactory.getLogger(HoodieJavaClientTestHarness.class);
    // Hadoop configuration; a new instance is created for every test in initResources().
    protected Configuration hadoopConf;
    // Engine context built from hadoopConf and taskContextSupplier in initResources().
    protected HoodieJavaEngineContext context;
    // Task-context supplier handed to the engine context in initResources().
    protected TestJavaTaskContextSupplier taskContextSupplier;
    // File system resolved for basePath by initFileSystem(); closed and nulled by cleanupFileSystem().
    protected FileSystem fs;
    // Shut down (shutdownNow) and nulled by cleanupExecutorService() when non-null.
    protected ExecutorService executorService;
    // Closed and nulled by cleanupClients() when non-null.
    protected HoodieTableFileSystemView tableView;
    // Last client handed out by getHoodieWriteClient(); closed and nulled by cleanupClients().
    protected HoodieJavaWriteClient writeClient;

    /* loaded from: input_file:org/apache/hudi/testutils/HoodieJavaClientTestHarness$TestJavaTaskContextSupplier.class */
    /**
     * Task-context supplier for tests, backed by three plain counters that all start at zero.
     *
     * <p>Within this class only the stage id ever changes (through {@link #reset()}); the
     * partition id and attempt id are never written after initialisation. Every supplier reads
     * the current field value each time it is invoked.
     */
    public class TestJavaTaskContextSupplier extends TaskContextSupplier {
        int partitionId;
        int stageId;
        long attemptId;

        public TestJavaTaskContextSupplier() {
        }

        /** Advances the stage id by one; the other counters are left untouched. */
        public void reset() {
            stageId += 1;
        }

        /** @return a supplier of the current partition id */
        public Supplier<Integer> getPartitionIdSupplier() {
            return () -> partitionId;
        }

        /** @return a supplier of the current stage id */
        public Supplier<Integer> getStageIdSupplier() {
            return () -> stageId;
        }

        /** @return a supplier of the current attempt id */
        public Supplier<Long> getAttemptIdSupplier() {
            return () -> attemptId;
        }

        /** @return always {@code Option.empty()}; no engine properties are exposed */
        public Option<String> getProperty(EngineProperty engineProperty) {
            return Option.empty();
        }

        /** @return a supplier of the attempt id narrowed to {@code int} */
        public Supplier<Integer> getAttemptNumberSupplier() {
            return () -> (int) attemptId;
        }
    }

    /**
     * Runs once after all tests of the class and delegates to {@link FileSystem#closeAll()}.
     *
     * @throws IOException if {@code FileSystem.closeAll()} fails
     */
    @AfterAll
    public static void tearDownAll() throws IOException {
        FileSystem.closeAll();
    }

    /**
     * Per-test setup: picks a base path under the temp directory (suffixed with the current
     * time in milliseconds), creates a fresh Hadoop configuration, task-context supplier and
     * engine context, then initialises the file system, the test data generator and the meta
     * client, in that order.
     *
     * @throws IOException if meta-client initialisation fails
     */
    @BeforeEach
    protected void initResources() throws IOException {
        String tableDirName = "java_client_tests" + System.currentTimeMillis();
        basePath = tempDir.resolve(tableDirName).toAbsolutePath().toUri().getPath();

        hadoopConf = new Configuration();
        taskContextSupplier = new TestJavaTaskContextSupplier();
        context = new HoodieJavaEngineContext(hadoopConf, taskContextSupplier);

        initFileSystem(basePath, hadoopConf);
        initTestDataGenerator();
        initMetaClient();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Per-test teardown. Releases, in order: clients, the test data generator, the file system
     * and the executor service. A failure in one step skips the remaining steps.
     *
     * @throws IOException if closing the file system fails
     */
    @AfterEach
    public void cleanupResources() throws IOException {
        cleanupClients();
        cleanupTestDataGenerator();
        cleanupFileSystem();
        cleanupExecutorService();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Resolves the file system for the given path and stores it in {@link #fs}. When the
     * resolved instance is a {@link LocalFileSystem}, checksum verification is switched on.
     *
     * @param str           base path the file system is resolved for; must not be {@code null}
     * @param configuration Hadoop configuration used for the lookup
     * @throws IllegalStateException if {@code str} is {@code null}
     */
    public void initFileSystem(String str, Configuration configuration) {
        if (str == null) {
            throw new IllegalStateException("The base path has not been initialized.");
        }
        FileSystem resolved = FSUtils.getFs(str, configuration);
        this.fs = resolved;
        if (resolved instanceof LocalFileSystem) {
            resolved.setVerifyChecksum(true);
        }
    }

    /**
     * Closes the file system opened for the test, if any, and clears the {@link #fs} field.
     * The field is only cleared when {@code close()} returns normally.
     *
     * @throws IOException if closing the file system fails
     */
    protected void cleanupFileSystem() throws IOException {
        if (fs == null) {
            return;
        }
        LOG.warn("Closing file-system instance used in previous test-run");
        fs.close();
        fs = null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Initialises the meta client using the table type reported by {@code getTableType()}.
     *
     * @throws IOException if table initialisation fails
     */
    public void initMetaClient() throws IOException {
        HoodieTableType defaultTableType = getTableType();
        initMetaClient(defaultTableType);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Initialises a table of the given type at the current base path via
     * {@code HoodieTestUtils.init} and stores the resulting meta client.
     *
     * @param hoodieTableType table type to initialise
     * @throws IllegalStateException if the base path has not been set yet
     * @throws IOException           if table initialisation fails
     */
    public void initMetaClient(HoodieTableType hoodieTableType) throws IOException {
        final String tablePath = this.basePath;
        if (tablePath == null) {
            throw new IllegalStateException("The base path has not been initialized.");
        }
        this.metaClient = HoodieTestUtils.init(this.hadoopConf, tablePath, hoodieTableType);
    }

    /**
     * Drops the meta client reference and closes the write client and the table view when they
     * are set. Each closed field is nulled only after its {@code close()} returned normally.
     */
    protected void cleanupClients() {
        // Assigning null unconditionally is equivalent to the null-guarded assignment.
        metaClient = null;

        if (writeClient != null) {
            writeClient.close();
            writeClient = null;
        }

        if (tableView != null) {
            tableView.close();
            tableView = null;
        }
    }

    /**
     * Calls {@code shutdownNow()} on the executor service, if one is set, and clears the field.
     */
    protected void cleanupExecutorService() {
        if (executorService == null) {
            return;
        }
        executorService.shutdownNow();
        executorService = null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a new write client for the given config, closing the previously created one first.
     * The new client is remembered in {@link #writeClient} so that cleanup can close it.
     *
     * @param hoodieWriteConfig write config for the new client
     * @return the newly created client
     */
    public HoodieJavaWriteClient getHoodieWriteClient(HoodieWriteConfig hoodieWriteConfig) {
        HoodieJavaWriteClient previous = this.writeClient;
        if (previous != null) {
            previous.close();
            // Cleared before construction so a failing constructor does not leave a closed client behind.
            this.writeClient = null;
        }
        HoodieJavaWriteClient fresh = new HoodieJavaWriteClient(this.context, hoodieWriteConfig);
        this.writeClient = fresh;
        return fresh;
    }

    /**
     * Opens a metadata table writer for the given config and closes it again. Does nothing when
     * the metadata config is disabled.
     *
     * <p>The method itself performs no explicit sync call: it only creates the writer, logs and
     * closes it. The writer is managed by try-with-resources, so it is closed even if the body
     * throws, and a {@code null} writer is tolerated.
     *
     * @param hoodieWriteConfig write config whose metadata settings are consulted
     * @throws HoodieMetadataException wrapping any exception raised while creating or closing
     *                                 the writer
     */
    public void syncTableMetadata(HoodieWriteConfig hoodieWriteConfig) {
        if (!hoodieWriteConfig.getMetadataConfig().enabled()) {
            return;
        }
        try (HoodieTableMetadataWriter writer =
                 JavaHoodieBackedTableMetadataWriter.create(this.hadoopConf, hoodieWriteConfig, this.context, Option.empty())) {
            LOG.info("Successfully synced to metadata table");
        } catch (Exception e) {
            throw new HoodieMetadataException("Error syncing to metadata table.", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a {@link HoodieTableMetadata} reader from the metadata config and base path of
     * the given write config.
     *
     * @param hoodieWriteConfig   source of the metadata config and the table base path
     * @param hoodieEngineContext engine context passed to the reader factory
     * @return the reader returned by {@code HoodieTableMetadata.create}
     */
    public HoodieTableMetadata metadata(HoodieWriteConfig hoodieWriteConfig, HoodieEngineContext hoodieEngineContext) {
        return HoodieTableMetadata.create(
            hoodieEngineContext,
            hoodieWriteConfig.getMetadataConfig(),
            hoodieWriteConfig.getBasePath());
    }

    /**
     * Checks that the metadata table agrees with what the test table reports. Returns right
     * after the reader null-check when the metadata table is disabled in the write config.
     *
     * <p>Verified, in order: the inflight commits, the sorted partition lists, the number of
     * partitions in the bulk file listing, and the files of every partition (see
     * {@link #validateFilesPerPartition}). As a side effect the meta client is reloaded.
     *
     * @param hoodieTestTable   test table providing the expected partitions, files and commits
     * @param list              expected inflight commits, compared with
     *                          {@code hoodieTestTable.inflightCommits()}
     * @param hoodieWriteConfig write config of the table under test
     * @param str               base path used to build the meta client during full validation
     * @param z                 when {@code true}, the additional full validation is run
     * @throws IOException           if listing partitions through the test table or reader fails
     * @throws IllegalStateException if the reader is file-system backed or has no synced instant
     */
    public void validateMetadata(HoodieTestTable hoodieTestTable, List<String> list, HoodieWriteConfig hoodieWriteConfig, String str, boolean z) throws IOException {
        HoodieTableMetadata metadata = metadata(hoodieWriteConfig, this.context);
        Assertions.assertNotNull(metadata, "MetadataReader should have been initialized");
        if (!hoodieWriteConfig.isMetadataTableEnabled()) {
            return;
        }
        if ((metadata instanceof FileSystemBackedTableMetadata) || !metadata.getSyncedInstantTime().isPresent()) {
            throw new IllegalStateException("Metadata should have synced some commits or tableMetadata should not be an instance of FileSystemBackedTableMetadata");
        }
        Assertions.assertEquals(list, hoodieTestTable.inflightCommits());

        HoodieTimer timer = HoodieTimer.start();
        HoodieJavaEngineContext engineContext = new HoodieJavaEngineContext(this.hadoopConf);

        // Expected partitions: the last path element of every partition directory of the test table.
        List<String> fsPartitions = new ArrayList<>();
        for (java.nio.file.Path partitionDir : hoodieTestTable.getAllPartitionPaths()) {
            fsPartitions.add(partitionDir.getFileName().toString());
        }
        if (fsPartitions.isEmpty() && hoodieTestTable.isNonPartitioned()) {
            // A non-partitioned table is represented by the single empty partition name.
            fsPartitions.add("");
        }

        List<String> metadataPartitions = metadata.getAllPartitionPaths();
        Collections.sort(fsPartitions);
        Collections.sort(metadataPartitions);
        Assertions.assertEquals(fsPartitions.size(), metadataPartitions.size(), "Partitions should match");
        Assertions.assertEquals(fsPartitions, metadataPartitions, "Partitions should match");

        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        SyncableFileSystemView fsView = HoodieJavaTable.create(hoodieWriteConfig, engineContext).getHoodieView();

        List<String> fullPartitionPaths = fsPartitions.stream()
            .map(partition -> this.basePath + "/" + partition)
            .collect(Collectors.toList());
        Map<String, FileStatus[]> filesByPartition = metadata.getAllFilesInPartitions(fullPartitionPaths);
        Assertions.assertEquals(fsPartitions.size(), filesByPartition.size());

        fsPartitions.forEach(partition -> {
            try {
                validateFilesPerPartition(hoodieTestTable, metadata, fsView, filesByPartition, partition);
            } catch (IOException e) {
                // Pass the exception along so its stack trace shows up in the test report.
                Assertions.fail("Exception should not be raised: " + e, e);
            }
        });

        if (z) {
            runFullValidation(hoodieWriteConfig, str, engineContext);
        }
        LOG.info("Validation time=" + timer.endTimer());
    }

    /**
     * Compares one partition as listed by the test table with the same partition as listed by
     * the metadata reader and by the file-system view.
     *
     * <p>Checked: sorted file names match, the bulk listing has the same number of files, every
     * metadata-listed file has a positive block size, sorted block sizes match, and the number
     * of base files plus log files across all (including replaced) file groups of the view
     * equals the number of metadata-listed files.
     *
     * @param hoodieTestTable     test table providing the expected file listing
     * @param hoodieTableMetadata metadata reader under test
     * @param tableFileSystemView file-system view whose file groups are counted
     * @param map                 bulk listing keyed by the partition path string; only the
     *                            array length of the entry for this partition is used
     * @param str                 partition name; the empty string selects the base path itself
     * @throws IOException if either listing fails
     */
    protected void validateFilesPerPartition(HoodieTestTable hoodieTestTable, HoodieTableMetadata hoodieTableMetadata, TableFileSystemView tableFileSystemView, Map<String, FileStatus[]> map, String str) throws IOException {
        Path partitionPath = str.isEmpty() ? new Path(this.basePath) : new Path(this.basePath, str);
        FileStatus[] fsStatuses = hoodieTestTable.listAllFilesInPartition(str);
        FileStatus[] metadataStatuses = hoodieTableMetadata.getAllFilesInPartition(partitionPath);

        List<String> fsFileNames = Arrays.stream(fsStatuses)
            .map(status -> status.getPath().getName())
            .sorted()
            .collect(Collectors.toList());
        List<String> metadataFileNames = Arrays.stream(metadataStatuses)
            .map(status -> status.getPath().getName())
            .sorted()
            .collect(Collectors.toList());
        Assertions.assertLinesMatch(fsFileNames, metadataFileNames);
        Assertions.assertEquals(fsStatuses.length, map.get(partitionPath.toString()).length);

        Arrays.stream(metadataStatuses).forEach(status -> Assertions.assertTrue(status.getBlockSize() > 0));
        List<Long> fsBlockSizes = Arrays.stream(fsStatuses)
            .map(FileStatus::getBlockSize)
            .sorted()
            .collect(Collectors.toList());
        List<Long> metadataBlockSizes = Arrays.stream(metadataStatuses)
            .map(FileStatus::getBlockSize)
            .sorted()
            .collect(Collectors.toList());
        Assertions.assertEquals(fsBlockSizes, metadataBlockSizes);

        Assertions.assertEquals(fsFileNames.size(), metadataFileNames.size(), "Files within partition " + str + " should match");
        Assertions.assertEquals(fsFileNames, metadataFileNames, "Files within partition " + str + " should match");

        // The element type is spelled out in full because the file does not import it.
        List<org.apache.hudi.common.model.HoodieFileGroup> fileGroups =
            tableFileSystemView.getAllFileGroups(str).collect(Collectors.toList());
        fileGroups.addAll(tableFileSystemView.getAllReplacedFileGroups(str).collect(Collectors.toList()));

        // Three separate passes keep the log output grouped: groups, then base files, then slices.
        Logger logger = LoggerFactory.getLogger(getClass());
        fileGroups.forEach(group -> logger.info(group.toString()));
        fileGroups.forEach(group -> group.getAllBaseFiles().forEach(baseFile -> logger.info(baseFile.toString())));
        fileGroups.forEach(group -> group.getAllFileSlices().forEach(slice -> logger.info(slice.toString())));

        long filesInView = fileGroups.stream()
            .mapToLong(group -> group.getAllBaseFiles().count()
                + group.getAllFileSlices().mapToLong(slice -> slice.getLogFiles().count()).sum())
            .sum();
        Assertions.assertEquals(metadataFileNames.size(), filesInView);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a metadata table writer for the given config, using a new engine context built
     * from {@link #hadoopConf}. The caller owns the returned writer.
     *
     * @param hoodieWriteConfig write config of the data table
     * @return the writer produced by {@code JavaHoodieBackedTableMetadataWriter.create}
     */
    public HoodieBackedTableMetadataWriter metadataWriter(HoodieWriteConfig hoodieWriteConfig) {
        HoodieJavaEngineContext engineContext = new HoodieJavaEngineContext(this.hadoopConf);
        // Elsewhere in this class the factory result is held as HoodieTableMetadataWriter, so the
        // narrower return type needs an explicit downcast.
        return (HoodieBackedTableMetadataWriter) JavaHoodieBackedTableMetadataWriter.create(
            this.hadoopConf, hoodieWriteConfig, engineContext, Option.empty());
    }

    /**
     * Additional checks on the metadata table itself: its own write config must not enable a
     * metadata table, it must be a MERGE_ON_READ table with HFILE base files, the number of its
     * partitions must equal the number of enabled partition types, and every partition must
     * have at least one latest file slice with a base file and no more latest slices than the
     * retained cleaner file versions plus one.
     *
     * @param hoodieWriteConfig   write config of the data table
     * @param str                 base path used to build the meta client of the metadata table
     * @param hoodieEngineContext engine context used to list the metadata table partitions
     */
    private void runFullValidation(HoodieWriteConfig hoodieWriteConfig, String str, HoodieEngineContext hoodieEngineContext) {
        HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(hoodieWriteConfig);
        Assertions.assertNotNull(metadataWriter, "MetadataWriter should have been initialized");

        HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
        Assertions.assertFalse(metadataWriteConfig.isMetadataTableEnabled(), "No metadata table for metadata table");

        HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(str).build();
        // JUnit expects (expected, actual); the order only affects the failure message.
        Assertions.assertEquals(HoodieTableType.MERGE_ON_READ, metadataMetaClient.getTableType(), "Metadata Table should be MOR");
        Assertions.assertEquals(HoodieFileFormat.HFILE, metadataMetaClient.getTableConfig().getBaseFileFormat(), "Metadata Table base file format should be HFile");

        List<String> metadataPartitions = FSUtils.getAllPartitionPaths(hoodieEngineContext, HoodieTableMetadata.getMetadataTableBasePath(this.basePath), false, false);
        Assertions.assertEquals(metadataWriter.getEnabledPartitionTypes().size(), metadataPartitions.size());

        int maxLatestSlices = metadataWriteConfig.getCleanerFileVersionsRetained() + 1;
        HoodieTableFileSystemView metadataView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
        try {
            for (String partition : metadataPartitions) {
                // The element type is spelled out in full because the file does not import it.
                List<org.apache.hudi.common.model.FileSlice> latestSlices =
                    metadataView.getLatestFileSlices(partition).collect(Collectors.toList());
                long baseFileCount = latestSlices.stream()
                    .map(slice -> slice.getBaseFile())
                    .filter(Objects::nonNull)
                    .count();
                Assertions.assertTrue(baseFileCount > 0, "Should have a single latest base file");
                Assertions.assertTrue(latestSlices.size() > 0, "Should have a single latest file slice");
                Assertions.assertTrue(latestSlices.size() <= maxLatestSlices, "Should limit file slice to " + maxLatestSlices + " but was " + latestSlices.size());
            }
        } finally {
            // The view is created locally, so it is released here whether or not an assertion failed.
            metadataView.close();
        }
    }

    /**
     * Creates a Java table for the given meta client and config and resets its slice view
     * before returning it.
     *
     * @param hoodieTableMetaClient meta client of the table
     * @param hoodieWriteConfig     write config of the table
     * @return the created table
     */
    public HoodieJavaTable getHoodieTable(HoodieTableMetaClient hoodieTableMetaClient, HoodieWriteConfig hoodieWriteConfig) {
        HoodieJavaTable table = HoodieJavaTable.create(hoodieWriteConfig, this.context, hoodieTableMetaClient);
        // NOTE(review): reset() is invoked through SyncableFileSystemView; the slice-view type
        // returned by getSliceView() is presumed not to declare it - confirm against HoodieTable.
        ((SyncableFileSystemView) table.getSliceView()).reset();
        return table;
    }

    /**
     * Convenience overload of the ten-argument {@code insertFirstBatch} that always passes
     * {@code true} for the trailing per-commit filter flag.
     *
     * @param hoodieWriteConfig     write config used when records are prepared for a prepped API
     * @param hoodieJavaWriteClient client performing the write
     * @param str                   new commit time
     * @param str2                  initial commit time, also used as the previous commit time
     * @param i                     number of records to generate
     * @param function3             write function applied to (client, records, commit time)
     * @param z                     whether generated records are tagged first (prepped API)
     * @param z2                    whether post-write assertions are run
     * @param i2                    expected record count, used for both commit and table totals
     * @return the write statuses returned by {@code function3}
     * @throws Exception if generation, writing or verification fails
     */
    public List<WriteStatus> insertFirstBatch(HoodieWriteConfig hoodieWriteConfig, HoodieJavaWriteClient hoodieJavaWriteClient, String str, String str2, int i, HoodieWriterClientTestHarness.Function3<List<WriteStatus>, HoodieJavaWriteClient, List<HoodieRecord>, String> function3, boolean z, boolean z2, int i2) throws Exception {
        final boolean filterByCommitTime = true;
        return insertFirstBatch(hoodieWriteConfig, hoodieJavaWriteClient, str, str2, i, function3, z, z2, i2, filterByCommitTime);
    }

    /**
     * Writes a first batch of generated inserts through {@code writeBatch}. The batch is written
     * with no intermediate commit times, one expected commit, the same expected count for the
     * commit and for the whole table, and without an explicit commit call.
     *
     * @param hoodieWriteConfig     write config used when records are prepared for a prepped API
     * @param hoodieJavaWriteClient client performing the write
     * @param str                   new commit time
     * @param str2                  initial commit time, also used as the previous commit time
     * @param i                     number of records to generate
     * @param function3             write function applied to (client, records, commit time)
     * @param z                     whether generated records are tagged first (prepped API)
     * @param z2                    whether post-write assertions are run
     * @param i2                    expected record count, used for both commit and table totals
     * @param z3                    whether per-commit record counts are asserted
     * @return the write statuses returned by {@code function3}
     * @throws Exception if generation, writing or verification fails
     */
    public List<WriteStatus> insertFirstBatch(HoodieWriteConfig hoodieWriteConfig, HoodieJavaWriteClient hoodieJavaWriteClient, String str, String str2, int i, HoodieWriterClientTestHarness.Function3<List<WriteStatus>, HoodieJavaWriteClient, List<HoodieRecord>, String> function3, boolean z, boolean z2, int i2, boolean z3) throws Exception {
        // The bound method reference null-checks dataGen itself, so no separate check is needed.
        HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> insertGenerator =
            generateWrapRecordsFn(z, hoodieWriteConfig, this.dataGen::generateInserts);
        return writeBatch(hoodieJavaWriteClient, str, str2, Option.empty(), str2, i, insertGenerator, function3, z2, i2, i2, 1, false, z3);
    }

    /**
     * Writes a batch of generated inserts through {@code writeBatch}, without an explicit
     * commit call. When {@code option} holds a partition, records are generated for that
     * partition only; otherwise the generic insert generator is used.
     *
     * @param hoodieWriteConfig     write config used when records are prepared for a prepped API
     * @param hoodieJavaWriteClient client performing the write
     * @param str                   new commit time
     * @param str2                  initial commit time, also used as the previous commit time
     * @param i                     number of records to generate
     * @param function3             write function applied to (client, records, commit time)
     * @param z                     whether generated records are tagged first (prepped API)
     * @param z2                    whether post-write assertions are run
     * @param i2                    expected number of records in this commit
     * @param i3                    expected number of records in the table
     * @param i4                    expected number of commits after the initial commit time
     * @param option                optional partition the inserts are generated for
     * @return the write statuses returned by {@code function3}
     * @throws Exception if generation, writing or verification fails
     */
    public List<WriteStatus> insertBatch(HoodieWriteConfig hoodieWriteConfig, HoodieJavaWriteClient hoodieJavaWriteClient, String str, String str2, int i, HoodieWriterClientTestHarness.Function3<List<WriteStatus>, HoodieJavaWriteClient, List<HoodieRecord>, String> function3, boolean z, boolean z2, int i2, int i3, int i4, Option<String> option) throws Exception {
        if (!option.isPresent()) {
            HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> insertGenerator =
                generateWrapRecordsFn(z, hoodieWriteConfig, this.dataGen::generateInserts);
            return writeBatch(hoodieJavaWriteClient, str, str2, Option.empty(), str2, i, insertGenerator, function3, z2, i2, i3, i4, false);
        }
        HoodieWriterClientTestHarness.Function3<List<HoodieRecord>, String, Integer, String> partitionInsertGenerator =
            generateWrapRecordsForPartitionFn(z, hoodieWriteConfig, this.dataGen::generateInsertsForPartition);
        return writeBatch(hoodieJavaWriteClient, str, str2, Option.empty(), str2, i, partitionInsertGenerator, function3, z2, i2, i3, i4, false, option.get());
    }

    /**
     * Convenience overload of the fourteen-argument {@code updateBatch} that always passes
     * {@code true} for the trailing per-commit filter flag.
     *
     * @param hoodieWriteConfig     write config used when records are prepared for a prepped API
     * @param hoodieJavaWriteClient client performing the write
     * @param str                   new commit time
     * @param str2                  previous commit time
     * @param option                optional commit times from which incremental reads are also checked
     * @param str3                  initial commit time
     * @param i                     number of records to generate
     * @param function3             write function applied to (client, records, commit time)
     * @param z                     whether generated records are tagged first (prepped API)
     * @param z2                    whether post-write assertions are run
     * @param i2                    expected number of records in this commit
     * @param i3                    expected number of records in the table
     * @param i4                    expected number of commits after the initial commit time
     * @return the write statuses returned by {@code function3}
     * @throws Exception if generation, writing or verification fails
     */
    public List<WriteStatus> updateBatch(HoodieWriteConfig hoodieWriteConfig, HoodieJavaWriteClient hoodieJavaWriteClient, String str, String str2, Option<List<String>> option, String str3, int i, HoodieWriterClientTestHarness.Function3<List<WriteStatus>, HoodieJavaWriteClient, List<HoodieRecord>, String> function3, boolean z, boolean z2, int i2, int i3, int i4) throws Exception {
        final boolean filterByCommitTime = true;
        return updateBatch(hoodieWriteConfig, hoodieJavaWriteClient, str, str2, option, str3, i, function3, z, z2, i2, i3, i4, filterByCommitTime);
    }

    /**
     * Writes a batch produced by the data generator's {@code generateUniqueUpdates} through
     * {@code writeBatch}, without an explicit commit call.
     *
     * @param hoodieWriteConfig     write config used when records are prepared for a prepped API
     * @param hoodieJavaWriteClient client performing the write
     * @param str                   new commit time
     * @param str2                  previous commit time
     * @param option                optional commit times from which incremental reads are also checked
     * @param str3                  initial commit time
     * @param i                     number of records to generate
     * @param function3             write function applied to (client, records, commit time)
     * @param z                     whether generated records are tagged first (prepped API)
     * @param z2                    whether post-write assertions are run
     * @param i2                    expected number of records in this commit
     * @param i3                    expected number of records in the table
     * @param i4                    expected number of commits after the initial commit time
     * @param z3                    whether per-commit record counts are asserted
     * @return the write statuses returned by {@code function3}
     * @throws Exception if generation, writing or verification fails
     */
    public List<WriteStatus> updateBatch(HoodieWriteConfig hoodieWriteConfig, HoodieJavaWriteClient hoodieJavaWriteClient, String str, String str2, Option<List<String>> option, String str3, int i, HoodieWriterClientTestHarness.Function3<List<WriteStatus>, HoodieJavaWriteClient, List<HoodieRecord>, String> function3, boolean z, boolean z2, int i2, int i3, int i4, boolean z3) throws Exception {
        // The bound method reference null-checks dataGen itself, so no separate check is needed.
        HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> updateGenerator =
            generateWrapRecordsFn(z, hoodieWriteConfig, this.dataGen::generateUniqueUpdates);
        return writeBatch(hoodieJavaWriteClient, str, str2, option, str3, i, updateGenerator, function3, z2, i2, i3, i4, false, z3);
    }

    /**
     * Convenience overload of the eleven-argument {@code deleteBatch} that always passes
     * {@code true} for the trailing flag.
     *
     * @param hoodieWriteConfig     write config used when records are prepared for a prepped API
     * @param hoodieJavaWriteClient client performing the delete
     * @param str                   new commit time
     * @param str2                  second commit-time argument, forwarded to the verification helper
     * @param str3                  third commit-time argument, forwarded to the verification helper
     * @param i                     number of records or keys to generate
     * @param z                     {@code true} to delete prepped records, {@code false} to delete by key
     * @param z2                    flag forwarded to the verification helper
     * @param i2                    first expected count forwarded to the verification helper
     * @param i3                    second expected count forwarded to the verification helper
     * @return the result of the verification helper
     * @throws Exception if generation, deletion or verification fails
     */
    public List<WriteStatus> deleteBatch(HoodieWriteConfig hoodieWriteConfig, HoodieJavaWriteClient hoodieJavaWriteClient, String str, String str2, String str3, int i, boolean z, boolean z2, int i2, int i3) throws Exception {
        final boolean trailingFlag = true;
        return deleteBatch(hoodieWriteConfig, hoodieJavaWriteClient, str, str2, str3, i, z, z2, i2, i3, trailingFlag);
    }

    /**
     * Starts a commit, deletes generated data and hands the write statuses to
     * {@code getWriteStatusAndVerifyDeleteOperation}.
     *
     * <p>With {@code z == true} delete records come from the data generator's
     * {@code generateUniqueDeleteRecords}, are tagged, and are deleted through
     * {@code deletePrepped}. Otherwise keys come from {@code generateUniqueDeletes}, partition
     * metadata is asserted for them, and they are deleted through {@code delete}.
     *
     * @param hoodieWriteConfig     write config used when records are prepared for a prepped API
     * @param hoodieJavaWriteClient client performing the delete
     * @param str                   new commit time
     * @param str2                  second commit-time argument, forwarded to the verification helper
     * @param str3                  third commit-time argument, forwarded to the verification helper
     * @param i                     number of records or keys to generate
     * @param z                     {@code true} to delete prepped records, {@code false} to delete by key
     * @param z2                    flag forwarded to the verification helper
     * @param i2                    first expected count forwarded to the verification helper
     * @param i3                    second expected count forwarded to the verification helper
     * @param z3                    second flag forwarded to the verification helper
     * @return the result of the verification helper
     * @throws Exception if generation, deletion or verification fails
     */
    public List<WriteStatus> deleteBatch(HoodieWriteConfig hoodieWriteConfig, HoodieJavaWriteClient hoodieJavaWriteClient, String str, String str2, String str3, int i, boolean z, boolean z2, int i2, int i3, boolean z3) throws Exception {
        List<WriteStatus> statuses;
        if (z) {
            HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> deleteRecordGenerator =
                generateWrapRecordsFn(z, hoodieWriteConfig, this.dataGen::generateUniqueDeleteRecords);
            hoodieJavaWriteClient.startCommitWithTime(str);
            List<HoodieRecord> recordsToDelete = deleteRecordGenerator.apply(str, i);
            // The client is a raw type, so this assignment is an unchecked conversion.
            statuses = hoodieJavaWriteClient.deletePrepped(recordsToDelete, str);
        } else {
            Function<Integer, List<HoodieKey>> deleteKeyGenerator =
                generateWrapDeleteKeysFn(z, hoodieWriteConfig, this.dataGen::generateUniqueDeletes);
            hoodieJavaWriteClient.startCommitWithTime(str);
            List<HoodieKey> keysToDelete = deleteKeyGenerator.apply(i);
            assertPartitionMetadataForKeys(this.basePath, keysToDelete, this.fs);
            statuses = hoodieJavaWriteClient.delete(keysToDelete, str);
        }
        return getWriteStatusAndVerifyDeleteOperation(str, str2, str3, z2, i2, i3, z3, statuses);
    }

    /**
     * Convenience overload of {@code writeBatch} for a two-argument record generator that
     * always asserts per-commit record counts (passes {@code true} as the trailing flag).
     *
     * @param hoodieJavaWriteClient client performing the write
     * @param str                   new commit time
     * @param str2                  previous commit time
     * @param option                optional commit times from which incremental reads are also checked
     * @param str3                  initial commit time
     * @param i                     number of records to generate
     * @param function2             record generator applied to (commit time, record count)
     * @param function3             write function applied to (client, records, commit time)
     * @param z                     whether post-write assertions are run
     * @param i2                    expected number of records in this commit
     * @param i3                    expected number of records in the table
     * @param i4                    expected number of commits after the initial commit time
     * @param z2                    whether the client's commit is invoked explicitly
     * @return the write statuses returned by {@code function3}
     * @throws Exception if generation, writing or verification fails
     */
    public List<WriteStatus> writeBatch(HoodieJavaWriteClient hoodieJavaWriteClient, String str, String str2, Option<List<String>> option, String str3, int i, HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> function2, HoodieWriterClientTestHarness.Function3<List<WriteStatus>, HoodieJavaWriteClient, List<HoodieRecord>, String> function3, boolean z, int i2, int i3, int i4, boolean z2) throws Exception {
        final boolean filterByCommitTime = true;
        return writeBatch(hoodieJavaWriteClient, str, str2, option, str3, i, function2, function3, z, i2, i3, i4, z2, filterByCommitTime);
    }

    /**
     * Convenience overload of {@code writeBatch} for a partition-aware record generator that
     * always asserts per-commit record counts (passes {@code true} as the filter flag).
     *
     * @param hoodieJavaWriteClient client performing the write
     * @param str                   new commit time
     * @param str2                  previous commit time
     * @param option                optional commit times from which incremental reads are also checked
     * @param str3                  initial commit time
     * @param i                     number of records to generate
     * @param function3             record generator applied to (commit time, record count, partition)
     * @param function32            write function applied to (client, records, commit time)
     * @param z                     whether post-write assertions are run
     * @param i2                    expected number of records in this commit
     * @param i3                    expected number of records in the table
     * @param i4                    expected number of commits after the initial commit time
     * @param z2                    whether the client's commit is invoked explicitly
     * @param str4                  partition passed to the record generator
     * @return the write statuses returned by {@code function32}
     * @throws Exception if generation, writing or verification fails
     */
    public List<WriteStatus> writeBatch(HoodieJavaWriteClient hoodieJavaWriteClient, String str, String str2, Option<List<String>> option, String str3, int i, HoodieWriterClientTestHarness.Function3<List<HoodieRecord>, String, Integer, String> function3, HoodieWriterClientTestHarness.Function3<List<WriteStatus>, HoodieJavaWriteClient, List<HoodieRecord>, String> function32, boolean z, int i2, int i3, int i4, boolean z2, String str4) throws Exception {
        final boolean filterByCommitTime = true;
        return writeBatch(hoodieJavaWriteClient, str, str2, option, str3, i, function3, function32, z, i2, i3, i4, z2, filterByCommitTime, str4);
    }

    /**
     * Generates records with the two-argument generator and delegates the write and its
     * verification to {@code writeBatchHelper}.
     *
     * @param hoodieJavaWriteClient client performing the write
     * @param str                   new commit time, also passed to the generator
     * @param str2                  previous commit time
     * @param option                optional commit times from which incremental reads are also checked
     * @param str3                  initial commit time
     * @param i                     number of records to generate
     * @param function2             record generator applied to (commit time, record count)
     * @param function3             write function applied to (client, records, commit time)
     * @param z                     whether post-write assertions are run
     * @param i2                    expected number of records in this commit
     * @param i3                    expected number of records in the table
     * @param i4                    expected number of commits after the initial commit time
     * @param z2                    whether the client's commit is invoked explicitly
     * @param z3                    whether per-commit record counts are asserted
     * @return the write statuses returned by {@code function3}
     * @throws Exception if generation, writing or verification fails
     */
    public List<WriteStatus> writeBatch(HoodieJavaWriteClient hoodieJavaWriteClient, String str, String str2, Option<List<String>> option, String str3, int i, HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> function2, HoodieWriterClientTestHarness.Function3<List<WriteStatus>, HoodieJavaWriteClient, List<HoodieRecord>, String> function3, boolean z, int i2, int i3, int i4, boolean z2, boolean z3) throws Exception {
        List<HoodieRecord> generatedRecords = function2.apply(str, i);
        return writeBatchHelper(hoodieJavaWriteClient, str, str2, option, str3, i, generatedRecords, function3, z, i2, i3, i4, z2, z3);
    }

    /**
     * Generates records for one partition with the three-argument generator and delegates the
     * write and its verification to {@code writeBatchHelper}.
     *
     * @param hoodieJavaWriteClient client performing the write
     * @param str                   new commit time, also passed to the generator
     * @param str2                  previous commit time
     * @param option                optional commit times from which incremental reads are also checked
     * @param str3                  initial commit time
     * @param i                     number of records to generate
     * @param function3             record generator applied to (commit time, record count, partition)
     * @param function32            write function applied to (client, records, commit time)
     * @param z                     whether post-write assertions are run
     * @param i2                    expected number of records in this commit
     * @param i3                    expected number of records in the table
     * @param i4                    expected number of commits after the initial commit time
     * @param z2                    whether the client's commit is invoked explicitly
     * @param z3                    whether per-commit record counts are asserted
     * @param str4                  partition passed to the record generator
     * @return the write statuses returned by {@code function32}
     * @throws Exception if generation, writing or verification fails
     */
    public List<WriteStatus> writeBatch(HoodieJavaWriteClient hoodieJavaWriteClient, String str, String str2, Option<List<String>> option, String str3, int i, HoodieWriterClientTestHarness.Function3<List<HoodieRecord>, String, Integer, String> function3, HoodieWriterClientTestHarness.Function3<List<WriteStatus>, HoodieJavaWriteClient, List<HoodieRecord>, String> function32, boolean z, int i2, int i3, int i4, boolean z2, boolean z3, String str4) throws Exception {
        List<HoodieRecord> generatedRecords = function3.apply(str, i, str4);
        return writeBatchHelper(hoodieJavaWriteClient, str, str2, option, str3, i, generatedRecords, function32, z, i2, i3, i4, z2, z3);
    }

    /**
     * Starts a commit, writes the given records with {@code function3}, optionally commits, and
     * verifies the outcome.
     *
     * <p>Always checked: no write errors and partition metadata for the written records. When
     * {@code z} is set, also checked: the number of commits after {@code str3}, that the latest
     * commit is {@code str}, and the total record count across all generator partitions. When
     * {@code z3} is set as well, the per-commit record count and incremental reads since
     * {@code str2} (and since every commit time in {@code option}) are checked too.
     *
     * @param hoodieJavaWriteClient client performing the write
     * @param str                   new commit time
     * @param str2                  previous commit time
     * @param option                optional commit times from which incremental reads are also checked
     * @param str3                  initial commit time
     * @param i                     number of generated records (not used by the helper itself)
     * @param list                  records to write
     * @param function3             write function applied to (client, records, commit time)
     * @param z                     whether post-write assertions are run
     * @param i2                    expected number of records in this commit
     * @param i3                    expected number of records in the table
     * @param i4                    expected number of commits after the initial commit time
     * @param z2                    whether the client's commit is invoked explicitly
     * @param z3                    whether per-commit record counts are asserted
     * @return the write statuses returned by {@code function3}
     * @throws IOException if writing or reading back fails
     */
    private List<WriteStatus> writeBatchHelper(HoodieJavaWriteClient hoodieJavaWriteClient, String str, String str2, Option<List<String>> option, String str3, int i, List<HoodieRecord> list, HoodieWriterClientTestHarness.Function3<List<WriteStatus>, HoodieJavaWriteClient, List<HoodieRecord>, String> function3, boolean z, int i2, int i3, int i4, boolean z2, boolean z3) throws IOException {
        hoodieJavaWriteClient.startCommitWithTime(str);
        List<WriteStatus> statuses = function3.apply(hoodieJavaWriteClient, list, str);
        // JUnit's Assertions has no assertNoWriteErrors. It is presumably provided by the
        // Assertions class of this package, which the JUnit import shadows - hence the
        // fully-qualified name. NOTE(review): confirm the helper lives there.
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(statuses);
        if (z2) {
            hoodieJavaWriteClient.commit(str, statuses);
        }
        assertPartitionMetadataForRecords(this.basePath, list, this.fs);

        HoodieTimeline commitsTimeline = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(this.basePath).build().getCommitsTimeline();
        if (!z) {
            return statuses;
        }

        Assertions.assertEquals(i4, commitsTimeline.findInstantsAfter(str3, Integer.MAX_VALUE).countInstants(), "Expecting " + i4 + " commits.");
        Assertions.assertEquals(str, ((HoodieInstant) commitsTimeline.lastInstant().get()).getTimestamp(), "Latest commit should be " + str);
        if (z3) {
            Assertions.assertEquals(i2, numRowsInCommit(this.basePath, commitsTimeline, str, true), "Must contain " + i2 + " records");
        }

        // One glob per generator partition, covering every file directly inside it.
        String[] partitionGlobs = Arrays.stream(this.dataGen.getPartitionPaths())
            .map(partition -> String.format("%s/%s/*", this.basePath, partition))
            .toArray(String[]::new);
        Assertions.assertEquals(i3, countRowsInPaths(this.basePath, this.fs, partitionGlobs), "Must contain " + i3 + " records");

        if (z3) {
            Assertions.assertEquals(numRowsInCommit(this.basePath, commitsTimeline, str, true), countRecordsOptionallySince(this.basePath, commitsTimeline, Option.of(str2)), "Incremental consumption from " + str2 + " should give all records in latest commit");
            if (option.isPresent()) {
                for (String commitTime : option.get()) {
                    Assertions.assertEquals(numRowsInCommit(this.basePath, commitsTimeline, str, true), countRecordsOptionallySince(this.basePath, commitsTimeline, Option.of(commitTime)), "Incremental consumption from " + commitTime + " should give all records in latest commit");
                }
            }
        }
        return statuses;
    }

    /**
     * Returns the generator to use for a write: the given one as-is, or, for prepped APIs, a
     * wrapper that tags every generated batch (see {@link #wrapRecordsGenFunctionForPreppedCalls}).
     *
     * @param z                 whether the records are meant for a prepped API
     * @param hoodieWriteConfig write config used by the wrapper
     * @param function2         record generator applied to (commit time, record count)
     * @return {@code function2} itself when {@code z} is {@code false}, otherwise the wrapper
     */
    public HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean z, HoodieWriteConfig hoodieWriteConfig, HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> function2) {
        if (!z) {
            return function2;
        }
        return wrapRecordsGenFunctionForPreppedCalls(this.basePath, this.hadoopConf, this.context, hoodieWriteConfig, function2);
    }

    /**
     * Partition-aware counterpart of {@code generateWrapRecordsFn}: returns the given generator
     * as-is, or, for prepped APIs, a wrapper that tags every generated batch.
     *
     * @param z                 whether the records are meant for a prepped API
     * @param hoodieWriteConfig write config used by the wrapper
     * @param function3         record generator applied to (commit time, record count, partition)
     * @return {@code function3} itself when {@code z} is {@code false}, otherwise the wrapper
     */
    public HoodieWriterClientTestHarness.Function3<List<HoodieRecord>, String, Integer, String> generateWrapRecordsForPartitionFn(boolean z, HoodieWriteConfig hoodieWriteConfig, HoodieWriterClientTestHarness.Function3<List<HoodieRecord>, String, Integer, String> function3) {
        if (!z) {
            return function3;
        }
        return wrapPartitionRecordsGenFunctionForPreppedCalls(this.basePath, this.hadoopConf, this.context, hoodieWriteConfig, function3);
    }

    /**
     * Wraps a record generator so that each generated batch is passed through
     * {@link #tagLocation} before it is returned. On every invocation a new index and a new
     * table (backed by a newly built meta client with the active timeline loaded) are created.
     *
     * @param str                 table base path
     * @param configuration       Hadoop configuration for the meta client
     * @param hoodieEngineContext engine context used for the table and for tagging
     * @param hoodieWriteConfig   write config used to create the index and the table
     * @param function2           record generator applied to (commit time, record count)
     * @return a generator producing tagged records
     */
    public static HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(String str, Configuration configuration, HoodieEngineContext hoodieEngineContext, HoodieWriteConfig hoodieWriteConfig, HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> function2) {
        return (commitTime, numRecords) -> {
            HoodieIndex index = JavaHoodieIndexFactory.createIndex(hoodieWriteConfig);
            List<HoodieRecord> generated = function2.apply(commitTime, numRecords);
            HoodieTableMetaClient freshMetaClient = HoodieTableMetaClient.builder()
                .setConf(configuration)
                .setBasePath(str)
                .setLoadActiveTimelineOnLoad(true)
                .build();
            HoodieTable table = HoodieJavaTable.create(hoodieWriteConfig, hoodieEngineContext, freshMetaClient);
            return tagLocation(index, hoodieEngineContext, generated, table);
        };
    }

    /**
     * Wraps a partition-aware record generator so that each generated batch is passed through
     * {@link #tagLocation} before it is returned. On every invocation a new index and a new
     * table (backed by a newly built meta client with the active timeline loaded) are created.
     *
     * @param str                 table base path
     * @param configuration       Hadoop configuration for the meta client
     * @param hoodieEngineContext engine context used for the table and for tagging
     * @param hoodieWriteConfig   write config used to create the index and the table
     * @param function3           record generator applied to (commit time, record count, partition)
     * @return a generator producing tagged records
     */
    public static HoodieWriterClientTestHarness.Function3<List<HoodieRecord>, String, Integer, String> wrapPartitionRecordsGenFunctionForPreppedCalls(String str, Configuration configuration, HoodieEngineContext hoodieEngineContext, HoodieWriteConfig hoodieWriteConfig, HoodieWriterClientTestHarness.Function3<List<HoodieRecord>, String, Integer, String> function3) {
        return (commitTime, numRecords, partition) -> {
            HoodieIndex index = JavaHoodieIndexFactory.createIndex(hoodieWriteConfig);
            List<HoodieRecord> generated = function3.apply(commitTime, numRecords, partition);
            HoodieTableMetaClient freshMetaClient = HoodieTableMetaClient.builder()
                .setConf(configuration)
                .setBasePath(str)
                .setLoadActiveTimelineOnLoad(true)
                .build();
            HoodieTable table = HoodieJavaTable.create(hoodieWriteConfig, hoodieEngineContext, freshMetaClient);
            return tagLocation(index, hoodieEngineContext, generated, table);
        };
    }

    /**
     * Returns either the supplied delete-key generator as-is, or a version of it decorated by
     * {@link #wrapDeleteKeysGenFunctionForPreppedCalls}.
     *
     * @param z                 {@code true} to wrap the generator, {@code false} to return it unchanged
     * @param hoodieWriteConfig write config forwarded to the wrapper
     * @param function          key generator to (optionally) wrap
     * @return {@code function} itself when {@code z} is {@code false}, otherwise the wrapped generator
     */
    public Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean z, HoodieWriteConfig hoodieWriteConfig, Function<Integer, List<HoodieKey>> function) {
        if (!z) {
            return function;
        }
        return wrapDeleteKeysGenFunctionForPreppedCalls(
            this.basePath, this.hadoopConf, this.context, hoodieWriteConfig, function);
    }

    /**
     * Decorates a delete-key generator so that the keys it produces are routed through
     * {@link #tagLocation}.
     *
     * <p>Each generated key is wrapped in a {@code HoodieAvroRecord} carrying an
     * {@code EmptyHoodieRecordPayload}; those records are tagged against a freshly built table
     * for the given base path, and the keys of the records returned by tagging are handed back.
     *
     * @param str                 base path of the table
     * @param configuration       Hadoop configuration used to build the meta client
     * @param hoodieEngineContext engine context used for table creation and tagging
     * @param hoodieWriteConfig   write config used to create the index and the table
     * @param function            key generator being wrapped
     * @return a generator with the same signature returning the keys of the tagged records
     */
    public static Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(String str, Configuration configuration, HoodieEngineContext hoodieEngineContext, HoodieWriteConfig hoodieWriteConfig, Function<Integer, List<HoodieKey>> function) {
        return count -> {
            HoodieIndex index = JavaHoodieIndexFactory.createIndex(hoodieWriteConfig);
            List<HoodieKey> generatedKeys = function.apply(count);

            // Turn every key into a payload-less record so it can be run through the index.
            List<HoodieRecord> deleteRecords = new ArrayList<>();
            for (HoodieKey key : generatedKeys) {
                deleteRecords.add(new HoodieAvroRecord(key, new EmptyHoodieRecordPayload()));
            }

            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(configuration)
                .setBasePath(str)
                .setLoadActiveTimelineOnLoad(true)
                .build();
            HoodieTable table = HoodieJavaTable.create(hoodieWriteConfig, hoodieEngineContext, metaClient);

            List<HoodieKey> taggedKeys = new ArrayList<>();
            for (HoodieRecord tagged : tagLocation(index, hoodieEngineContext, deleteRecords, table)) {
                taggedKeys.add(tagged.getKey());
            }
            return taggedKeys;
        };
    }

    /**
     * Runs the given records through {@code hoodieIndex.tagLocation} and collects the result
     * back into a list.
     *
     * @param hoodieIndex         index performing the tagging
     * @param hoodieEngineContext engine context passed to the index
     * @param list                records to tag; wrapped via {@code HoodieListData.eager}
     * @param hoodieTable         table passed to the index
     * @return the records returned by the index, collected as a list
     */
    public static List<HoodieRecord> tagLocation(HoodieIndex hoodieIndex, HoodieEngineContext hoodieEngineContext, List<HoodieRecord> list, HoodieTable hoodieTable) {
        List<HoodieRecord> tagged = hoodieIndex
            .tagLocation(HoodieListData.eager(list), hoodieEngineContext, hoodieTable)
            .collectAsList();
        return tagged;
    }

    /**
     * Asserts that the given write statuses carry no errors and, when requested, verifies the
     * state of the commit timeline and the record counts after a delete operation.
     *
     * @param str   commit time expected to be the latest instant on the commit timeline
     * @param str2  commit time used as the lower bound for the incremental record count
     * @param str3  commit time after which exactly three commits are expected
     * @param z     when {@code false}, only the write-error assertion is performed
     * @param i     expected number of records written in commit {@code str}
     * @param i2    expected number of records across all partition paths of the data generator
     * @param z2    whether the per-commit record-count assertions are performed
     * @param list  write statuses to check
     * @return {@code list}, unchanged
     */
    private List<WriteStatus> getWriteStatusAndVerifyDeleteOperation(String str, String str2, String str3, boolean z, int i, int i2, boolean z2, List<WriteStatus> list) {
        Assertions.assertNoWriteErrors(list);

        // The timeline is built unconditionally, exactly as before, even if nothing else is asserted.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(this.hadoopConf)
            .setBasePath(this.basePath)
            .build();
        HoodieTimeline commits = new HoodieActiveTimeline(metaClient).getCommitTimeline();
        if (!z) {
            return list;
        }

        Assertions.assertEquals(3, commits.findInstantsAfter(str3, Integer.MAX_VALUE).countInstants(), "Expecting 3 commits.");
        Assertions.assertEquals(str, ((HoodieInstant) commits.lastInstant().get()).getTimestamp(), "Latest commit should be " + str);
        if (z2) {
            Assertions.assertEquals(i, numRowsInCommit(this.basePath, commits, str, true), "Must contain " + i + " records");
        }

        // One glob per partition path: <basePath>/<partition>/*
        String[] partitionGlobs = Arrays.stream(this.dataGen.getPartitionPaths())
            .map(partition -> String.format("%s/%s/*", this.basePath, partition))
            .toArray(String[]::new);
        Assertions.assertEquals(i2, countRowsInPaths(this.basePath, this.fs, partitionGlobs), "Must contain " + i2 + " records");

        if (z2) {
            Assertions.assertEquals(numRowsInCommit(this.basePath, commits, str, true), countRecordsOptionallySince(this.basePath, commits, Option.of(str2)), "Incremental consumption from " + str2 + " should give no records in latest commit, since it is a delete operation");
        }
        return list;
    }

    /**
     * Counts the Avro records found in the files referenced by a single commit.
     *
     * @param str            base path of the table
     * @param hoodieTimeline timeline that must contain the commit
     * @param str2           commit time to inspect
     * @param z              when {@code true}, only records whose commit-time metadata field
     *                       equals {@code str2} are counted; otherwise every record is counted
     * @return number of matching records
     * @throws HoodieException if the timeline has no such commit, or if reading fails
     */
    public long numRowsInCommit(String str, HoodieTimeline hoodieTimeline, String str2, boolean z) {
        HoodieInstant commitInstant = new HoodieInstant(false, "commit", str2);
        if (!hoodieTimeline.containsInstant(commitInstant)) {
            throw new HoodieException("No commit exists at " + str2);
        }
        try {
            Collection<String> filePaths =
                getLatestFileIDsToFullPath(str, hoodieTimeline, Collections.singletonList(commitInstant)).values();
            long matching = 0L;
            for (String filePath : filePaths) {
                for (GenericRecord record : BaseFileUtils.getInstance(filePath).readAvroRecords(this.context.getHadoopConf().get(), new Path(filePath))) {
                    if (!z) {
                        matching++;
                        continue;
                    }
                    Object commitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
                    if (commitTime != null && commitTime.toString().equals(str2)) {
                        matching++;
                    }
                }
            }
            return matching;
        } catch (Exception e) {
            throw new HoodieException("Error reading commit " + str2, e);
        }
    }

    /**
     * Merges the file-id to full-path mappings recorded in the commit metadata of each instant.
     * Mappings from later instants in the list overwrite earlier ones for the same file id.
     *
     * @param str            base path used to resolve full paths
     * @param hoodieTimeline timeline from which the instant details are read
     * @param list           instants whose commit metadata is merged, in order
     * @return map of file id to full path
     * @throws IOException if the commit metadata cannot be deserialized
     */
    private static HashMap<String, String> getLatestFileIDsToFullPath(String str, HoodieTimeline hoodieTimeline, List<HoodieInstant> list) throws IOException {
        HashMap<String, String> fileIdToFullPath = new HashMap<>();
        for (HoodieInstant instant : list) {
            byte[] details = (byte[]) hoodieTimeline.getInstantDetails(instant).get();
            HoodieCommitMetadata metadata =
                (HoodieCommitMetadata) HoodieCommitMetadata.fromBytes(details, HoodieCommitMetadata.class);
            fileIdToFullPath.putAll(metadata.getFileIdAndFullPaths(new Path(str)));
        }
        return fileIdToFullPath;
    }

    /**
     * Sums the number of Avro records in the latest base files found under the given paths.
     *
     * @param str        base path of the table
     * @param fileSystem file system used to resolve the paths
     * @param strArr     path patterns to look under
     * @return total number of records across the latest base files
     * @throws HoodieException if listing or reading fails
     */
    public long countRowsInPaths(String str, FileSystem fileSystem, String... strArr) {
        try {
            long total = 0L;
            for (HoodieBaseFile baseFile : getLatestBaseFiles(str, fileSystem, strArr)) {
                total += BaseFileUtils.getInstance(baseFile.getPath())
                    .readAvroRecords(this.context.getHadoopConf().get(), new Path(baseFile.getPath()))
                    .size();
            }
            return total;
        } catch (Exception e) {
            throw new HoodieException("Error reading hoodie table as a dataframe", e);
        }
    }

    /**
     * Collects the latest base files visible under each of the given path patterns, considering
     * only completed instants of the commits timeline.
     *
     * @param str        base path of the table
     * @param fileSystem file system used for globbing; its configuration builds the meta client
     * @param strArr     path patterns passed to {@code FileSystem.globStatus}
     * @return latest base files for all patterns, in pattern order
     * @throws HoodieException if the meta client, globbing, or the file-system view fails
     */
    public static List<HoodieBaseFile> getLatestBaseFiles(String str, FileSystem fileSystem, String... strArr) {
        List<HoodieBaseFile> latestBaseFiles = new ArrayList<>();
        try {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fileSystem.getConf())
                .setBasePath(str)
                .setLoadActiveTimelineOnLoad(true)
                .build();
            for (String pattern : strArr) {
                // A fresh view per pattern, restricted to the files matched by the glob.
                HoodieTableFileSystemView view = new HoodieTableFileSystemView(
                    metaClient,
                    metaClient.getCommitsTimeline().filterCompletedInstants(),
                    fileSystem.globStatus(new Path(pattern)));
                List<HoodieBaseFile> matched = view.getLatestBaseFiles().collect(Collectors.toList());
                latestBaseFiles.addAll(matched);
            }
            return latestBaseFiles;
        } catch (Exception e) {
            throw new HoodieException("Error reading hoodie table as a dataframe", e);
        }
    }

    /**
     * Counts the records in the files touched by the timeline's instants, optionally restricted
     * to records committed strictly after a given commit time.
     *
     * <p>The file format is decided from the first resolved file path: Parquet files are read via
     * {@code BaseFileUtils}, HFiles via {@link #readHFile(String[])}. Any other extension is
     * rejected.
     *
     * @param str            base path of the table
     * @param hoodieTimeline timeline whose instants determine the files to read
     * @param option         when present, only instants after this commit time are considered and
     *                       only records whose commit-time metadata field is greater than it are
     *                       counted; when empty, all instants and all records are counted
     * @return number of matching records; {@code 0} when the selected instants reference no files
     * @throws HoodieException if the commit metadata cannot be read or the file format is unsupported
     */
    public long countRecordsOptionallySince(String str, HoodieTimeline hoodieTimeline, Option<String> option) {
        // Resolved once up front so the error path below never calls get() on an empty Option.
        final boolean hasSince = option.isPresent();
        final String since = hasSince ? (String) option.get() : null;
        try {
            HashMap<String, String> latestFileIDsToFullPath = getLatestFileIDsToFullPath(str, hoodieTimeline, hasSince ? hoodieTimeline.findInstantsAfter(since, Integer.MAX_VALUE).getInstants() : hoodieTimeline.getInstants());
            String[] paths = latestFileIDsToFullPath.values().toArray(new String[0]);
            if (paths.length == 0) {
                // Nothing was written by the selected instants: there is no format to sniff and nothing to count.
                return 0L;
            }
            if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
                return Arrays.stream(paths).flatMap(path -> {
                    return BaseFileUtils.getInstance(path).readAvroRecords(this.context.getHadoopConf().get(), new Path(path)).stream();
                }).filter(record -> {
                    if (!hasSince) {
                        return true;
                    }
                    Object commitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
                    return commitTime != null && commitTime.toString().compareTo(since) > 0;
                }).count();
            }
            if (!paths[0].endsWith(HoodieFileFormat.HFILE.getFileExtension())) {
                throw new HoodieException("Unsupported base file format for file :" + paths[0]);
            }
            Stream<GenericRecord> hfileRecords = readHFile(paths);
            if (!hasSince) {
                return hfileRecords.count();
            }
            return hfileRecords.filter(record -> {
                // Same null guard as the Parquet branch: records without a commit time never match.
                Object commitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
                return commitTime != null && HoodieTimeline.compareTimestamps(since, HoodieActiveTimeline.LESSER_THAN, commitTime.toString());
            }).count();
        } catch (IOException e) {
            throw new HoodieException("Error pulling data incrementally from commitTimestamp :" + (hasSince ? since : "<none>"), e);
        }
    }

    /**
     * Reads every cell value of the given HFiles and decodes it as an Avro record.
     *
     * <p>The Avro schema is taken from the {@code "schema"} entry of the first file's HFile info
     * and reused for all subsequent files. Files whose scanner cannot be positioned on a first
     * cell (i.e. {@code seekTo()} returns {@code false}) contribute no records.
     *
     * @param strArr paths of the HFiles to read; the first entry also selects the file system
     * @return stream over all decoded records, in file order and then cell order
     * @throws HoodieException if any file cannot be opened or read
     */
    public Stream<GenericRecord> readHFile(String[] strArr) {
        List<GenericRecord> records = new ArrayList<>();
        FileSystem fs = FSUtils.getFs(strArr[0], this.context.getHadoopConf().get());
        CacheConfig cacheConfig = new CacheConfig(fs.getConf());
        Schema schema = null;
        for (String path : strArr) {
            // try-with-resources: the reader is released even when decoding fails part-way through.
            try (HFile.Reader reader = HoodieHFileUtils.createHFileReader(fs, new Path(path), cacheConfig, fs.getConf())) {
                if (schema == null) {
                    schema = new Schema.Parser().parse(new String(reader.getHFileInfo().get("schema".getBytes())));
                }
                HFileScanner scanner = reader.getScanner(false, false);
                if (!scanner.seekTo()) {
                    // Empty file: there is no first cell to read, move on to the next path.
                    continue;
                }
                do {
                    Cell cell = scanner.getCell();
                    // Copy the value bytes out of the backing array before the reader is closed.
                    byte[] value = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
                    records.add(HoodieAvroUtils.bytesToAvro(value, schema));
                } while (scanner.next());
            } catch (IOException e) {
                throw new HoodieException("Error reading hfile " + path + " as a dataframe", e);
            }
        }
        return records.stream();
    }

    /**
     * Builds a {@code HoodieWriteConfig.Builder} preconfigured for this harness: Java engine,
     * parallelism of 2 throughout, 1 MiB file-size limits, consistency check enabled, embedded
     * timeline server disabled, and an embedded-KV-store file-system view.
     *
     * @param str                              schema to set on the builder; skipped when
     *                                         {@code StringUtils.nonEmpty(str)} is {@code false}
     * @param indexType                        index type for the index config
     * @param hoodieFailedWritesCleaningPolicy failed-writes cleaning policy for the clean config
     * @return the configured builder (not yet built)
     */
    public HoodieWriteConfig.Builder getConfigBuilder(String str, HoodieIndex.IndexType indexType, HoodieFailedWritesCleaningPolicy hoodieFailedWritesCleaningPolicy) {
        final long maxFileSizeBytes = 1048576L;
        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
            .withPath(this.basePath)
            .withParallelism(2, 2)
            .withBulkInsertParallelism(2)
            .withFinalizeWriteParallelism(2)
            .withDeleteParallelism(2)
            .withEngineType(EngineType.JAVA)
            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue())
            .withWriteStatusClass(MetadataMergeWriteStatus.class)
            .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
                .withConsistencyCheckEnabled(true)
                .build())
            .withCleanConfig(HoodieCleanConfig.newBuilder()
                .withFailedWritesCleaningPolicy(hoodieFailedWritesCleaningPolicy)
                .build())
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .compactionSmallFileSize(maxFileSizeBytes)
                .build())
            .withStorageConfig(HoodieStorageConfig.newBuilder()
                .hfileMaxFileSize(maxFileSizeBytes)
                .parquetMaxFileSize(maxFileSizeBytes)
                .orcMaxFileSize(maxFileSizeBytes)
                .build())
            .forTable("raw_trips")
            .withIndexConfig(HoodieIndexConfig.newBuilder()
                .withIndexType(indexType)
                .build())
            .withEmbeddedTimelineServerEnabled(false)
            .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
                .withEnableBackupForRemoteFileSystemView(false)
                .withRemoteServerPort(Integer.valueOf(timelineServicePort))
                .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)
                .build());
        if (StringUtils.nonEmpty(str)) {
            builder.withSchema(str);
        }
        return builder;
    }
}
