package org.apache.hudi.table.action.compact;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Assertions;

/* loaded from: input_file:org/apache/hudi/table/action/compact/CompactionTestBase.class */
public class CompactionTestBase extends HoodieClientTestBase {
    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the {@link HoodieWriteConfig.Builder} used as the starting point for the compaction tests.
     *
     * <p>The builder points at {@code basePath}, carries the inline trip-record Avro schema, disables
     * inline compaction, uses the BLOOM index type, enables the embedded timeline server and selects the
     * {@code EMBEDDED_KV_STORE} file-system-view storage type.
     *
     * @param bool value handed to {@code withAutoCommit}; must not be {@code null} because it is unboxed
     * @return the populated, not yet built, write-config builder
     */
    public HoodieWriteConfig.Builder getConfigBuilder(Boolean bool) {
        // 1024^3 == 1073741824, the size limit applied to the small-file, HFile and parquet settings below.
        final long oneGiB = 1024L * 1024L * 1024L;
        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": null, \"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
                .compactionSmallFileSize(oneGiB)
                .withInlineCompaction(false)
                .withMaxNumDeltaCommitsBeforeCompaction(1)
                .build();
        HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
                .hfileMaxFileSize(oneGiB)
                .parquetMaxFileSize(oneGiB)
                .build();
        HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
                .withIndexType(HoodieIndex.IndexType.BLOOM)
                .build();
        FileSystemViewStorageConfig viewStorageConfig = FileSystemViewStorageConfig.newBuilder()
                .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)
                .build();

