/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.cluster;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.cluster.ClusteringCommitTestSink;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
import org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@ExtendWith(value={FlinkMiniCluster.class})
public class ITTestHoodieFlinkClustering {
    /**
     * Expected per-partition row contents passed to {@code TestData.checkWrittenData} by several tests.
     * NOTE(review): no entries are added anywhere in the visible code; presumably this map is filled
     * in a static initializer elsewhere in the class — confirm.
     */
    private static final Map<String, String> EXPECTED = new HashMap<String, String>();
    /** Per-test temporary directory injected by JUnit; its absolute path is used as the Hudi table path. */
    @TempDir
    File tempFile;

    /**
     * Inserts eight rows into a fresh Hudi table, schedules a clustering plan for it and executes that
     * plan with a hand-built Flink job (plan source -> ClusteringOperator -> ClusteringCommitSink),
     * then checks the data written under the table path against {@code EXPECTED}.
     */
    @Test
    public void testHoodieFlinkClustering() throws Exception {
        TableEnvironmentImpl tableEnv =
            TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.getConfig().getConfiguration()
            .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);

        HashMap<String, String> tableOptions = new HashMap<String, String>();
        tableOptions.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        tableOptions.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", tableOptions));

        String insertSql = "insert into t1 values\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
            + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
            + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
            + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
            + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
            + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
            + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
        tableEnv.executeSql(insertSql).await();
        // fixed pause after the insert job before clustering is scheduled
        TimeUnit.SECONDS.sleep(3L);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkClusteringConfig clusteringCfg = new FlinkClusteringConfig();
        clusteringCfg.path = this.tempFile.getAbsolutePath();
        clusteringCfg.targetPartitions = 4;
        clusteringCfg.sortMemory = 256;
        Configuration conf = FlinkClusteringConfig.toFlinkConfig(clusteringCfg);
        // the sort memory set on the clustering config must be carried into the Flink configuration
        Assertions.assertEquals(256, conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY));

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, env.getCheckpointConfig().getCheckpointTimeout());
        // overrides the partition field taken from the table config a few lines above
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
        CompactionUtil.setAvroSchema(conf, metaClient);

        try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
            HoodieFlinkTable table = writeClient.getHoodieTable();
            String clusteringInstantTime = writeClient.createNewInstantTime();
            Assertions.assertTrue(
                writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()),
                "The clustering plan should be scheduled");

            // read back the plan of the most recent REQUESTED clustering instant
            table.getMetaClient().reloadActiveTimeline();
            HoodieTimeline requestedClustering = table.getActiveTimeline()
                .filterPendingClusteringTimeline()
                .filter(pending -> pending.getState() == HoodieInstant.State.REQUESTED);
            HoodieInstant latestRequested = (HoodieInstant) requestedClustering.lastInstant().get();
            Option clusteringPlanOption = ClusteringUtils.getClusteringPlan(table.getMetaClient(), latestRequested);
            HoodieClusteringPlan clusteringPlan = (HoodieClusteringPlan) ((Pair) clusteringPlanOption.get()).getRight();

            // move the scheduled instant from REQUESTED to INFLIGHT before running it
            table.getActiveTimeline().transitionClusterRequestedToInflight(
                HoodieTestUtils.INSTANT_GENERATOR.getClusteringCommitRequestedInstant(clusteringInstantTime),
                Option.empty());

            Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
            RowType rowType = (RowType) AvroSchemaConverter.convertToDataType(tableAvroSchema).getLogicalType();

            SingleOutputStreamOperator dataStream = env
                .addSource((SourceFunction) new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan, conf))
                .name("clustering_source")
                .uid("uid_clustering_source")
                .rebalance()
                .transform(
                    "clustering_task",
                    TypeInformation.of(ClusteringCommitEvent.class),
                    (OneInputStreamOperator) new ClusteringOperator(conf, rowType))
                .setParallelism(clusteringPlan.getInputGroups().size());
            // WRITE_SORT_MEMORY is expressed in MB; the managed memory weight is given in bytes
            long sortMemoryBytes = (long) conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L;
            ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(), sortMemoryBytes);
            dataStream
                .addSink((SinkFunction) new ClusteringCommitSink(conf))
                .name("clustering_commit")
                .uid("uid_clustering_commit")
                .setParallelism(1);
            env.execute("flink_hudi_clustering");

            TestData.checkWrittenData(this.tempFile, EXPECTED, 4);
        }
    }

    /**
     * Starts the asynchronous clustering service (scheduling enabled, minimum clustering interval of
     * 3 seconds) on a freshly written table, lets it run for five seconds, stops it, and then checks
     * the data written under the table path against {@code EXPECTED}.
     */
    @Test
    public void testHoodieFlinkClusteringService() throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(settings);
        tableEnv.getConfig().getConfiguration()
            .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
        String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
        tableEnv.executeSql(hoodieTableDDL);
        String insertSql = "insert into t1 values\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
            + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
            + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
            + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
            + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
            + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
            + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
        tableEnv.executeSql(insertSql).await();
        TimeUnit.SECONDS.sleep(3L);

        FlinkClusteringConfig cfg = new FlinkClusteringConfig();
        cfg.path = this.tempFile.getAbsolutePath();
        cfg.minClusteringIntervalSeconds = 3;
        cfg.schedule = true;
        Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
        HoodieFlinkClusteringJob.AsyncClusteringService asyncClusteringService =
            new HoodieFlinkClusteringJob.AsyncClusteringService(cfg, conf);
        asyncClusteringService.start(null);
        try {
            // give the running service time to schedule and execute clustering
            TimeUnit.SECONDS.sleep(5L);
        } finally {
            // stop the service even if the wait above is interrupted, so it is not left running
            asyncClusteringService.shutDown();
        }
        TestData.checkWrittenData(this.tempFile, EXPECTED, 4);
    }

    /**
     * With {@code CLUSTERING_DELTA_COMMITS} set to 2, verifies that clustering is not scheduled after a
     * single insert commit but is scheduled once a second insert commit has been written.
     */
    @Test
    public void testHoodieFlinkClusteringSchedule() throws Exception {
        String insertSql = "insert into t1 values\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
            + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
            + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
            + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
            + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
            + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
            + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";

        TableEnvironmentImpl tableEnv =
            TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        HashMap<String, String> tableOptions = new HashMap<String, String>();
        tableOptions.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        tableOptions.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", tableOptions));
        tableEnv.executeSql(insertSql).await();
        TimeUnit.SECONDS.sleep(3L);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkClusteringConfig clusteringCfg = new FlinkClusteringConfig();
        clusteringCfg.path = this.tempFile.getAbsolutePath();
        Configuration conf = FlinkClusteringConfig.toFlinkConfig(clusteringCfg);
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, env.getCheckpointConfig().getCheckpointTimeout());
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
        // scheduling (not async execution) is what is under test: require 2 delta commits per plan
        conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 2);
        conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, false);
        conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
        CompactionUtil.setAvroSchema(conf, metaClient);

        try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
            // only one commit so far: below the threshold of 2
            String firstAttempt = writeClient.createNewInstantTime();
            Assertions.assertFalse(
                writeClient.scheduleClusteringAtInstant(firstAttempt, Option.empty()),
                "1 delta commit, the clustering plan should not be scheduled");

            // a second insert brings the commit count up to the threshold
            tableEnv.executeSql(insertSql).await();
            TimeUnit.SECONDS.sleep(3L);
            String secondAttempt = writeClient.createNewInstantTime();
            Assertions.assertTrue(
                writeClient.scheduleClusteringAtInstant(secondAttempt, Option.empty()),
                "2 delta commits, the clustering plan should be scheduled");
        }
    }

    /**
     * Schedules and executes a first clustering plan whose commit events are discarded
     * (DiscardingSink instead of ClusteringCommitSink). It then writes another commit and archives
     * with tight retention (archive min 1 / max 2 commits, clean retention 0). Finally it schedules a
     * second clustering plan and asserts that none of that plan's input slices has a data file path
     * containing the first clustering instant time.
     */
    @Test
    public void testHoodieFlinkClusteringScheduleAfterArchive() throws Exception {
        String insertSql = "insert into t1 values\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
            + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
            + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
            + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
            + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
            + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
            + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(settings);
        tableEnv.getConfig().getConfiguration()
            .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "false");
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
        String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
        tableEnv.executeSql(hoodieTableDDL);
        // two commits before the first clustering is scheduled
        tableEnv.executeSql(insertSql).await();
        tableEnv.executeSql(insertSql).await();
        TimeUnit.SECONDS.sleep(3L);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkClusteringConfig cfg = new FlinkClusteringConfig();
        cfg.path = this.tempFile.getAbsolutePath();
        cfg.targetPartitions = 4;
        Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
        long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
        // tight archive/clean retention so that writeClient.archive() below has instants to move
        conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS.key(), 2);
        conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS.key(), 1);
        conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), 0);
        CompactionUtil.setAvroSchema(conf, metaClient);

        try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
            HoodieFlinkTable table = writeClient.getHoodieTable();
            String firstClusteringInstant = writeClient.createNewInstantTime();
            boolean scheduled = writeClient.scheduleClusteringAtInstant(firstClusteringInstant, Option.empty());
            Assertions.assertTrue(scheduled, "The clustering plan should be scheduled");
            table.getMetaClient().reloadActiveTimeline();
            HoodieTimeline timeline = table.getActiveTimeline().filterPendingClusteringTimeline()
                .filter(i -> i.getState() == HoodieInstant.State.REQUESTED);
            Option clusteringPlanOption = ClusteringUtils.getClusteringPlan(
                table.getMetaClient(), (HoodieInstant) timeline.lastInstant().get());
            HoodieClusteringPlan clusteringPlan = (HoodieClusteringPlan) ((Pair) clusteringPlanOption.get()).getRight();
            HoodieInstant instant = HoodieTestUtils.INSTANT_GENERATOR.getClusteringCommitRequestedInstant(firstClusteringInstant);
            table.getActiveTimeline().transitionClusterRequestedToInflight(instant, Option.empty());
            Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
            DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
            RowType rowType = (RowType) rowDataType.getLogicalType();
            SingleOutputStreamOperator dataStream = env
                .addSource((SourceFunction) new ClusteringPlanSourceFunction(firstClusteringInstant, clusteringPlan, conf))
                .name("clustering_source")
                .uid("uid_clustering_source")
                .rebalance()
                .transform(
                    "clustering_task",
                    TypeInformation.of(ClusteringCommitEvent.class),
                    (OneInputStreamOperator) new ClusteringOperator(conf, rowType))
                .setParallelism(clusteringPlan.getInputGroups().size());
            ExecNodeUtil.setManagedMemoryWeight(
                dataStream.getTransformation(),
                (long) conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
            // the clustering commit events are dropped: no ClusteringCommitSink in this pipeline
            dataStream.addSink((SinkFunction) new DiscardingSink())
                .name("discarding-sink")
                .uid("uid_discarding-sink")
                .setParallelism(1);
            env.execute("flink_hudi_clustering");

            tableEnv.executeSql(insertSql).await();
            TimeUnit.SECONDS.sleep(3L);
            writeClient.archive();
            scheduled = writeClient.scheduleClusteringAtInstant(writeClient.createNewInstantTime(), Option.empty());
            Assertions.assertTrue(scheduled, "The clustering plan should be scheduled");
            table.getMetaClient().reloadActiveTimeline();
            timeline = table.getActiveTimeline().filterPendingClusteringTimeline()
                .filter(i -> i.getState() == HoodieInstant.State.REQUESTED);
            HoodieInstant secondClusteringInstant = (HoodieInstant) timeline.lastInstant().get();
            // Keep the plan strongly typed: assigning getInputGroups() to a raw List (as before) made
            // the lambda parameters below Object, so getSlices()/getDataFilePath() did not compile.
            HoodieClusteringPlan secondPlan = (HoodieClusteringPlan) ((Pair) ClusteringUtils.getClusteringPlan(
                table.getMetaClient(), secondClusteringInstant).get()).getRight();
            boolean referencesFirstClustering = secondPlan.getInputGroups().stream()
                .anyMatch(fg -> fg.getSlices().stream()
                    .anyMatch(s -> s.getDataFilePath().contains(firstClusteringInstant)));
            Assertions.assertFalse(referencesFirstClustering,
                "The second clustering plan should not contain files written by the first clustering instant");
        }
    }

    /**
     * Creates a Hudi table whose {@code ts} column is TIMESTAMP(9) and asserts that inserting into it
     * fails with a {@link ValidationException}. Per the assertion message, Avro only supports
     * timestamp precisions up to 6.
     */
    @Test
    public void testHoodieFlinkClusteringWithTimestampNanos() {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(settings);
        tableEnv.getConfig().getConfiguration()
            .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
        options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
        DataType dataType = DataTypes.ROW(
                DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),
                DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
                DataTypes.FIELD("age", DataTypes.INT()),
                DataTypes.FIELD("ts", DataTypes.TIMESTAMP(9)),
                DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
            .notNull();
        RowType rowType = (RowType) dataType.getLogicalType();
        List<String> fields = rowType.getFields().stream()
            .map(RowType.RowField::asSummaryString)
            .collect(Collectors.toList());
        String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", fields, options, true, "uuid", "partition");
        tableEnv.executeSql(hoodieTableDDL);
        String insertSql = "insert into t1 values\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01.100001001','par1'),\n"
            + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02.100001001','par1'),\n"
            + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03.100001001','par2'),\n"
            + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04.100001001','par2'),\n"
            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05.100001001','par3'),\n"
            + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06.100001001','par3'),\n"
            + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07.100001001','par4'),\n"
            + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08.100001001','par4')";
        // The decompiled version called a synthetic lambda method that does not compile; the intended
        // executable runs the insert. The message argument is only reported when nothing is thrown.
        Assertions.assertThrows(ValidationException.class, () -> tableEnv.executeSql(insertSql).await(),
            "Avro does not support TIMESTAMP type with precision: 9, it only support precisions <= 6.");
    }

    /**
     * Writes TIMESTAMP(6) values into a Hudi table, runs clustering via {@link #runCluster(RowType)},
     * and checks that the clustered files keep microsecond precision. For example,
     * {@code 00:00:01.100001} is expected back as {@code 1100001}.
     */
    @Test
    public void testHoodieFlinkClusteringWithTimestampMicros() throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(settings);
        tableEnv.getConfig().getConfiguration()
            .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
        DataType dataType = DataTypes.ROW(
                DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),
                DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
                DataTypes.FIELD("age", DataTypes.INT()),
                DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)),
                DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
            .notNull();
        RowType rowType = (RowType) dataType.getLogicalType();
        List<String> fields = rowType.getFields().stream()
            .map(RowType.RowField::asSummaryString)
            .collect(Collectors.toList());
        String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", fields, options, true, "uuid", "partition");
        tableEnv.executeSql(hoodieTableDDL);
        String insertSql = "insert into t1 values\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01.100001','par1'),\n"
            + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02.100001','par1'),\n"
            + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03.100001','par2'),\n"
            + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04.100001','par2'),\n"
            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05.100001','par3'),\n"
            + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06.100001','par3'),\n"
            + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07.100001','par4'),\n"
            + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08.100001','par4')";
        // execute the declared statement instead of a duplicated copy of the same literal
        tableEnv.executeSql(insertSql).await();
        TimeUnit.SECONDS.sleep(3L);
        this.runCluster(rowType);
        HashMap<String, String> expected = new HashMap<String, String>();
        expected.put("par1", "[id1,par1,id1,Danny,23,1100001,par1, id2,par1,id2,Stephen,33,2100001,par1]");
        expected.put("par2", "[id3,par2,id3,Julian,53,3100001,par2, id4,par2,id4,Fabian,31,4100001,par2]");
        expected.put("par3", "[id5,par3,id5,Sophia,18,5100001,par3, id6,par3,id6,Emma,20,6100001,par3]");
        expected.put("par4", "[id7,par4,id7,Bob,44,7100001,par4, id8,par4,id8,Han,56,8100001,par4]");
        TestData.checkWrittenData(this.tempFile, expected, 4);
    }

    /**
     * Creates the table either with or without a declared primary key. In both cases the record key
     * is configured as {@code uuid} via {@code hoodie.datasource.write.recordkey.field}. The test then
     * clusters with a row type whose {@code uuid} is NOT NULL and checks the written data.
     *
     * @param withPk whether the DDL declares {@code primary key (uuid) not enforced}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testInsertWithDifferentRecordKeyNullabilityAndClustering(boolean withPk) throws Exception {
        TableEnvironmentImpl tableEnv =
            TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.getConfig().getConfiguration()
            .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);

        String pkClause = "";
        if (withPk) {
            pkClause = ",  primary key (uuid) not enforced\n";
        }
        String createTableDdl = "create table t1(\n"
            + "  `uuid` VARCHAR(20)\n"
            + ",  `name` VARCHAR(10)\n"
            + ",  `age` INT\n"
            + ",  `ts` TIMESTAMP(3)\n"
            + ",  `partition` VARCHAR(10)\n"
            + pkClause
            + ")\n"
            + "PARTITIONED BY (`partition`)\n"
            + "with (\n"
            + "  'connector' = 'hudi',\n"
            + "  'hoodie.datasource.write.recordkey.field' = 'uuid',\n"
            + "  'path' = '" + this.tempFile.getAbsolutePath() + "'\n"
            + ")";
        tableEnv.executeSql(createTableDdl);
        tableEnv.executeSql("insert into t1 values\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
            + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
            + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
            + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
            + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
            + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
            + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')").await();

        // clustering row type: uuid is NOT NULL here, regardless of how the table was declared
        DataType clusteringDataType = DataTypes.ROW(
                DataTypes.FIELD("uuid", DataTypes.VARCHAR(20).notNull()),
                DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
                DataTypes.FIELD("age", DataTypes.INT()),
                DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
                DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
            .notNull();
        this.runCluster((RowType) clusteringDataType.getLogicalType());

        HashMap<String, String> expectedByPartition = new HashMap<String, String>();
        expectedByPartition.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
        expectedByPartition.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
        expectedByPartition.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]");
        expectedByPartition.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]");
        TestData.checkWrittenData(this.tempFile, expectedByPartition, 4);
    }

    /**
     * Runs offline clustering through {@code runOfflineCluster} on the table prepared by
     * {@link #prepareEnvAndTable()} and expects no exception. It then expects
     * {@code select count(*) from t1} to return 16.
     * NOTE(review): the failover scenario itself lives in runOfflineCluster, which is not visible here.
     */
    @Test
    public void testOfflineClusterFailoverAfterCommit() throws Exception {
        StreamTableEnvironment tableEnv = this.prepareEnvAndTable();

        FlinkClusteringConfig clusteringCfg = new FlinkClusteringConfig();
        clusteringCfg.path = this.tempFile.getAbsolutePath();
        clusteringCfg.targetPartitions = 4;
        Configuration conf = FlinkClusteringConfig.toFlinkConfig(clusteringCfg);
        Assertions.assertDoesNotThrow(() -> this.runOfflineCluster((TableEnvironment) tableEnv, conf));

        Table countQuery = tableEnv.sqlQuery("select count(*) from t1");
        Row countRow = (Row) tableEnv.toDataStream(countQuery, Row.class).executeAndCollect(1).get(0);
        Assertions.assertEquals(Long.valueOf(16L), countRow.getField(0));
    }

    /**
     * Schedules a clustering plan on the table at {@code tempFile} and transitions it to INFLIGHT. It
     * then executes the plan with a Flink job (plan source -> ClusteringOperator ->
     * ClusteringCommitSink) and closes the write client afterwards.
     *
     * @param rowType row type handed to the {@code ClusteringOperator}
     * @throws Exception if scheduling, job execution or closing the write client fails
     */
    private void runCluster(RowType rowType) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkClusteringConfig clusteringCfg = new FlinkClusteringConfig();
        clusteringCfg.path = this.tempFile.getAbsolutePath();
        clusteringCfg.targetPartitions = 4;
        Configuration conf = FlinkClusteringConfig.toFlinkConfig(clusteringCfg);

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, env.getCheckpointConfig().getCheckpointTimeout());
        // overrides the partition field taken from the table config above
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
        CompactionUtil.setAvroSchema(conf, metaClient);

        try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
            HoodieFlinkTable table = writeClient.getHoodieTable();
            String instantTime = writeClient.createNewInstantTime();
            Assertions.assertTrue(
                writeClient.scheduleClusteringAtInstant(instantTime, Option.empty()),
                "The clustering plan should be scheduled");

            table.getMetaClient().reloadActiveTimeline();
            HoodieTimeline requested = table.getActiveTimeline()
                .filterPendingClusteringTimeline()
                .filter(candidate -> candidate.getState() == HoodieInstant.State.REQUESTED);
            HoodieInstant newestRequested = (HoodieInstant) requested.lastInstant().get();
            Option planOption = ClusteringUtils.getClusteringPlan(table.getMetaClient(), newestRequested);
            HoodieClusteringPlan plan = (HoodieClusteringPlan) ((Pair) planOption.get()).getRight();

            table.getActiveTimeline().transitionClusterRequestedToInflight(
                HoodieTestUtils.INSTANT_GENERATOR.getClusteringCommitRequestedInstant(instantTime),
                Option.empty());

            SingleOutputStreamOperator clusteringStream = env
                .addSource((SourceFunction) new ClusteringPlanSourceFunction(instantTime, plan, conf))
                .name("clustering_source")
                .uid("uid_clustering_source")
                .rebalance()
                .transform(
                    "clustering_task",
                    TypeInformation.of(ClusteringCommitEvent.class),
                    (OneInputStreamOperator) new ClusteringOperator(conf, rowType))
                .setParallelism(plan.getInputGroups().size());
            // WRITE_SORT_MEMORY is in MB; the managed memory weight is passed in bytes
            long sortMemoryBytes = (long) conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L;
            ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(), sortMemoryBytes);
            clusteringStream
                .addSink((SinkFunction) new ClusteringCommitSink(conf))
                .name("clustering_commit")
                .uid("uid_clustering_commit")
                .setParallelism(1);
            env.execute("flink_hudi_clustering");
        }
    }

    /**
     * Builds a batch-mode table environment, creates the copy-on-write Hudi table {@code t1} at
     * {@code tempFile}, and inserts eight rows spread over partitions par1..par4.
     *
     * @return the table environment that owns {@code t1}
     */
    private StreamTableEnvironment prepareEnvAndTable() {
        Configuration execConf = new Configuration();
        execConf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment(execConf));

        // Default parallelism 4; DML statements block until their job finishes.
        Configuration tableConf = tableEnv.getConfig().getConfiguration();
        tableConf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        tableConf.set(TableConfigOptions.TABLE_DML_SYNC, true);

        // Plain INSERT with insert clustering disabled, so clustering only happens via the explicit jobs.
        HashMap<String, String> tableOptions = new HashMap<>();
        tableOptions.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        tableOptions.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
        tableOptions.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
        tableOptions.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name());

        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", tableOptions));
        tableEnv.executeSql("insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')");
        return tableEnv;
    }

    /**
     * Schedules a clustering plan, writes a second batch of eight rows into {@code t1}, and then
     * executes the pending plan with a Flink job whose commit stage is {@link ClusteringCommitTestSink}.
     *
     * <p>The job is allowed one restart with a 1 ms delay. NOTE(review): this presumably lets the
     * test sink inject a single failure that the job recovers from; confirm against
     * ClusteringCommitTestSink.
     *
     * @param tableEnv table environment used to insert the additional rows into {@code t1}
     * @param conf clustering configuration; mutated in place with table name, key and partition settings
     * @throws Exception if creating the clients, scheduling the plan, or running the Flink job fails
     */
    private void runOfflineCluster(TableEnvironment tableEnv, Configuration conf) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Allow exactly one restart, 1 ms after a failure.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(1L)));

        // Copy the table identity from the existing table config.
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
        // The partition path field is explicitly pinned to the "partition" column. It is set once
        // here rather than first copied from the table config and then overwritten.
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
        // Give the commit-ack wait the same budget as a checkpoint.
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, env.getCheckpointConfig().getCheckpointTimeout());
        CompactionUtil.setAvroSchema(conf, metaClient);

        try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
            HoodieFlinkTable table = writeClient.getHoodieTable();
            String clusteringInstantTime = writeClient.createNewInstantTime();
            boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
            Assertions.assertTrue(scheduled, "The clustering plan should be scheduled");

            // Land another commit on t1 after the plan is scheduled but before it is executed.
            tableEnv.executeSql("insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')");

            // Pick up the REQUESTED clustering instant and read its plan.
            table.getMetaClient().reloadActiveTimeline();
            HoodieTimeline timeline = table.getActiveTimeline().filterPendingClusteringTimeline()
                .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
            Option lastRequested = timeline.lastInstant();
            Assertions.assertTrue(lastRequested.isPresent(), "A requested clustering instant should exist");
            Option clusteringPlanOption = ClusteringUtils.getClusteringPlan(
                (HoodieTableMetaClient) table.getMetaClient(), (HoodieInstant) lastRequested.get());
            Assertions.assertTrue(clusteringPlanOption.isPresent(), "The clustering plan should be readable");
            HoodieClusteringPlan clusteringPlan = (HoodieClusteringPlan) ((Pair) clusteringPlanOption.get()).getRight();

            // Mark the plan inflight before the job starts executing it.
            HoodieInstant requestedInstant = HoodieTestUtils.INSTANT_GENERATOR.getClusteringCommitRequestedInstant(clusteringInstantTime);
            table.getActiveTimeline().transitionClusterRequestedToInflight(requestedInstant, Option.empty());

            // Derive the operator's row type from the table's current Avro schema.
            Schema tableAvroSchema = StreamerUtil.getTableAvroSchema((HoodieTableMetaClient) table.getMetaClient(), false);
            DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
            RowType rowType = (RowType) rowDataType.getLogicalType();

            // One clustering subtask per input group in the plan.
            SingleOutputStreamOperator dataStream = env.addSource((SourceFunction) new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan, conf))
                .name("clustering_source")
                .uid("uid_clustering_source")
                .rebalance()
                .transform("clustering_task", TypeInformation.of(ClusteringCommitEvent.class), (OneInputStreamOperator) new ClusteringOperator(conf, rowType))
                .setParallelism(clusteringPlan.getInputGroups().size());
            // WRITE_SORT_MEMORY is converted to bytes (* 1024 * 1024) for the managed-memory weight.
            ExecNodeUtil.setManagedMemoryWeight((Transformation) dataStream.getTransformation(),
                conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
            dataStream.addSink((SinkFunction) new ClusteringCommitTestSink(conf))
                .name("clustering_commit")
                .uid("uid_clustering_commit")
                .setParallelism(1);
            env.execute("flink_hudi_clustering");
        }
    }

    /**
     * Decompiler-emitted body of a lambda from testHoodieFlinkClusteringWithTimestampNanos, which is
     * outside this chunk.
     *
     * <p>Inserts eight rows into {@code t1} whose TIMESTAMP literals carry nanosecond fractions
     * (.100001001), two rows for each of the partitions par1..par4.
     *
     * @param tableEnv table environment that owns {@code t1}
     * @throws Throwable declared by the synthetic lambda signature
     */
    private static /* synthetic */ void lambda$testHoodieFlinkClusteringWithTimestampNanos$5(TableEnvironment tableEnv) throws Throwable {
        tableEnv.executeSql("insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01.100001001','par1'),\n('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02.100001001','par1'),\n('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03.100001001','par2'),\n('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04.100001001','par2'),\n('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05.100001001','par3'),\n('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06.100001001','par3'),\n('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07.100001001','par4'),\n('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08.100001001','par4')");
    }

    static {
        // Expected contents of t1 per partition, keyed by partition name (par1..par4).
        // NOTE(review): the column layout inside each row string is defined by the assertions that
        // consume EXPECTED elsewhere in this file; confirm there.
        String[][] expectedRowsByPartition = new String[][]{
            {"par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]"},
            {"par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]"},
            {"par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]"},
            {"par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]"},
        };
        for (String[] partitionAndRows : expectedRowsByPartition) {
            EXPECTED.put(partitionAndRows[0], partitionAndRows[1]);
        }
    }
}

