package org.apache.hudi.table.functional;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Stream;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("functional")
/* loaded from: input_file:org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.class */
class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTestHarness {
    TestHoodieSparkMergeOnReadTableClustering() {
    }

    private static Stream<Arguments> testClustering() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{true, true, true}), Arguments.of(new Object[]{true, true, false}), Arguments.of(new Object[]{true, false, true}), Arguments.of(new Object[]{true, false, false}), Arguments.of(new Object[]{false, true, true}), Arguments.of(new Object[]{false, true, false}), Arguments.of(new Object[]{false, false, true}), Arguments.of(new Object[]{false, false, false})});
    }

    @MethodSource
    @ParameterizedTest
    void testClustering(boolean z, boolean z2, boolean z3) throws Exception {
        HoodieWriteConfig.Builder withRollbackUsingMarkers = HoodieWriteConfig.newBuilder().forTable("test-trip-table").withPath(basePath()).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withDeleteParallelism(2).withAutoCommit(true).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10L).withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build()).withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1073741824L).parquetMaxFileSize(1073741824L).build()).withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder().withEnableBackupForRemoteFileSystemView(false).build()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).withClusteringConfig(HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withInlineClustering(true).withInlineClusteringNumCommits(1).withPreserveHoodieCommitMetadata(Boolean.valueOf(z3)).build()).withRollbackUsingMarkers(false);
        addConfigsForPopulateMetaFields(withRollbackUsingMarkers, z2);
        HoodieWriteConfig build = withRollbackUsingMarkers.build();
        HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties) build.getProps());
        HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
        SparkRDDWriteClient hoodieWriteClient = m43getHoodieWriteClient(build);
        Throwable th = null;
        try {
            try {
                hoodieWriteClient.startCommitWithTime("001");
                List generateInserts = hoodieTestDataGenerator.generateInserts("001", 400);
                Assertions.assertTrue(insertRecords(hoodieMetaClient, generateInserts.subList(0, 200), hoodieWriteClient, build, "001").findAny().isPresent(), "should list the base files we wrote in the delta commit");
                hoodieWriteClient.startCommitWithTime("002");
                Assertions.assertTrue(insertRecords(hoodieMetaClient, generateInserts.subList(200, 400), hoodieWriteClient, build, "002").findAny().isPresent(), "should list the base files we wrote in the delta commit");
                if (z) {
                    hoodieWriteClient.startCommitWithTime("003");
                    updateRecords(hoodieMetaClient, hoodieTestDataGenerator.generateUpdates("003", 100), hoodieWriteClient, build, "003");
                }
                HoodieSparkTable create = HoodieSparkTable.create(build, m42context(), hoodieMetaClient);
                create.getHoodieView().sync();
                Assertions.assertEquals(hoodieTestDataGenerator.getPartitionPaths().length * 2, listAllBaseFilesInPath(create).length);
                String obj = hoodieWriteClient.scheduleClustering(Option.empty()).get().toString();
                HoodieTableMetaClient reload = HoodieTableMetaClient.reload(hoodieMetaClient);
                Assertions.assertEquals(r0.length, HoodieSparkTable.create(build, m42context(), reload).getFileSystemView().getFileGroupsInPendingClustering().map((v0) -> {
                    return v0.getLeft();
                }).count());
                doClusteringAndValidate(hoodieWriteClient, obj, reload, build, hoodieTestDataGenerator);
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testClusteringWithNoBaseFiles(boolean z) throws Exception {
        HoodieWriteConfig build = HoodieWriteConfig.newBuilder().forTable("test-trip-table").withPath(basePath()).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withDeleteParallelism(2).withAutoCommit(true).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10L).withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build()).withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1073741824L).parquetMaxFileSize(1073741824L).build()).withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder().withEnableBackupForRemoteFileSystemView(false).build()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).withClusteringConfig(HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withInlineClustering(true).withInlineClusteringNumCommits(1).build()).withRollbackUsingMarkers(false).build();
        HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties) build.getProps());
        HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
        SparkRDDWriteClient hoodieWriteClient = m43getHoodieWriteClient(build);
        Throwable th = null;
        try {
            hoodieWriteClient.startCommitWithTime("001");
            List generateInserts = hoodieTestDataGenerator.generateInserts("001", 400);
            Assertions.assertTrue(!insertRecords(hoodieMetaClient, generateInserts.subList(0, 200), hoodieWriteClient, build, "001").findAny().isPresent(), "should not have any base files");
            hoodieWriteClient.startCommitWithTime("002");
            Assertions.assertTrue(!insertRecords(hoodieMetaClient, generateInserts.subList(200, 400), hoodieWriteClient, build, "002").findAny().isPresent(), "should not have any base files");
            if (z) {
                hoodieWriteClient.startCommitWithTime("003");
                updateRecords(hoodieMetaClient, hoodieTestDataGenerator.generateUpdates("003", 100), hoodieWriteClient, build, "003");
            }
            HoodieSparkTable create = HoodieSparkTable.create(build, m42context(), hoodieMetaClient);
            create.getHoodieView().sync();
            Assertions.assertEquals(0, listAllBaseFilesInPath(create).length);
            String obj = hoodieWriteClient.scheduleClustering(Option.empty()).get().toString();
            HoodieTableMetaClient reload = HoodieTableMetaClient.reload(hoodieMetaClient);
            Assertions.assertEquals(hoodieTestDataGenerator.getPartitionPaths().length, HoodieSparkTable.create(build, m42context(), reload).getFileSystemView().getFileGroupsInPendingClustering().map((v0) -> {
                return v0.getLeft();
            }).count());
            doClusteringAndValidate(hoodieWriteClient, obj, reload, build, hoodieTestDataGenerator);
            if (hoodieWriteClient != null) {
                if (0 == 0) {
                    hoodieWriteClient.close();
                    return;
                }
                try {
                    hoodieWriteClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (hoodieWriteClient != null) {
                if (0 != 0) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th3;
        }
    }

    private void doClusteringAndValidate(SparkRDDWriteClient sparkRDDWriteClient, String str, HoodieTableMetaClient hoodieTableMetaClient, HoodieWriteConfig hoodieWriteConfig, HoodieTestDataGenerator hoodieTestDataGenerator) {
        sparkRDDWriteClient.cluster(str, true);
        HoodieTableMetaClient reload = HoodieTableMetaClient.reload(hoodieTableMetaClient);
        HoodieSparkTable create = HoodieSparkTable.create(hoodieWriteConfig, m42context(), reload);
        create.getHoodieView().sync();
        Assertions.assertEquals(hoodieTestDataGenerator.getPartitionPaths().length, Arrays.stream(hoodieTestDataGenerator.getPartitionPaths()).flatMap(str2 -> {
            return create.getBaseFileOnlyView().getLatestBaseFiles(str2);
        }).count());
        HoodieTimeline filterCompletedInstants = reload.getCommitTimeline().filterCompletedInstants();
        Assertions.assertEquals(1, filterCompletedInstants.findInstantsAfter("003", Integer.MAX_VALUE).countInstants(), "Expecting a single commit.");
        Assertions.assertEquals(str, ((HoodieInstant) filterCompletedInstants.lastInstant().get()).getTimestamp());
        Assertions.assertEquals("replacecommit", ((HoodieInstant) filterCompletedInstants.lastInstant().get()).getAction());
        if (hoodieWriteConfig.populateMetaFields()) {
            Assertions.assertEquals(400L, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), filterCompletedInstants, Option.of("000")), "Must contain 200 records");
        } else {
            Assertions.assertEquals(400L, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), filterCompletedInstants, Option.empty()));
        }
    }
}
