package org.apache.hudi.sink.cluster;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
import org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith({FlinkMiniCluster.class})
/* loaded from: input_file:org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.class */
public class ITTestHoodieFlinkClustering {
    /**
     * Expected rows keyed by partition ({@code par1}..{@code par4}) that the clustering tests
     * compare against the written table; populated by the static initializer of this class.
     */
    private static final Map<String, String> EXPECTED = new HashMap<>();

    /**
     * Temporary directory supplied through JUnit's {@code @TempDir}; every test in this class
     * uses its absolute path as the Hudi table path.
     */
    @TempDir
    File tempFile;

    /**
     * Writes one batch into table {@code t1}, schedules a clustering plan, executes it with a
     * Flink pipeline that ends in a {@link ClusteringCommitSink}, and finally compares the
     * written data with {@link #EXPECTED}.
     *
     * @throws Exception if the table setup, the clustering scheduling or the Flink job fails
     */
    @Test
    public void testHoodieFlinkClustering() throws Exception {
        // Create the table and write the first batch in batch mode.
        TableEnvironment tableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", options));
        tableEnv.executeSql(TestSQL.INSERT_T1).await();
        TimeUnit.SECONDS.sleep(3L);

        // Build the clustering configuration for the table that was just written.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkClusteringConfig clusteringConfig = new FlinkClusteringConfig();
        clusteringConfig.path = this.tempFile.getAbsolutePath();
        clusteringConfig.targetPartitions = 4;
        clusteringConfig.sortMemory = 256;
        Configuration conf = FlinkClusteringConfig.toFlinkConfig(clusteringConfig);
        Assertions.assertEquals(256, conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY));

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, env.getCheckpointConfig().getCheckpointTimeout());
        // Replaces the partition field that was copied from the table config a few lines above.
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
        CompactionUtil.setAvroSchema(conf, metaClient);

        String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
            HoodieFlinkTable<?> table = writeClient.getHoodieTable();
            Assertions.assertTrue(writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()), "The clustering plan should be scheduled");
            table.getMetaClient().reloadActiveTimeline();

            // Pick the latest pending replace instant that is still in REQUESTED state and load its plan.
            HoodieTimeline requestedTimeline = table.getActiveTimeline().filterPendingReplaceTimeline()
                    .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
            HoodieClusteringPlan clusteringPlan = ClusteringUtils
                    .getClusteringPlan(table.getMetaClient(), requestedTimeline.lastInstant().get())
                    .get()
                    .getRight();

            table.getActiveTimeline().transitionReplaceRequestedToInflight(HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime), Option.empty());

            RowType rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getTableAvroSchema(table.getMetaClient(), false)).getLogicalType();
            SingleOutputStreamOperator<ClusteringCommitEvent> clusteringStream = env
                    .addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan, conf))
                    .name("clustering_source")
                    .uid("uid_clustering_source")
                    .rebalance()
                    .transform("clustering_task", TypeInformation.of(ClusteringCommitEvent.class), new ClusteringOperator(conf, rowType))
                    .setParallelism(clusteringPlan.getInputGroups().size());
            // Long arithmetic keeps the MB -> bytes conversion from overflowing an int.
            ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(), conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
            clusteringStream.addSink(new ClusteringCommitSink(conf))
                    .name("clustering_commit")
                    .uid("uid_clustering_commit")
                    .setParallelism(1);

            env.execute("flink_hudi_clustering");
            TestData.checkWrittenData(this.tempFile, EXPECTED, 4);
        }
    }

    /**
     * Writes one batch into table {@code t1}, runs the asynchronous clustering service of
     * {@link HoodieFlinkClusteringJob} for five seconds with scheduling enabled, and then
     * compares the written data with {@link #EXPECTED}.
     *
     * @throws Exception if the table setup or the clustering service fails
     */
    @Test
    public void testHoodieFlinkClusteringService() throws Exception {
        // Create the table and write the first batch in batch mode.
        TableEnvironment tableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", options));
        tableEnv.executeSql(TestSQL.INSERT_T1).await();
        TimeUnit.SECONDS.sleep(3L);

        FlinkClusteringConfig clusteringConfig = new FlinkClusteringConfig();
        clusteringConfig.path = this.tempFile.getAbsolutePath();
        clusteringConfig.minClusteringIntervalSeconds = 3;
        clusteringConfig.schedule = true;
        Configuration conf = FlinkClusteringConfig.toFlinkConfig(clusteringConfig);

        HoodieFlinkClusteringJob.AsyncClusteringService clusteringService =
                new HoodieFlinkClusteringJob.AsyncClusteringService(clusteringConfig, conf);
        // No shutdown callback is registered.
        clusteringService.start((Function) null);
        try {
            // Give the service time to schedule and execute the clustering.
            TimeUnit.SECONDS.sleep(5L);
        } finally {
            // Always stop the service, even if the wait above is interrupted.
            clusteringService.shutDown();
        }

        TestData.checkWrittenData(this.tempFile, EXPECTED, 4);
    }

    /**
     * Verifies that, with {@code CLUSTERING_DELTA_COMMITS} set to 2, scheduling a clustering
     * plan returns {@code false} after the first insert batch and {@code true} after the second.
     *
     * @throws Exception if the table setup or the clustering scheduling fails
     */
    @Test
    public void testHoodieFlinkClusteringSchedule() throws Exception {
        // Create the table and write the first batch in batch mode.
        TableEnvironment tableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", options));
        tableEnv.executeSql(TestSQL.INSERT_T1).await();
        TimeUnit.SECONDS.sleep(3L);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkClusteringConfig clusteringConfig = new FlinkClusteringConfig();
        clusteringConfig.path = this.tempFile.getAbsolutePath();
        Configuration conf = FlinkClusteringConfig.toFlinkConfig(clusteringConfig);

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, env.getCheckpointConfig().getCheckpointTimeout());
        // Replaces the partition field that was copied from the table config a few lines above.
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
        // Only schedule; require two commits before a plan may be created.
        conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 2);
        conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, false);
        conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
        CompactionUtil.setAvroSchema(conf, metaClient);

        String firstInstantTime = HoodieActiveTimeline.createNewInstantTime();
        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
            Assertions.assertFalse(writeClient.scheduleClusteringAtInstant(firstInstantTime, Option.empty()), "1 delta commit, the clustering plan should not be scheduled");

            // Write a second batch so that the delta-commit threshold is reached.
            tableEnv.executeSql(TestSQL.INSERT_T1).await();
            TimeUnit.SECONDS.sleep(3L);

            String secondInstantTime = HoodieActiveTimeline.createNewInstantTime();
            Assertions.assertTrue(writeClient.scheduleClusteringAtInstant(secondInstantTime, Option.empty()), "2 delta commits, the clustering plan should be scheduled");
        }
    }

    /**
     * Runs a first clustering whose commit events go to a {@link DiscardingSink}, writes another
     * batch, archives the timeline, schedules a second clustering plan, and asserts that no file
     * slice of the second plan has a data file path containing the first clustering instant time.
     *
     * @throws Exception if the table setup, the clustering scheduling or the Flink job fails
     */
    @Test
    public void testHoodieFlinkClusteringScheduleAfterArchive() throws Exception {
        // Create the table (non hive-style partitioning) and write two batches in batch mode.
        TableEnvironment tableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "false");
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", options));
        tableEnv.executeSql(TestSQL.INSERT_T1).await();
        tableEnv.executeSql(TestSQL.INSERT_T1).await();
        TimeUnit.SECONDS.sleep(3L);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkClusteringConfig clusteringConfig = new FlinkClusteringConfig();
        clusteringConfig.path = this.tempFile.getAbsolutePath();
        clusteringConfig.targetPartitions = 4;
        Configuration conf = FlinkClusteringConfig.toFlinkConfig(clusteringConfig);

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, env.getCheckpointConfig().getCheckpointTimeout());
        // Replaces the partition field that was copied from the table config a few lines above.
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
        // Small archive/clean thresholds for the archive() call further down.
        conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS.key(), 2);
        conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS.key(), 1);
        conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), 0);
        CompactionUtil.setAvroSchema(conf, metaClient);

        String firstClusteringInstant = HoodieActiveTimeline.createNewInstantTime();
        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
            HoodieFlinkTable<?> table = writeClient.getHoodieTable();
            Assertions.assertTrue(writeClient.scheduleClusteringAtInstant(firstClusteringInstant, Option.empty()), "The clustering plan should be scheduled");
            table.getMetaClient().reloadActiveTimeline();

            // Pick the latest pending replace instant that is still in REQUESTED state and load its plan.
            HoodieTimeline requestedTimeline = table.getActiveTimeline().filterPendingReplaceTimeline()
                    .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
            HoodieClusteringPlan firstPlan = ClusteringUtils
                    .getClusteringPlan(table.getMetaClient(), requestedTimeline.lastInstant().get())
                    .get()
                    .getRight();

            table.getActiveTimeline().transitionReplaceRequestedToInflight(HoodieTimeline.getReplaceCommitRequestedInstant(firstClusteringInstant), Option.empty());

            RowType rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getTableAvroSchema(table.getMetaClient(), false)).getLogicalType();
            SingleOutputStreamOperator<ClusteringCommitEvent> clusteringStream = env
                    .addSource(new ClusteringPlanSourceFunction(firstClusteringInstant, firstPlan, conf))
                    .name("clustering_source")
                    .uid("uid_clustering_source")
                    .rebalance()
                    .transform("clustering_task", TypeInformation.of(ClusteringCommitEvent.class), new ClusteringOperator(conf, rowType))
                    .setParallelism(firstPlan.getInputGroups().size());
            // Long arithmetic keeps the MB -> bytes conversion from overflowing an int.
            ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(), conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
            // The commit events are dropped here instead of being handed to a ClusteringCommitSink.
            clusteringStream.addSink(new DiscardingSink<>())
                    .name("discarding-sink")
                    .uid("uid_discarding-sink")
                    .setParallelism(1);
            env.execute("flink_hudi_clustering");

            // Write one more batch, archive, then schedule clustering again.
            tableEnv.executeSql(TestSQL.INSERT_T1).await();
            TimeUnit.SECONDS.sleep(3L);
            writeClient.archive();

            String secondClusteringInstant = HoodieActiveTimeline.createNewInstantTime();
            Assertions.assertTrue(writeClient.scheduleClusteringAtInstant(secondClusteringInstant, Option.empty()), "The clustering plan should be scheduled");
            table.getMetaClient().reloadActiveTimeline();

            HoodieTimeline requestedAfterArchive = table.getActiveTimeline().filterPendingReplaceTimeline()
                    .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
            HoodieClusteringPlan secondPlan = ClusteringUtils
                    .getClusteringPlan(table.getMetaClient(), requestedAfterArchive.lastInstant().get())
                    .get()
                    .getRight();

            // No slice of the new plan may point at a data file whose path carries the first clustering instant.
            boolean referencesFirstClustering = secondPlan.getInputGroups().stream()
                    .anyMatch(group -> group.getSlices().stream()
                            .anyMatch(slice -> slice.getDataFilePath().contains(firstClusteringInstant)));
            Assertions.assertFalse(referencesFirstClustering);
        }
    }

    /**
     * Creates table {@code t1} with a {@code TIMESTAMP(9)} column and asserts that inserting
     * rows into it throws a {@link ValidationException}.
     */
    @Test
    public void testHoodieFlinkClusteringWithTimestampNanos() {
        TableEnvironment tableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
        options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");

        // Row schema with a nanosecond-precision timestamp column.
        RowType rowType = (RowType) DataTypes.ROW(
                DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),
                DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
                DataTypes.FIELD("age", DataTypes.INT()),
                DataTypes.FIELD("ts", DataTypes.TIMESTAMP(9)),
                DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
                .notNull()
                .getLogicalType();
        List<String> fields = rowType.getFields().stream()
                .map(RowType.RowField::asSummaryString)
                .collect(Collectors.toList());
        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", fields, options, true, "uuid", "partition"));

        String insertSql = "insert into t1 values\n"
                + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01.100001001','par1'),\n"
                + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02.100001001','par1'),\n"
                + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03.100001001','par2'),\n"
                + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04.100001001','par2'),\n"
                + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05.100001001','par3'),\n"
                + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06.100001001','par3'),\n"
                + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07.100001001','par4'),\n"
                + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08.100001001','par4')";

        // NOTE: the third argument is only the message reported when no ValidationException is thrown;
        // JUnit does not compare it with the message of the thrown exception.
        Assertions.assertThrows(
                ValidationException.class,
                () -> tableEnv.executeSql(insertSql),
                "Avro does not support TIMESTAMP type with precision: 9, it only support precisions <= 6.");
    }

    /**
     * Creates table {@code t1} with a {@code TIMESTAMP(6)} column, writes eight rows, clusters
     * the table with a Flink pipeline ending in a {@link ClusteringCommitSink}, and checks that
     * the written rows keep their microsecond timestamp values.
     *
     * @throws Exception if the table setup, the clustering scheduling or the Flink job fails
     */
    @Test
    public void testHoodieFlinkClusteringWithTimestampMicros() throws Exception {
        TableEnvironment tableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());

        // Row schema with a microsecond-precision timestamp column; reused by the clustering operator below.
        RowType rowType = (RowType) DataTypes.ROW(
                DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),
                DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
                DataTypes.FIELD("age", DataTypes.INT()),
                DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)),
                DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
                .notNull()
                .getLogicalType();
        List<String> fields = rowType.getFields().stream()
                .map(RowType.RowField::asSummaryString)
                .collect(Collectors.toList());
        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", fields, options, true, "uuid", "partition"));

        String insertSql = "insert into t1 values\n"
                + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01.100001','par1'),\n"
                + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02.100001','par1'),\n"
                + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03.100001','par2'),\n"
                + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04.100001','par2'),\n"
                + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05.100001','par3'),\n"
                + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06.100001','par3'),\n"
                + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07.100001','par4'),\n"
                + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08.100001','par4')";
        tableEnv.executeSql(insertSql).await();
        TimeUnit.SECONDS.sleep(3L);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkClusteringConfig clusteringConfig = new FlinkClusteringConfig();
        clusteringConfig.path = this.tempFile.getAbsolutePath();
        clusteringConfig.targetPartitions = 4;
        Configuration conf = FlinkClusteringConfig.toFlinkConfig(clusteringConfig);

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, env.getCheckpointConfig().getCheckpointTimeout());
        // Replaces the partition field that was copied from the table config a few lines above.
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
        CompactionUtil.setAvroSchema(conf, metaClient);

        String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
            HoodieFlinkTable<?> table = writeClient.getHoodieTable();
            Assertions.assertTrue(writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()), "The clustering plan should be scheduled");
            table.getMetaClient().reloadActiveTimeline();

            // Pick the latest pending replace instant that is still in REQUESTED state and load its plan.
            HoodieTimeline requestedTimeline = table.getActiveTimeline().filterPendingReplaceTimeline()
                    .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
            HoodieClusteringPlan clusteringPlan = ClusteringUtils
                    .getClusteringPlan(table.getMetaClient(), requestedTimeline.lastInstant().get())
                    .get()
                    .getRight();

            table.getActiveTimeline().transitionReplaceRequestedToInflight(HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime), Option.empty());

            SingleOutputStreamOperator<ClusteringCommitEvent> clusteringStream = env
                    .addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan, conf))
                    .name("clustering_source")
                    .uid("uid_clustering_source")
                    .rebalance()
                    .transform("clustering_task", TypeInformation.of(ClusteringCommitEvent.class), new ClusteringOperator(conf, rowType))
                    .setParallelism(clusteringPlan.getInputGroups().size());
            // Long arithmetic keeps the MB -> bytes conversion from overflowing an int.
            ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(), conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
            clusteringStream.addSink(new ClusteringCommitSink(conf))
                    .name("clustering_commit")
                    .uid("uid_clustering_commit")
                    .setParallelism(1);
            env.execute("flink_hudi_clustering");

            Map<String, String> expected = new HashMap<>();
            expected.put("par1", "[id1,par1,id1,Danny,23,1100001,par1, id2,par1,id2,Stephen,33,2100001,par1]");
            expected.put("par2", "[id3,par2,id3,Julian,53,3100001,par2, id4,par2,id4,Fabian,31,4100001,par2]");
            expected.put("par3", "[id5,par3,id5,Sophia,18,5100001,par3, id6,par3,id6,Emma,20,6100001,par3]");
            expected.put("par4", "[id7,par4,id7,Bob,44,7100001,par4, id8,par4,id8,Han,56,8100001,par4]");
            TestData.checkWrittenData(this.tempFile, expected, 4);
        }
    }

    /**
     * Prepares table {@code t1}, runs the offline clustering job and expects it to finish
     * without throwing, then asserts that {@code select count(*) from t1} yields 16.
     *
     * @throws Exception if the environment setup or the count query fails
     */
    @Test
    public void testOfflineClusterFailoverAfterCommit() throws Exception {
        StreamTableEnvironment tableEnv = prepareEnvAndTable();

        FlinkClusteringConfig clusteringConfig = new FlinkClusteringConfig();
        clusteringConfig.targetPartitions = 4;
        clusteringConfig.path = this.tempFile.getAbsolutePath();
        Configuration conf = FlinkClusteringConfig.toFlinkConfig(clusteringConfig);

        Assertions.assertDoesNotThrow(() -> runOfflineCluster(tableEnv, conf));

        // One INSERT_T1 batch is written during setup and one inside runOfflineCluster;
        // presumably 8 rows each, hence 16 - confirm against TestSQL.INSERT_T1.
        List<Row> countRows = tableEnv
                .toDataStream(tableEnv.sqlQuery("select count(*) from t1"), Row.class)
                .executeAndCollect(1);
        Object rowCount = countRows.get(0).getField(0);
        Assertions.assertEquals(16L, rowCount);
    }

    /**
     * Creates a batch-mode {@link StreamTableEnvironment} with synchronous DML execution,
     * creates the copy-on-write table {@code t1} under {@link #tempFile} and inserts one
     * {@code INSERT_T1} batch into it.
     *
     * @return the table environment in which {@code t1} is registered
     */
    private StreamTableEnvironment prepareEnvAndTable() {
        Configuration envConf = new Configuration();
        envConf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment(envConf));
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        // TABLE_DML_SYNC is enabled, which is why the insert below is not followed by await().
        tableEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DML_SYNC, true);

        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
        options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name());

        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", options));
        tableEnv.executeSql(TestSQL.INSERT_T1);
        return tableEnv;
    }

    /**
     * Schedules a clustering plan for the table described by {@code configuration}, writes one
     * more {@code INSERT_T1} batch through {@code tableEnvironment}, and then executes the plan
     * with a Flink pipeline that ends in a {@code ClusteringCommitTestSink}. The job uses a
     * fixed-delay restart strategy with a single restart attempt.
     *
     * @param tableEnvironment table environment in which table {@code t1} is registered
     * @param configuration    clustering configuration; table name, key/partition fields, ack
     *                         timeout and Avro schema are written into it by this method
     * @throws Exception if the clustering scheduling or the Flink job fails
     */
    private void runOfflineCluster(TableEnvironment tableEnvironment, Configuration configuration) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(1L)));

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(configuration);
        configuration.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        configuration.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
        configuration.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
        configuration.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, env.getCheckpointConfig().getCheckpointTimeout());
        // Replaces the partition field that was copied from the table config a few lines above.
        configuration.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
        CompactionUtil.setAvroSchema(configuration, metaClient);

        String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(configuration)) {
            HoodieFlinkTable<?> table = writeClient.getHoodieTable();
            Assertions.assertTrue(writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()), "The clustering plan should be scheduled");

            // Write another batch after the plan has been scheduled but before it is executed.
            tableEnvironment.executeSql(TestSQL.INSERT_T1);
            table.getMetaClient().reloadActiveTimeline();

            // Pick the latest pending replace instant that is still in REQUESTED state and load its plan.
            HoodieTimeline requestedTimeline = table.getActiveTimeline().filterPendingReplaceTimeline()
                    .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
            HoodieClusteringPlan clusteringPlan = ClusteringUtils
                    .getClusteringPlan(table.getMetaClient(), requestedTimeline.lastInstant().get())
                    .get()
                    .getRight();

            table.getActiveTimeline().transitionReplaceRequestedToInflight(HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime), Option.empty());

            RowType rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getTableAvroSchema(table.getMetaClient(), false)).getLogicalType();
            SingleOutputStreamOperator<ClusteringCommitEvent> clusteringStream = env
                    .addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan, configuration))
                    .name("clustering_source")
                    .uid("uid_clustering_source")
                    .rebalance()
                    .transform("clustering_task", TypeInformation.of(ClusteringCommitEvent.class), new ClusteringOperator(configuration, rowType))
                    .setParallelism(clusteringPlan.getInputGroups().size());
            // Long arithmetic keeps the MB -> bytes conversion from overflowing an int.
            ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(), configuration.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
            clusteringStream.addSink(new ClusteringCommitTestSink(configuration))
                    .name("clustering_commit")
                    .uid("uid_clustering_commit")
                    .setParallelism(1);

            env.execute("flink_hudi_clustering");
        }
    }

    static {
        EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
        EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
        EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]");
        EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]");
    }
}
