package org.apache.hudi.sink.compact;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith({FlinkMiniCluster.class})
/* Integration tests for the Flink compaction pipeline of Hudi MERGE_ON_READ tables (decompiled from input_file:org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.class). */
public class ITTestHoodieFlinkCompactor {
    /** Logger shared by this test class and any subclasses. */
    protected static final Logger LOG = LoggerFactory.getLogger(ITTestHoodieFlinkCompactor.class);

    /** Expected rows keyed by partition; compared after only {@code TestSQL.INSERT_T1} was written. */
    private static final Map<String, List<String>> EXPECTED1 = new HashMap<>();

    /** Expected rows keyed by partition; compared after {@code INSERT_T1} plus {@code UPDATE_INSERT_T1}. */
    private static final Map<String, List<String>> EXPECTED2 = new HashMap<>();

    /** Expected rows keyed by partition; compared after {@code INSERT_T1} plus the extra {@code par5} insert. */
    private static final Map<String, List<String>> EXPECTED3 = new HashMap<>();

    /** Temporary directory injected by JUnit; its absolute path is used as the Hudi table path. */
    @TempDir
    File tempFile;

    /**
     * Writes {@code TestSQL.INSERT_T1} into a MERGE_ON_READ table with writer-side compaction
     * disabled, then schedules one compaction plan, runs it through a standalone Flink job and
     * verifies the table content against {@code EXPECTED1}.
     *
     * @param z value written to the {@code CHANGELOG_ENABLED} table option
     * @throws Exception if table setup, plan scheduling, the Flink job or the data check fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHoodieFlinkCompactor(boolean z) throws Exception {
        final boolean changelogEnabled = z;

        // Create the table and write the initial records in batch mode.
        TableEnvironment tableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
        options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), String.valueOf(changelogEnabled));
        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", options));
        tableEnv.executeSql(TestSQL.INSERT_T1).await();
        // NOTE(review): presumably gives the preceding commit time to settle — confirm.
        TimeUnit.SECONDS.sleep(3L);

        // Build the compaction job configuration from the table that was just written.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
        compactionConfig.path = this.tempFile.getAbsolutePath();
        Configuration conf = FlinkCompactionConfig.toFlinkConfig(compactionConfig);
        conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        CompactionUtil.setAvroSchema(conf, metaClient);
        CompactionUtil.inferChangelogMode(conf, metaClient);

        // try-with-resources closes the client on every path and keeps the primary exception.
        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
            String compactionInstant = scheduleCompactionPlan(metaClient, writeClient);
            HoodieFlinkTable<?> table = writeClient.getHoodieTable();
            HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(table.getMetaClient(), compactionInstant);
            // The plan has to be marked inflight before the job executes it.
            table.getActiveTimeline().transitionCompactionRequestedToInflight(HoodieTimeline.getCompactionRequestedInstant(compactionInstant));

            env.addSource(new CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstant, compactionPlan)), conf))
                .name("compaction_source")
                .uid("uid_compaction_source")
                .rebalance()
                .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new CompactOperator(conf))
                .setParallelism(4)
                .addSink(new CompactionCommitSink(conf))
                .name("compaction_commit")
                .uid("uid_compaction_commit")
                .setParallelism(1);
            env.execute("flink_hudi_compaction");

            TestData.checkWrittenDataCOW(this.tempFile, EXPECTED1);
        }
    }

    /**
     * Schedules a compaction plan, then runs a table version upgrade (FIVE to SIX) or downgrade
     * (SIX to FIVE) before executing the plan, and verifies the table content against
     * {@code EXPECTED1}.
     *
     * @param z {@code true} to upgrade from version FIVE to SIX, {@code false} to downgrade from SIX to FIVE
     * @throws Exception if table setup, the version change, the Flink job or the data check fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHoodieFlinkCompactorWithUpgradeAndDowngrade(boolean z) throws Exception {
        final boolean upgrade = z;

        // Create the table and write the initial records in batch mode.
        TableEnvironment tableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
        options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", options));
        tableEnv.executeSql(TestSQL.INSERT_T1).await();
        // NOTE(review): presumably gives the preceding commit time to settle — confirm.
        TimeUnit.SECONDS.sleep(3L);

        // Build the compaction job configuration from the table that was just written.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
        compactionConfig.path = this.tempFile.getAbsolutePath();
        Configuration conf = FlinkCompactionConfig.toFlinkConfig(compactionConfig);
        conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        CompactionUtil.setAvroSchema(conf, metaClient);
        CompactionUtil.inferChangelogMode(conf, metaClient);

        // try-with-resources closes the client on every path; a failing close() is attached as a
        // suppressed exception instead of replacing the original failure.
        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
            String compactionInstant = scheduleCompactionPlan(metaClient, writeClient);
            HoodieFlinkTable<?> table = writeClient.getHoodieTable();

            // Pin the starting version, then move the table to the other version.
            HoodieTableVersion startVersion = upgrade ? HoodieTableVersion.FIVE : HoodieTableVersion.SIX;
            HoodieTableVersion targetVersion = upgrade ? HoodieTableVersion.SIX : HoodieTableVersion.FIVE;
            metaClient.getTableConfig().setTableVersion(startVersion);
            new UpgradeDowngrade(metaClient, writeClient.getConfig(), writeClient.getEngineContext(), FlinkUpgradeDowngradeHelper.getInstance())
                .run(targetVersion, "none");

            HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(table.getMetaClient(), compactionInstant);
            table.getActiveTimeline().transitionCompactionRequestedToInflight(HoodieTimeline.getCompactionRequestedInstant(compactionInstant));

            env.addSource(new CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstant, compactionPlan)), conf))
                .name("compaction_source")
                .uid("uid_compaction_source")
                .rebalance()
                .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new CompactOperator(conf))
                .setParallelism(4)
                .addSink(new CompactionCommitSink(conf))
                .name("compaction_commit")
                .uid("uid_compaction_commit")
                .setParallelism(1);
            env.execute("flink_hudi_compaction");

            TestData.checkWrittenDataCOW(this.tempFile, EXPECTED1);
        }
    }

    /**
     * Writes {@code INSERT_T1} and {@code UPDATE_INSERT_T1}, starts the
     * {@code HoodieFlinkCompactor.AsyncCompactionService} with scheduling enabled, lets it run for
     * ten seconds and verifies the table content against {@code EXPECTED2}.
     *
     * @param z value written to the {@code CHANGELOG_ENABLED} table option
     * @throws Exception if table setup, the compaction service or the data check fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHoodieFlinkCompactorService(boolean z) throws Exception {
        final boolean changelogEnabled = z;

        // Create the table and write two batches in batch mode.
        TableEnvironment tableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), String.valueOf(changelogEnabled));
        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", options));
        tableEnv.executeSql(TestSQL.INSERT_T1).await();
        tableEnv.executeSql(TestSQL.UPDATE_INSERT_T1).await();

        // Configure the compaction service to schedule plans itself.
        FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
        compactionConfig.path = this.tempFile.getAbsolutePath();
        compactionConfig.minCompactionIntervalSeconds = 3;
        compactionConfig.schedule = true;
        Configuration conf = FlinkCompactionConfig.toFlinkConfig(compactionConfig);
        conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        conf.setInteger(FlinkOptions.COMPACTION_TASKS.key(), 4);

        HoodieFlinkCompactor.AsyncCompactionService compactionService =
            new HoodieFlinkCompactor.AsyncCompactionService(compactionConfig, conf);
        compactionService.start((Function) null);
        try {
            // Let the service run for a while before it is stopped.
            TimeUnit.SECONDS.sleep(10L);
        } finally {
            // Stop the background service even when the wait is interrupted.
            compactionService.shutDown();
        }

        TestData.checkWrittenDataCOW(this.tempFile, EXPECTED2);
    }

    /**
     * Schedules two compaction plans (one after {@code INSERT_T1}, one after an extra insert into
     * partition {@code par5}), executes both in a single Flink job and verifies the table content
     * against {@code EXPECTED3}.
     *
     * @param z value written to the {@code CHANGELOG_ENABLED} table option
     * @throws Exception if table setup, plan scheduling, the Flink job or the data check fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean z) throws Exception {
        final boolean changelogEnabled = z;

        // Create the table and write the initial records in batch mode.
        TableEnvironment tableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), String.valueOf(changelogEnabled));
        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", options));
        tableEnv.executeSql(TestSQL.INSERT_T1).await();

        // Build the compaction job configuration from the table that was just written.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
        compactionConfig.path = this.tempFile.getAbsolutePath();
        Configuration conf = FlinkCompactionConfig.toFlinkConfig(compactionConfig);
        conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        CompactionUtil.setAvroSchema(conf, metaClient);
        CompactionUtil.inferChangelogMode(conf, metaClient);

        List<String> compactionInstants = new ArrayList<>(2);

        // First plan: scheduled right after the initial insert; more data is written before the
        // first client is released.
        try (HoodieFlinkWriteClient<?> firstClient = FlinkWriteClients.createWriteClient(conf)) {
            compactionInstants.add(scheduleCompactionPlan(metaClient, firstClient));
            tableEnv.executeSql("insert into t1 values\n"
                + "('id12','Tony',27,TIMESTAMP '1970-01-01 00:00:09','par5'),\n"
                + "('id13','Jenny',72,TIMESTAMP '1970-01-01 00:00:10','par5')").await();
        }

        // Second plan: scheduled with a fresh client after the timeline has been reloaded.
        try (HoodieFlinkWriteClient<?> secondClient = FlinkWriteClients.createWriteClient(conf)) {
            metaClient.reloadActiveTimeline();
            compactionInstants.add(scheduleCompactionPlan(metaClient, secondClient));
            HoodieFlinkTable<?> table = secondClient.getHoodieTable();

            // Read every plan first, then mark all of them inflight (same order as before).
            List<Pair<String, HoodieCompactionPlan>> compactionPlans = new ArrayList<>(2);
            for (String instant : compactionInstants) {
                compactionPlans.add(Pair.of(instant, CompactionUtils.getCompactionPlan(table.getMetaClient(), instant)));
            }
            for (String instant : compactionInstants) {
                table.getActiveTimeline().transitionCompactionRequestedToInflight(HoodieTimeline.getCompactionRequestedInstant(instant));
            }
            table.getMetaClient().reloadActiveTimeline();

            env.addSource(new CompactionPlanSourceFunction(compactionPlans, conf))
                .name("compaction_source")
                .uid("uid_compaction_source")
                .rebalance()
                .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new CompactOperator(conf))
                .setParallelism(1)
                .addSink(new CompactionCommitSink(conf))
                .name("compaction_commit")
                .uid("uid_compaction_commit")
                .setParallelism(1);
            env.execute("flink_hudi_compaction");
        }

        TestData.checkWrittenDataCOW(this.tempFile, EXPECTED3);
    }

    /**
     * Writes two batches into a MERGE_ON_READ table created with {@code COMPACTION_DELTA_COMMITS}
     * set to {@code 2} and verifies the table content against {@code EXPECTED2}; no separate
     * compaction job is started by this test.
     *
     * @throws Exception if table setup, either insert or the data check fails
     */
    @Test
    public void testCompactionInBatchExecutionMode() throws Exception {
        TableEnvironment tableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);

        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "2");
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", options));

        tableEnv.executeSql(TestSQL.INSERT_T1).await();
        tableEnv.executeSql(TestSQL.UPDATE_INSERT_T1).await();

        TestData.checkWrittenDataCOW(this.tempFile, EXPECTED2);
    }

    /**
     * Runs the offline compaction job built by {@code runOfflineCompact} and checks that it
     * finishes without throwing and that no base file appears twice for the same file id and
     * commit time.
     */
    @Test
    public void testOfflineCompactFailoverAfterCommit() {
        final TableEnvironment tableEnv = prepareEnvAndTable();
        // No await() here: prepareEnvAndTable() sets TABLE_DML_SYNC to true on this environment.
        tableEnv.executeSql(TestSQL.INSERT_T1);

        final FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
        compactionConfig.path = this.tempFile.getAbsolutePath();
        final Configuration conf = FlinkCompactionConfig.toFlinkConfig(compactionConfig);

        Assertions.assertDoesNotThrow(() -> runOfflineCompact(tableEnv, conf));
        assertNoDuplicateFile(conf);
    }

    /**
     * Asserts that the table contains at least one base file and that no two base files share
     * the same (file id, commit time) pair.
     *
     * @param configuration Flink configuration used to create the table meta client
     */
    private void assertNoDuplicateFile(Configuration configuration) {
        HashSet<Pair<String, String>> seenBaseFiles = new HashSet<>();
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(configuration);
        HoodieWrapperFileSystem fs = metaClient.getFs();
        FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT, metaClient.getBasePath(), false, false).forEach(partition -> {
            try {
                Arrays.stream(fs.listStatus(FSUtils.getPartitionPath(metaClient.getBasePathV2(), partition)))
                    .filter(status -> FSUtils.isBaseFile(status.getPath()))
                    .forEach(status -> {
                        HoodieBaseFile baseFile = new HoodieBaseFile(status);
                        Pair<String, String> fileIdAndCommit = Pair.of(baseFile.getFileId(), baseFile.getCommitTime());
                        // Set.add returns false when the pair was already recorded, i.e. a duplicate.
                        Assertions.assertTrue(seenBaseFiles.add(fileIdAndCommit),
                            () -> "Duplicate base file for (fileId, commitTime) = " + fileIdAndCommit);
                    });
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        Assertions.assertFalse(seenBaseFiles.isEmpty());
    }

    /**
     * Creates a batch-mode table environment with synchronous DML and registers the
     * MERGE_ON_READ table {@code t1} under the temporary directory, with the metadata table and
     * writer-side compaction (scheduling and async execution) switched off.
     *
     * @return the table environment in which {@code t1} has been created
     */
    private TableEnvironment prepareEnvAndTable() {
        TableEnvironment tableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        // Callers invoke executeSql without await(), so DML is made synchronous here.
        tableEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DML_SYNC, true);

        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.METADATA_ENABLED.key(), "false");
        options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
        options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", options));
        return tableEnv;
    }

    /**
     * Schedules a compaction plan, writes {@code INSERT_T1} once more, and executes the plan in a
     * Flink job that may restart once and commits through {@code CompactionCommitTestSink}.
     *
     * @param tableEnvironment environment used to write the additional batch
     * @param configuration compaction job configuration; table type, table name, Avro schema and
     *     changelog mode are filled in by this method
     * @throws Exception if plan scheduling or the Flink job fails
     */
    private void runOfflineCompact(TableEnvironment tableEnvironment, Configuration configuration) throws Exception {
        configuration.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(configuration);
        configuration.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        CompactionUtil.setAvroSchema(configuration, metaClient);
        CompactionUtil.inferChangelogMode(configuration, metaClient);

        // try-with-resources closes the client on every path and keeps the primary exception.
        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(configuration)) {
            String compactionInstant = scheduleCompactionPlan(metaClient, writeClient);
            HoodieFlinkTable<?> table = writeClient.getHoodieTable();
            HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(table.getMetaClient(), compactionInstant);
            table.getActiveTimeline().transitionCompactionRequestedToInflight(HoodieTimeline.getCompactionRequestedInstant(compactionInstant));

            // Write another batch after the plan has been scheduled and marked inflight.
            tableEnvironment.executeSql(TestSQL.INSERT_T1);

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Allow exactly one restart with a 1 ms delay.
            // NOTE(review): presumably CompactionCommitTestSink triggers that restart — confirm.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(1L)));
            env.addSource(new CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstant, compactionPlan)), configuration))
                .name("compaction_source")
                .uid("uid_compaction_source")
                .rebalance()
                .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new CompactOperator(configuration))
                .setParallelism(1)
                .addSink(new CompactionCommitTestSink(configuration))
                .name("compaction_commit")
                .uid("uid_compaction_commit")
                .setParallelism(1);
            env.execute("flink_hudi_compaction");
        }
    }

    /**
     * Schedules a compaction at the instant time returned by
     * {@code CompactionUtil.getCompactionInstantTime} and asserts that scheduling succeeded.
     *
     * @param hoodieTableMetaClient meta client of the table to compact
     * @param hoodieFlinkWriteClient write client used to schedule the compaction
     * @return the instant time of the scheduled compaction plan
     */
    private String scheduleCompactionPlan(HoodieTableMetaClient hoodieTableMetaClient, HoodieFlinkWriteClient<?> hoodieFlinkWriteClient) {
        Option<String> compactionInstant = CompactionUtil.getCompactionInstantTime(hoodieTableMetaClient);
        // A missing instant time counts as "not scheduled" and fails the assertion below.
        boolean scheduled = compactionInstant.isPresent()
            && hoodieFlinkWriteClient.scheduleCompactionAtInstant(compactionInstant.get(), Option.empty());
        Assertions.assertTrue(scheduled, "The compaction plan should be scheduled");
        return compactionInstant.get();
    }

    // Populates the expected-row fixtures, keyed by partition name.
    static {
        // EXPECTED1: rows compared after TestSQL.INSERT_T1 alone.
        EXPECTED1.put("par1", Arrays.asList(
            "id1,par1,id1,Danny,23,1000,par1",
            "id2,par1,id2,Stephen,33,2000,par1"));
        EXPECTED1.put("par2", Arrays.asList(
            "id3,par2,id3,Julian,53,3000,par2",
            "id4,par2,id4,Fabian,31,4000,par2"));
        EXPECTED1.put("par3", Arrays.asList(
            "id5,par3,id5,Sophia,18,5000,par3",
            "id6,par3,id6,Emma,20,6000,par3"));
        EXPECTED1.put("par4", Arrays.asList(
            "id7,par4,id7,Bob,44,7000,par4",
            "id8,par4,id8,Han,56,8000,par4"));

        // EXPECTED2: rows compared after INSERT_T1 followed by UPDATE_INSERT_T1.
        EXPECTED2.put("par1", Arrays.asList(
            "id1,par1,id1,Danny,24,1000,par1",
            "id2,par1,id2,Stephen,34,2000,par1"));
        EXPECTED2.put("par2", Arrays.asList(
            "id3,par2,id3,Julian,54,3000,par2",
            "id4,par2,id4,Fabian,32,4000,par2"));
        EXPECTED2.put("par3", Arrays.asList(
            "id5,par3,id5,Sophia,18,5000,par3",
            "id6,par3,id6,Emma,20,6000,par3",
            "id9,par3,id9,Jane,19,6000,par3"));
        EXPECTED2.put("par4", Arrays.asList(
            "id7,par4,id7,Bob,44,7000,par4",
            "id8,par4,id8,Han,56,8000,par4",
            "id10,par4,id10,Ella,38,7000,par4",
            "id11,par4,id11,Phoebe,52,8000,par4"));

        // EXPECTED3: rows compared after INSERT_T1 plus the two extra rows written to par5.
        EXPECTED3.put("par1", Arrays.asList(
            "id1,par1,id1,Danny,23,1000,par1",
            "id2,par1,id2,Stephen,33,2000,par1"));
        EXPECTED3.put("par2", Arrays.asList(
            "id3,par2,id3,Julian,53,3000,par2",
            "id4,par2,id4,Fabian,31,4000,par2"));
        EXPECTED3.put("par3", Arrays.asList(
            "id5,par3,id5,Sophia,18,5000,par3",
            "id6,par3,id6,Emma,20,6000,par3"));
        EXPECTED3.put("par4", Arrays.asList(
            "id7,par4,id7,Bob,44,7000,par4",
            "id8,par4,id8,Han,56,8000,par4"));
        EXPECTED3.put("par5", Arrays.asList(
            "id12,par5,id12,Tony,27,9000,par5",
            "id13,par5,id13,Jenny,72,10000,par5"));
    }
}
