/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.compact;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.compact.CompactOperator;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionCommitTestSink;
import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(value={FlinkMiniCluster.class})
public class ITTestHoodieFlinkCompactor {
    /** Logger for this integration test class. */
    protected static final Logger LOG = LoggerFactory.getLogger(ITTestHoodieFlinkCompactor.class);

    /** Expected rows keyed by partition (par1..par4); filled in by the static initializer. */
    private static final Map<String, List<String>> EXPECTED1 = new HashMap<>();

    /** Expected rows keyed by partition (par1..par4) including ids 9-11; filled in by the static initializer. */
    private static final Map<String, List<String>> EXPECTED2 = new HashMap<>();

    /** Expected rows keyed by partition (par1..par5); filled in by the static initializer. */
    private static final Map<String, List<String>> EXPECTED3 = new HashMap<>();

    /** JUnit-managed temporary directory; its absolute path is used as the table path by the tests. */
    @TempDir
    File tempFile;

    /**
     * Inserts one batch into a MERGE_ON_READ table created with compaction scheduling and
     * async compaction switched off, then schedules a compaction plan manually, executes it
     * with a standalone source -> compact -> commit pipeline and checks the written data
     * against {@code EXPECTED1}.
     *
     * @param enableChangelog value stored under the table's changelog-enabled option
     * @throws Exception if table setup, the insert or the compaction job fails
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
        // Batch-mode table environment used to create and populate the table.
        TableEnvironmentImpl batchTableEnv = TableEnvironmentImpl.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        batchTableEnv.getConfig().getConfiguration()
                .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);

        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
        tableOptions.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        tableOptions.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        tableOptions.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        tableOptions.put(FlinkOptions.CHANGELOG_ENABLED.key(), String.valueOf(enableChangelog));
        batchTableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", tableOptions));

        String insertFirstBatch = "insert into t1 values\n"
                + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
                + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
                + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
                + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
                + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
                + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
                + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
                + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
        batchTableEnv.executeSql(insertFirstBatch).await();
        // Extra pause after the insert job returns, before the compaction is set up.
        TimeUnit.SECONDS.sleep(3L);

        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build the Flink configuration for the standalone compaction job.
        FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
        compactionConfig.path = this.tempFile.getAbsolutePath();
        Configuration conf = FlinkCompactionConfig.toFlinkConfig(compactionConfig);
        conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        CompactionUtil.setAvroSchema(conf, metaClient);
        CompactionUtil.inferChangelogMode(conf, metaClient);

        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
            HoodieFlinkTable<?> hoodieTable = writeClient.getHoodieTable();

            // Schedule the plan, load it back and mark it inflight before executing it.
            String instantTime = this.scheduleCompactionPlan(writeClient);
            HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(hoodieTable.getMetaClient(), instantTime);
            HoodieInstant requested = HoodieTestUtils.INSTANT_GENERATOR.getCompactionRequestedInstant(instantTime);
            hoodieTable.getActiveTimeline().transitionCompactionRequestedToInflight(requested);

            streamEnv.addSource(new CompactionPlanSourceFunction(
                            Collections.singletonList(Pair.of(instantTime, plan)), conf))
                    .name("compaction_source")
                    .uid("uid_compaction_source")
                    .rebalance()
                    .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new CompactOperator(conf))
                    .setParallelism(4)
                    .addSink(new CompactionCommitSink(conf))
                    .name("compaction_commit")
                    .uid("uid_compaction_commit")
                    .setParallelism(1);
            streamEnv.execute("flink_hudi_compaction");

            TestData.checkWrittenDataCOW(this.tempFile, EXPECTED1);
        }
    }

    /**
     * Schedules a compaction plan, then moves the table between versions SIX and EIGHT with
     * {@code UpgradeDowngrade} before executing the plan, and checks the written data against
     * {@code EXPECTED1}.
     *
     * <p>When {@code upgrade} is true the table config is first forced to SIX (and persisted)
     * and then upgraded to EIGHT; otherwise the in-memory config is set to EIGHT and the table
     * is downgraded to SIX.
     *
     * @param upgrade true to exercise SIX -> EIGHT, false to exercise EIGHT -> SIX
     * @throws Exception if table setup, the version change or the compaction job fails
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testHoodieFlinkCompactorWithUpgradeAndDowngrade(boolean upgrade) throws Exception {
        // Batch-mode table environment used to create and populate the table.
        TableEnvironmentImpl batchTableEnv = TableEnvironmentImpl.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        batchTableEnv.getConfig().getConfiguration()
                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);

        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
        tableOptions.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        tableOptions.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        tableOptions.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        batchTableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", tableOptions));

        String insertFirstBatch = "insert into t1 values\n"
                + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
                + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
                + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
                + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
                + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
                + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
                + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
                + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
        batchTableEnv.executeSql(insertFirstBatch).await();
        // Extra pause after the insert job returns, before the compaction is set up.
        TimeUnit.SECONDS.sleep(3L);

        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build the Flink configuration for the standalone compaction job.
        FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
        compactionConfig.path = this.tempFile.getAbsolutePath();
        Configuration conf = FlinkCompactionConfig.toFlinkConfig(compactionConfig);
        conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.set(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        CompactionUtil.setAvroSchema(conf, metaClient);
        CompactionUtil.inferChangelogMode(conf, metaClient);

        // Starting version is forced on the table config; target version is what the run() call moves to.
        HoodieTableVersion startVersion = upgrade ? HoodieTableVersion.SIX : HoodieTableVersion.EIGHT;
        HoodieTableVersion targetVersion = upgrade ? HoodieTableVersion.EIGHT : HoodieTableVersion.SIX;

        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
            // The plan is scheduled before the table version changes.
            String instantTime = this.scheduleCompactionPlan(writeClient);

            metaClient.getTableConfig().setTableVersion(startVersion);
            if (upgrade) {
                // Only the upgrade path persists the forced starting version to storage.
                HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
            }
            new UpgradeDowngrade(metaClient, writeClient.getConfig(), writeClient.getEngineContext(),
                    FlinkUpgradeDowngradeHelper.getInstance()).run(targetVersion, "none");
            if (!upgrade) {
                conf.setString("hoodie.write.table.version", "6");
            }

            // Refresh cached table config and timeline after the version change.
            metaClient.reloadTableConfig();
            metaClient.reloadActiveTimeline();

            HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, instantTime);
            HoodieInstant requested = HoodieTestUtils.INSTANT_GENERATOR.getCompactionRequestedInstant(instantTime);
            metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(requested);
            conf.set(FlinkOptions.WRITE_TABLE_VERSION, targetVersion.versionCode());

            streamEnv.addSource(new CompactionPlanSourceFunction(
                            Collections.singletonList(Pair.of(instantTime, plan)), conf))
                    .name("compaction_source")
                    .uid("uid_compaction_source")
                    .rebalance()
                    .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new CompactOperator(conf))
                    .setParallelism(4)
                    .addSink(new CompactionCommitSink(conf))
                    .name("compaction_commit")
                    .uid("uid_compaction_commit")
                    .setParallelism(1);
            streamEnv.execute("flink_hudi_compaction");

            TestData.checkWrittenDataCOW(this.tempFile, EXPECTED1);
        }
    }

    /**
     * Writes two batches into a MERGE_ON_READ table with async compaction disabled, runs
     * {@code HoodieFlinkCompactor.AsyncCompactionService} with scheduling enabled for ten
     * seconds, and checks the written data against {@code EXPECTED2}.
     *
     * <p>The service is shut down in a {@code finally} block so it does not keep running
     * if the waiting thread is interrupted.
     *
     * @param enableChangelog value stored under the table's changelog-enabled option
     * @throws Exception if table setup, an insert, or the wait fails
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exception {
        // Batch-mode table environment used to create and populate the table.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(settings);
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);

        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
        String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
        tableEnv.executeSql(hoodieTableDDL);

        String insertFirstBatch = "insert into t1 values\n"
                + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
                + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
                + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
                + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
                + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
                + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
                + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
                + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
        String insertSecondBatch = "insert into t1 values\n"
                + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
                + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
                + "('id3','Julian',54,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
                + "('id4','Fabian',32,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
                + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
                + "('id9','Jane',19,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
                + "('id10','Ella',38,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
                + "('id11','Phoebe',52,TIMESTAMP '1970-01-01 00:00:08','par4')";
        tableEnv.executeSql(insertFirstBatch).await();
        tableEnv.executeSql(insertSecondBatch).await();

        // Service configuration: schedule plans itself, 3-second minimum interval, 4 compaction tasks.
        FlinkCompactionConfig cfg = new FlinkCompactionConfig();
        cfg.path = this.tempFile.getAbsolutePath();
        cfg.minCompactionIntervalSeconds = 3;
        cfg.schedule = true;
        Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
        conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        conf.setInteger(FlinkOptions.COMPACTION_TASKS.key(), 4);

        HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf);
        asyncCompactionService.start(null);
        try {
            // Give the background service time to schedule and run a compaction.
            TimeUnit.SECONDS.sleep(10L);
        } finally {
            // Always stop the service, even if the sleep above is interrupted.
            asyncCompactionService.shutDown();
        }

        TestData.checkWrittenDataCOW(this.tempFile, EXPECTED2);
    }

    /**
     * Schedules two compaction plans (one after the initial batch, one after an extra insert
     * into partition {@code par5}), feeds both plans to a single compaction pipeline and
     * checks the written data against {@code EXPECTED3}.
     *
     * <p>Both write clients are managed with try-with-resources so they are closed even when
     * scheduling, the insert or the compaction job fails.
     *
     * @param enableChangelog value stored under the table's changelog-enabled option
     * @throws Exception if table setup, an insert or the compaction job fails
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangelog) throws Exception {
        // Batch-mode table environment used to create and populate the table.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(settings);
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);

        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
        String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
        tableEnv.executeSql(hoodieTableDDL);

        String insertFirstBatch = "insert into t1 values\n"
                + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
                + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
                + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
                + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
                + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
                + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
                + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
                + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
        tableEnv.executeSql(insertFirstBatch).await();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build the Flink configuration for the standalone compaction job.
        FlinkCompactionConfig cfg = new FlinkCompactionConfig();
        cfg.path = this.tempFile.getAbsolutePath();
        Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
        conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        CompactionUtil.setAvroSchema(conf, metaClient);
        CompactionUtil.inferChangelogMode(conf, metaClient);

        List<String> compactionInstantTimeList = new ArrayList<>(2);

        // First client: schedule a plan for the initial batch, then write a new partition
        // so that a second plan can be scheduled afterwards.
        try (HoodieFlinkWriteClient<?> firstWriteClient = FlinkWriteClients.createWriteClient(conf)) {
            compactionInstantTimeList.add(this.scheduleCompactionPlan(firstWriteClient));
            String insertT1ForNewPartition = "insert into t1 values\n"
                    + "('id12','Tony',27,TIMESTAMP '1970-01-01 00:00:09','par5'),\n"
                    + "('id13','Jenny',72,TIMESTAMP '1970-01-01 00:00:10','par5')";
            tableEnv.executeSql(insertT1ForNewPartition).await();
        }

        // Second, freshly created client: schedule the second plan and run both plans.
        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
            HoodieFlinkTable<?> table = writeClient.getHoodieTable();
            compactionInstantTimeList.add(this.scheduleCompactionPlan(writeClient));

            // Load every scheduled plan, paired with its instant time.
            List<Pair<String, HoodieCompactionPlan>> compactionPlans = new ArrayList<>(2);
            for (String compactionInstantTime : compactionInstantTimeList) {
                HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(table.getMetaClient(), compactionInstantTime);
                compactionPlans.add(Pair.of(compactionInstantTime, plan));
            }

            // Mark all plans inflight before handing them to the pipeline.
            for (String compactionInstantTime : compactionInstantTimeList) {
                HoodieInstant hoodieInstant = HoodieTestUtils.INSTANT_GENERATOR.getCompactionRequestedInstant(compactionInstantTime);
                table.getActiveTimeline().transitionCompactionRequestedToInflight(hoodieInstant);
            }
            table.getMetaClient().reloadActiveTimeline();

            env.addSource(new CompactionPlanSourceFunction(compactionPlans, conf))
                    .name("compaction_source")
                    .uid("uid_compaction_source")
                    .rebalance()
                    .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new CompactOperator(conf))
                    .setParallelism(1)
                    .addSink(new CompactionCommitSink(conf))
                    .name("compaction_commit")
                    .uid("uid_compaction_commit")
                    .setParallelism(1);
            env.execute("flink_hudi_compaction");
        }

        TestData.checkWrittenDataCOW(this.tempFile, EXPECTED3);
    }

    /**
     * Writes two batches in batch execution mode into a MERGE_ON_READ table whose
     * compaction delta-commits option is set to 2, then checks the written data against
     * {@code EXPECTED2}. No standalone compaction job is started by this test.
     *
     * @throws Exception if table setup or one of the inserts fails
     */
    @Test
    public void testCompactionInBatchExecutionMode() throws Exception {
        TableEnvironmentImpl batchTableEnv = TableEnvironmentImpl.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        batchTableEnv.getConfig().getConfiguration()
                .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);

        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "2");
        tableOptions.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        tableOptions.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        batchTableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", tableOptions));

        String insertFirstBatch = "insert into t1 values\n"
                + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
                + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
                + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
                + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
                + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
                + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
                + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
                + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
        String insertSecondBatch = "insert into t1 values\n"
                + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
                + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
                + "('id3','Julian',54,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
                + "('id4','Fabian',32,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
                + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
                + "('id9','Jane',19,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
                + "('id10','Ella',38,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
                + "('id11','Phoebe',52,TIMESTAMP '1970-01-01 00:00:08','par4')";

        // Each insert is awaited so the two batches are written one after the other.
        batchTableEnv.executeSql(insertFirstBatch).await();
        batchTableEnv.executeSql(insertSecondBatch).await();

        TestData.checkWrittenDataCOW(this.tempFile, EXPECTED2);
    }

    /**
     * Runs the offline compaction pipeline from {@code runOfflineCompact} and asserts that it
     * completes without throwing and that no two base files share the same
     * (file id, commit time) pair afterwards.
     */
    @Test
    public void testOfflineCompactFailoverAfterCommit() {
        TableEnvironment tableEnv = this.prepareEnvAndTable();

        // No await() here: prepareEnvAndTable() enables TABLE_DML_SYNC on this environment.
        String insertFirstBatch = "insert into t1 values\n"
                + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
                + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
                + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
                + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
                + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
                + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
                + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
                + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
        tableEnv.executeSql(insertFirstBatch);

        FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
        compactionConfig.path = this.tempFile.getAbsolutePath();
        Configuration flinkConf = FlinkCompactionConfig.toFlinkConfig(compactionConfig);

        Assertions.assertDoesNotThrow(() -> this.runOfflineCompact(tableEnv, flinkConf));
        this.assertNoDuplicateFile(flinkConf);
    }

    /**
     * Lists the base files directly under every partition of the table and asserts that each
     * (file id, commit time) pair occurs at most once and that at least one base file exists.
     *
     * @param conf Flink configuration used to create the table meta client
     */
    private void assertNoDuplicateFile(Configuration conf) {
        HashSet<Pair<String, String>> seenFileSlices = new HashSet<>();
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        HoodieStorage storage = metaClient.getStorage();

        for (String partition : FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT, metaClient.getStorage(), metaClient.getBasePath(), false)) {
            try {
                storage.listDirectEntries(FSUtils.constructAbsolutePath(metaClient.getBasePath(), partition))
                        .stream()
                        .filter(entry -> FSUtils.isBaseFile(entry.getPath()))
                        .forEach(entry -> {
                            HoodieBaseFile baseFile = new HoodieBaseFile(entry);
                            Pair<String, String> key = Pair.of(baseFile.getFileId(), baseFile.getCommitTime());
                            // A repeated key means two base files for the same file id and commit time.
                            Assertions.assertFalse(seenFileSlices.contains(key));
                            seenFileSlices.add(key);
                        });
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        // Guard against a vacuous pass when no base file was found at all.
        Assertions.assertFalse(seenFileSlices.isEmpty());
    }

    /**
     * Creates a batch-mode table environment with synchronous DML and default parallelism 4,
     * and registers MERGE_ON_READ table {@code t1} under the temp directory with the metadata
     * table, compaction scheduling and async compaction all disabled.
     *
     * @return the table environment in which {@code t1} has been created
     */
    private TableEnvironment prepareEnvAndTable() {
        TableEnvironmentImpl batchTableEnv = TableEnvironmentImpl.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        batchTableEnv.getConfig().getConfiguration()
                .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        // Synchronous DML: callers of executeSql() on this environment do not call await().
        batchTableEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DML_SYNC, true);

        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put(FlinkOptions.METADATA_ENABLED.key(), "false");
        tableOptions.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
        tableOptions.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        tableOptions.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        tableOptions.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");

        batchTableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", tableOptions));
        return batchTableEnv;
    }

    /**
     * Schedules a compaction plan, marks it inflight, writes the first batch once more and then
     * executes the plan with a parallelism-1 pipeline that commits through
     * {@code CompactionCommitTestSink}. The job may restart once, after a 1 ms delay.
     *
     * @param tableEnv table environment in which table {@code t1} is registered
     * @param conf     Flink configuration of the compaction job; table type, table name and
     *                 the values set by {@code CompactionUtil} are written into it
     * @throws Exception if scheduling, the insert or the compaction job fails
     */
    private void runOfflineCompact(TableEnvironment tableEnv, Configuration conf) throws Exception {
        conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        CompactionUtil.setAvroSchema(conf, metaClient);
        CompactionUtil.inferChangelogMode(conf, metaClient);

        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
            HoodieFlinkTable<?> hoodieTable = writeClient.getHoodieTable();

            // Schedule the plan, load it back and mark it inflight before executing it.
            String instantTime = this.scheduleCompactionPlan(writeClient);
            HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(hoodieTable.getMetaClient(), instantTime);
            HoodieInstant requested = HoodieTestUtils.INSTANT_GENERATOR.getCompactionRequestedInstant(instantTime);
            hoodieTable.getActiveTimeline().transitionCompactionRequestedToInflight(requested);

            // Write the same batch again while the compaction is pending.
            String insertFirstBatch = "insert into t1 values\n"
                    + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
                    + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
                    + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
                    + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
                    + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
                    + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
                    + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
                    + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
            tableEnv.executeSql(insertFirstBatch);

            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            // Allow exactly one restart attempt, 1 ms after a failure.
            streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(1L)));

            // NOTE(review): CompactionCommitTestSink presumably injects the post-commit failure
            // this scenario is named after; confirm against that class.
            streamEnv.addSource(new CompactionPlanSourceFunction(
                            Collections.singletonList(Pair.of(instantTime, plan)), conf))
                    .name("compaction_source")
                    .uid("uid_compaction_source")
                    .rebalance()
                    .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new CompactOperator(conf))
                    .setParallelism(1)
                    .addSink(new CompactionCommitTestSink(conf))
                    .name("compaction_commit")
                    .uid("uid_compaction_commit")
                    .setParallelism(1);
            streamEnv.execute("flink_hudi_compaction");
        }
    }

    /**
     * Asks the write client to schedule a compaction and asserts that an instant came back.
     *
     * @param writeClient write client used to schedule the compaction
     * @return the instant time of the scheduled compaction
     */
    private String scheduleCompactionPlan(HoodieFlinkWriteClient<?> writeClient) {
        Option<String> scheduledInstant = writeClient.scheduleCompaction(Option.empty());
        Assertions.assertTrue(scheduledInstant.isPresent(), "The compaction plan should be scheduled");
        return scheduledInstant.get();
    }

    // Populates the expected-rows fixtures, keyed by partition name.
    static {
        // EXPECTED1: ids 1-8 across par1..par4.
        EXPECTED1.put("par1", Arrays.asList(
                "id1,par1,id1,Danny,23,1000,par1",
                "id2,par1,id2,Stephen,33,2000,par1"));
        EXPECTED1.put("par2", Arrays.asList(
                "id3,par2,id3,Julian,53,3000,par2",
                "id4,par2,id4,Fabian,31,4000,par2"));
        EXPECTED1.put("par3", Arrays.asList(
                "id5,par3,id5,Sophia,18,5000,par3",
                "id6,par3,id6,Emma,20,6000,par3"));
        EXPECTED1.put("par4", Arrays.asList(
                "id7,par4,id7,Bob,44,7000,par4",
                "id8,par4,id8,Han,56,8000,par4"));

        // EXPECTED2: ids 1-4 carry the values of the second insert batch; ids 9-11 are added.
        EXPECTED2.put("par1", Arrays.asList(
                "id1,par1,id1,Danny,24,1000,par1",
                "id2,par1,id2,Stephen,34,2000,par1"));
        EXPECTED2.put("par2", Arrays.asList(
                "id3,par2,id3,Julian,54,3000,par2",
                "id4,par2,id4,Fabian,32,4000,par2"));
        EXPECTED2.put("par3", Arrays.asList(
                "id5,par3,id5,Sophia,18,5000,par3",
                "id6,par3,id6,Emma,20,6000,par3",
                "id9,par3,id9,Jane,19,6000,par3"));
        EXPECTED2.put("par4", Arrays.asList(
                "id7,par4,id7,Bob,44,7000,par4",
                "id8,par4,id8,Han,56,8000,par4",
                "id10,par4,id10,Ella,38,7000,par4",
                "id11,par4,id11,Phoebe,52,8000,par4"));

        // EXPECTED3: same rows as EXPECTED1 plus ids 12-13 in par5.
        EXPECTED3.put("par1", Arrays.asList(
                "id1,par1,id1,Danny,23,1000,par1",
                "id2,par1,id2,Stephen,33,2000,par1"));
        EXPECTED3.put("par2", Arrays.asList(
                "id3,par2,id3,Julian,53,3000,par2",
                "id4,par2,id4,Fabian,31,4000,par2"));
        EXPECTED3.put("par3", Arrays.asList(
                "id5,par3,id5,Sophia,18,5000,par3",
                "id6,par3,id6,Emma,20,6000,par3"));
        EXPECTED3.put("par4", Arrays.asList(
                "id7,par4,id7,Bob,44,7000,par4",
                "id8,par4,id8,Han,56,8000,par4"));
        EXPECTED3.put("par5", Arrays.asList(
                "id12,par5,id12,Tony,27,9000,par5",
                "id13,par5,id13,Jenny,72,10000,par5"));
    }
}