        return HoodieWriteConfig.newBuilder()
                .withPath(this.basePath)
                .withSchema(tripSchema)
                .withParallelism(2, 2)
                .withAutoCommit(bool.booleanValue())
                .withAssumeDatePartitioning(true)
                .withCompactionConfig(compactionConfig)
                .withStorageConfig(storageConfig)
                .forTable("test-trip-table")
                .withIndexConfig(indexConfig)
                .withEmbeddedTimelineServerEnabled(true)
                .withFileSystemViewConfig(viewStorageConfig);
    }

    /**
     * Validates the current latest file slices of the table against the supplied pending compaction
     * operations.
     *
     * <p>For a file group without a pending compaction the slice's base instant must not be later than
     * {@code str}. For a file group with a pending compaction the slice's base instant must equal the
     * compaction instant, the slice must hold at least one log file and must not have a base file.
     *
     * @param str               instant time of the latest delta commit
     * @param map               pending compaction operations keyed by file group id; the pair key is the
     *                          compaction instant time
     * @param hoodieWriteConfig write config supplying the table base path
     */
    protected void validateDeltaCommit(String str, Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> map, HoodieWriteConfig hoodieWriteConfig) {
        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.hadoopConf, hoodieWriteConfig.getBasePath());
        HoodieTable table = getHoodieTable(metaClient, hoodieWriteConfig);
        for (FileSlice fileSlice : getCurrentLatestFileSlices(table)) {
            Pair<String, HoodieCompactionOperation> pendingOperation = map.get(fileSlice.getFileGroupId());
            if (pendingOperation == null) {
                Assertions.assertTrue(fileSlice.getBaseInstantTime().compareTo(str) <= 0, "Expect baseInstant to be less than or equal to latestDeltaCommit");
                continue;
            }
            // Expected value first so a failure reports the compaction instant as "expected".
            Assertions.assertEquals(pendingOperation.getKey(), fileSlice.getBaseInstantTime(), "Expect baseInstant to match compaction Instant");
            Assertions.assertTrue(fileSlice.getLogFiles().count() > 0, "Expect atleast one log file to be present where the latest delta commit was written");
            Assertions.assertFalse(fileSlice.getBaseFile().isPresent(), "Expect no data-file to be present");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs a sequence of delta commits and validates the table after each one.
     *
     * <p>First asserts that the sorted keys of the read client's pending compactions equal
     * {@code list3}. When {@code z} is set, the first instant in {@code list} is used to upsert
     * {@code list2} as-is; every remaining instant upserts a batch obtained from
     * {@code dataGen.generateUpdates}, requested with the size of the initial {@code list2}.
     *
     * @param sparkRDDWriteClient write client performing the upserts
     * @param hoodieReadClient    read client queried for pending compactions
     * @param list                delta-commit instant times, in the order they are applied
     * @param list2               initial records
     * @param hoodieWriteConfig   write config supplying base path and auto-commit setting
     * @param z                   whether the first instant writes {@code list2} directly
     * @param list3               expected pending compaction instants, sorted
     * @return the records written by the last delta commit, or {@code list2} if none followed
     * @throws Exception propagated from the underlying helpers
     */
    public List<HoodieRecord> runNextDeltaCommits(SparkRDDWriteClient sparkRDDWriteClient, HoodieReadClient hoodieReadClient, List<String> list, List<HoodieRecord> list2, HoodieWriteConfig hoodieWriteConfig, boolean z, List<String> list3) throws Exception {
        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.hadoopConf, hoodieWriteConfig.getBasePath());
        List actualPendingInstants = (List) hoodieReadClient.getPendingCompactions().stream()
                .map(pair -> (String) pair.getKey())
                .sorted()
                .collect(Collectors.toList());
        Assertions.assertEquals(list3, actualPendingInstants);
        Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> pendingOperations = CompactionUtils.getAllPendingCompactionOperations(metaClient);

        List<String> remainingInstants = list;
        if (z) {
            String firstInstant = list.get(0);
            remainingInstants = list.subList(1, list.size());
            JavaRDD inputRdd = this.jsc.parallelize(list2, 1);
            sparkRDDWriteClient.startCommitWithTime(firstInstant);
            JavaRDD statusRdd = sparkRDDWriteClient.upsert(inputRdd, firstInstant);
            List statuses = statusRdd.collect();
            boolean autoCommit = hoodieWriteConfig.shouldAutoCommit().booleanValue();
            if (!autoCommit) {
                sparkRDDWriteClient.commit(firstInstant, statusRdd);
            }
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(statuses);
            // Re-read the table from a fresh meta client so the new commit is visible.
            HoodieTableMetaClient freshMetaClient = new HoodieTableMetaClient(this.hadoopConf, hoodieWriteConfig.getBasePath());
            boolean hasBaseFiles = getCurrentLatestBaseFiles(getHoodieTable(freshMetaClient, hoodieWriteConfig)).stream().findAny().isPresent();
            Assertions.assertTrue(hasBaseFiles, "should list the parquet files we wrote in the delta commit");
            validateDeltaCommit(firstInstant, pendingOperations, hoodieWriteConfig);
        }

        final int batchSize = list2.size();
        List<HoodieRecord> latestRecords = list2;
        for (String instantTime : remainingInstants) {
            latestRecords = this.dataGen.generateUpdates(instantTime, Integer.valueOf(batchSize));
            HoodieTableMetaClient commitMetaClient = new HoodieTableMetaClient(this.hadoopConf, hoodieWriteConfig.getBasePath());
            createNextDeltaCommit(instantTime, latestRecords, sparkRDDWriteClient, commitMetaClient, hoodieWriteConfig, false);
            validateDeltaCommit(instantTime, pendingOperations, hoodieWriteConfig);
        }
        return latestRecords;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Transitions the requested compaction at {@code str} to inflight and asserts that the reloaded
     * pending-compaction timeline reports that instant as inflight.
     *
     * @param str               compaction instant time
     * @param hoodieWriteConfig write config supplying the table base path
     */
    public void moveCompactionFromRequestedToInflight(String str, HoodieWriteConfig hoodieWriteConfig) {
        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.hadoopConf, hoodieWriteConfig.getBasePath());
        HoodieInstant requestedInstant = HoodieTimeline.getCompactionRequestedInstant(str);
        metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(requestedInstant);

        // Reload so the lookup sees the transition that was just written.
        HoodieInstant pendingInstant = (HoodieInstant) metaClient.getActiveTimeline().reload()
                .filterPendingCompactionTimeline()
                .getInstants()
                .filter(candidate -> candidate.getTimestamp().equals(str))
                .findAny()
                .get();
        Assertions.assertTrue(pendingInstant.isInflight(), "Instant must be marked inflight");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Schedules a compaction at {@code str} and asserts that it is the last instant of the
     * pending-compaction timeline read through a fresh meta client.
     *
     * @param str                 compaction instant time to schedule
     * @param sparkRDDWriteClient write client used to schedule the compaction
     * @param hoodieWriteConfig   write config supplying the table base path
     */
    public void scheduleCompaction(String str, SparkRDDWriteClient sparkRDDWriteClient, HoodieWriteConfig hoodieWriteConfig) {
        sparkRDDWriteClient.scheduleCompactionAtInstant(str, Option.empty());
        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.hadoopConf, hoodieWriteConfig.getBasePath());
        HoodieInstant lastPending = (HoodieInstant) metaClient.getActiveTimeline()
                .filterPendingCompactionTimeline()
                .lastInstant()
                .get();
        Assertions.assertEquals(str, lastPending.getTimestamp(), "Last compaction instant must be the one set");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Convenience wrapper: schedules a compaction at {@code str} and then executes and verifies it.
     *
     * @param str                 compaction instant time
     * @param sparkRDDWriteClient write client used to schedule and run the compaction
     * @param hoodieTable         table whose latest file slices are inspected after compaction
     * @param hoodieWriteConfig   write config supplying the table base path
     * @param i                   record count forwarded to {@code executeCompaction}
     * @param z                   log-file expectation flag forwarded to {@code executeCompaction}
     * @throws IOException propagated from {@code executeCompaction}
     */
    public void scheduleAndExecuteCompaction(String str, SparkRDDWriteClient sparkRDDWriteClient, HoodieTable hoodieTable, HoodieWriteConfig hoodieWriteConfig, int i, boolean z) throws IOException {
        // Step 1: register the compaction on the timeline.
        this.scheduleCompaction(str, sparkRDDWriteClient, hoodieWriteConfig);
        // Step 2: run it and check the resulting file slices and record count.
        this.executeCompaction(str, sparkRDDWriteClient, hoodieTable, hoodieWriteConfig, i, z);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the compaction at {@code str} and verifies the resulting table state.
     *
     * <p>Checks that latest file slices exist, that each has {@code str} as base instant and a base
     * file, that log files are present on every slice ({@code z == true}) or on none
     * ({@code z == false}), that {@code str} is the last completed instant of the commit timeline, and
     * that the number of records counted since instant {@code "000"} equals {@code i}.
     *
     * @param str                 compaction instant time
     * @param sparkRDDWriteClient write client used to run the compaction
     * @param hoodieTable         table whose latest file slices are inspected
     * @param hoodieWriteConfig   write config supplying the table base path
     * @param i                   expected record count
     * @param z                   whether every latest file slice is expected to carry log files
     * @throws IOException declared for callers; see the helpers invoked
     */
    public void executeCompaction(String str, SparkRDDWriteClient sparkRDDWriteClient, HoodieTable hoodieTable, HoodieWriteConfig hoodieWriteConfig, int i, boolean z) throws IOException {
        sparkRDDWriteClient.compact(str);
        List<FileSlice> latestSlices = getCurrentLatestFileSlices(hoodieTable);
        Assertions.assertTrue(latestSlices.stream().findAny().isPresent(), "Ensure latest file-slices are not empty");
        Assertions.assertTrue(latestSlices.stream().allMatch(slice -> slice.getBaseInstantTime().equals(str)),
                "Verify all file-slices have base-instant same as compaction instant");
        Assertions.assertTrue(latestSlices.stream().allMatch(slice -> slice.getBaseFile().isPresent()),
                "Verify all file-slices have data-files");
        if (z) {
            Assertions.assertFalse(latestSlices.stream().anyMatch(slice -> slice.getLogFiles().count() == 0),
                    "Verify all file-slices have atleast one log-file");
        } else {
            Assertions.assertFalse(latestSlices.stream().anyMatch(slice -> slice.getLogFiles().count() > 0),
                    "Verify all file-slices have no log-files");
        }

        HoodieTableMetaClient freshMetaClient = new HoodieTableMetaClient(this.hadoopConf, hoodieWriteConfig.getBasePath(), true);
        HoodieTimeline completedCommits = getHoodieTable(freshMetaClient, hoodieWriteConfig).getMetaClient().getCommitTimeline().filterCompletedInstants();
        // Fail with an assertion rather than an exception from get() when nothing has completed.
        Option<HoodieInstant> lastCompleted = completedCommits.lastInstant();
        Assertions.assertTrue(lastCompleted.isPresent(), "Expect at least one completed commit after compaction");
        // Expected value first so a failure reports the compaction instant as "expected".
        Assertions.assertEquals(str, lastCompleted.get().getTimestamp(), "Expect compaction instant time to be the latest commit time");
        Assertions.assertEquals(i, HoodieClientTestUtils.countRecordsSince(this.jsc, this.basePath, this.sqlContext, completedCommits, "000"), "Must contain expected records");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the compaction at {@code str} on a table that has replaced file groups and verifies the
     * outcome.
     *
     * <p>Checks that latest file slices exist, that none belongs to a file group in {@code set}, that
     * {@code str} appears among the completed instants of the commit timeline, and that for each
     * partition in {@code strArr} every latest file slice before or on {@code str} has a base file and
     * no log files.
     *
     * @param str                 compaction instant time
     * @param sparkRDDWriteClient write client used to run the compaction
     * @param hoodieTable         table whose latest file slices are inspected
     * @param hoodieWriteConfig   write config supplying the table base path
     * @param strArr              partition paths to inspect
     * @param set                 file group ids that must not appear in the latest slices
     * @throws IOException declared for callers; see the helpers invoked
     */
    public void executeCompactionWithReplacedFiles(String str, SparkRDDWriteClient sparkRDDWriteClient, HoodieTable hoodieTable, HoodieWriteConfig hoodieWriteConfig, String[] strArr, Set<HoodieFileGroupId> set) throws IOException {
        sparkRDDWriteClient.compact(str);
        List<FileSlice> latestSlices = getCurrentLatestFileSlices(hoodieTable);
        Assertions.assertTrue(latestSlices.stream().findAny().isPresent(), "Ensure latest file-slices are not empty");
        Assertions.assertFalse(latestSlices.stream().anyMatch(slice -> set.contains(slice.getFileGroupId())),
                "Compacted files should not show up in latest slices");

        HoodieSparkTable reloadedTable = getHoodieTable(new HoodieTableMetaClient(this.hadoopConf, hoodieWriteConfig.getBasePath(), true), hoodieWriteConfig);
        // The original filtered completed instants twice in a row; one pass is sufficient.
        Assertions.assertTrue(reloadedTable.getMetaClient().getCommitTimeline().filterCompletedInstants().getInstants()
                .filter(instant -> str.equals(instant.getTimestamp()))
                .findFirst()
                .isPresent());
        for (String partitionPath : strArr) {
            reloadedTable.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, str, true).forEach(slice -> {
                Assertions.assertEquals(0L, slice.getLogFiles().count());
                Assertions.assertTrue(slice.getBaseFile().isPresent());
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Upserts {@code list} at instant {@code str} and verifies the completed delta-commit timeline.
     *
     * <p>The write is committed explicitly only when auto-commit is off and {@code z} is
     * {@code false}. If the commit was skipped ({@code z} set and auto-commit off), the latest completed
     * delta commit, if any, must be earlier than {@code str}; otherwise it must exist and equal
     * {@code str}.
     *
     * @param str                   instant time of the delta commit
     * @param list                  records to upsert
     * @param sparkRDDWriteClient   write client performing the upsert
     * @param hoodieTableMetaClient meta client whose active timeline is reloaded for verification
     * @param hoodieWriteConfig     write config supplying the auto-commit setting
     * @param z                     whether to skip the explicit commit
     * @return the collected write statuses of the upsert
     */
    public List<WriteStatus> createNextDeltaCommit(String str, List<HoodieRecord> list, SparkRDDWriteClient sparkRDDWriteClient, HoodieTableMetaClient hoodieTableMetaClient, HoodieWriteConfig hoodieWriteConfig, boolean z) {
        JavaRDD recordsRdd = this.jsc.parallelize(list, 1);
        sparkRDDWriteClient.startCommitWithTime(str);
        JavaRDD statusRdd = sparkRDDWriteClient.upsert(recordsRdd, str);
        List<WriteStatus> statuses = statusRdd.collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(statuses);

        final boolean autoCommit = hoodieWriteConfig.shouldAutoCommit().booleanValue();
        if (!autoCommit && !z) {
            sparkRDDWriteClient.commit(str, statusRdd);
        }

        Option<HoodieInstant> latestDeltaCommit = hoodieTableMetaClient.getActiveTimeline().reload().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
        if (z && !autoCommit) {
            // With no completed delta commit at all, the skipped commit is trivially not the latest;
            // guard the get() so that case does not surface as an exception.
            Assertions.assertTrue(!latestDeltaCommit.isPresent() || latestDeltaCommit.get().getTimestamp().compareTo(str) < 0, "Delta commit should not be latest instant");
        } else {
            Assertions.assertTrue(latestDeltaCommit.isPresent());
            Assertions.assertEquals(str, latestDeltaCommit.get().getTimestamp(), "Delta commit should be latest instant");
        }
        return statuses;
    }

    /**
     * Collects the latest base files of the table.
     *
     * <p>The file-system view is built from the table's meta client, its completed-commits timeline
     * and the base files listed by {@link HoodieTestTable} for that meta client.
     *
     * @param hoodieTable table to inspect
     * @return the latest base files reported by the view
     * @throws IOException declared for callers; see the helpers invoked
     */
    protected List<HoodieBaseFile> getCurrentLatestBaseFiles(HoodieTable hoodieTable) throws IOException {
        HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
        return (List) getHoodieTableFileSystemView(
                metaClient,
                hoodieTable.getCompletedCommitsTimeline(),
                HoodieTestTable.of(metaClient).listAllBaseFiles())
                .getLatestBaseFiles()
                .collect(Collectors.toList());
    }

    /**
     * Collects the latest file slices across {@code HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS}.
     *
     * <p>The view is built over the reloaded active timeline restricted to commits and compactions, so
     * instants written after {@code hoodieTable} was created are taken into account.
     *
     * @param hoodieTable table whose meta client backs the file-system view
     * @return latest file slices of all default partitions, in partition order
     */
    protected List<FileSlice> getCurrentLatestFileSlices(HoodieTable hoodieTable) {
        HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(
                hoodieTable.getMetaClient(),
                hoodieTable.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline());
        // Typed stream pipeline: no raw Stream/List and no stray getClass() null-check statement.
        return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)
                .flatMap(fileSystemView::getLatestFileSlices)
                .collect(Collectors.toList());
    }

    /**
     * Reports the table type used by tests built on this class.
     *
     * @return always {@link HoodieTableType#MERGE_ON_READ}
     */
    protected HoodieTableType getTableType() {
        final HoodieTableType tableType = HoodieTableType.MERGE_ON_READ;
        return tableType;
    }
}
