package org.apache.hudi.sink.cluster;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Integration tests for scheduling clustering plans from Flink against a
 * {@code MERGE_ON_READ} table that uses the {@code BUCKET} index with the
 * {@code CONSISTENT_HASHING} engine.
 *
 * <p>Every test first writes {@link TestSQL#INSERT_T1} into a table located in a JUnit-managed
 * temporary directory and then asks a {@link HoodieFlinkWriteClient} to schedule clustering.
 */
public class ITTestFlinkConsistentHashingClustering {
    /** Expected data handed to {@link TestData#checkWrittenData} after the initial insert. */
    private static final Map<String, String> EXPECTED_AFTER_INITIAL_INSERT = new HashMap<>();

    /** Expected table content after an upsert; not referenced by the tests in this class. */
    private static final Map<String, String> EXPECTED_AFTER_UPSERT = new HashMap<>();

    static {
        // NOTE(review): unlike EXPECTED_AFTER_UPSERT, this literal has no opening '[' and ends
        // with ", ]". It is left untouched because the format expected by
        // TestData.checkWrittenData is not visible here -- confirm before relying on it.
        EXPECTED_AFTER_INITIAL_INSERT.put("", "id1,,id1,Danny,23,1000,, id2,,id2,Stephen,33,2000,, id3,,id3,Julian,53,3000,, id4,,id4,Fabian,31,4000,, id5,,id5,Sophia,18,5000,, id6,,id6,Emma,20,6000,, id7,,id7,Bob,44,7000,, id8,,id8,Han,56,8000,, ]");
        EXPECTED_AFTER_UPSERT.put("", "[id1,,id1,Danny,24,1000,, id2,,id2,Stephen,34,2000,, id3,,id3,Julian,54,3000,, id4,,id4,Fabian,32,4000,, id5,,id5,Sophia,18,5000,, id6,,id6,Emma,20,6000,, id7,,id7,Bob,44,7000,, id8,,id8,Han,56,8000,, id9,,id9,Jane,19,6000,, id10,,id10,Ella,38,7000,, id11,,id11,Phoebe,52,8000,,]");
    }

    /** Base directory of the table under test; injected by JUnit for each test. */
    @TempDir
    File tempFile;

    /**
     * Schedules clustering with the bucket range set to 4..8, the parquet max file size set to
     * {@code 1} and a split threshold of {@code 1/1024/1024}, then verifies that the pending plan
     * holds four input groups with exactly one file slice each.
     *
     * @throws Exception if preparing the table, scheduling, or closing the write client fails
     */
    @Test
    public void testScheduleSplitPlan() throws Exception {
        prepareData(setupTableEnv());

        Configuration conf = getDefaultConfiguration();
        conf.setString(HoodieIndexConfig.BUCKET_INDEX_MIN_NUM_BUCKETS.key(), "4");
        conf.setString(HoodieIndexConfig.BUCKET_INDEX_MAX_NUM_BUCKETS.key(), "8");
        conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
        // Compile-time constant 2^-20; renders as "9.5367431640625E-7".
        conf.setString(HoodieIndexConfig.BUCKET_SPLIT_THRESHOLD.key(), String.valueOf(1.0 / 1024 / 1024));

        // try-with-resources closes the client exactly once, on success and on failure alike.
        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
            Assertions.assertTrue(writeClient.scheduleClustering(Option.empty()).isPresent());

            HoodieClusteringPlan plan = getLatestClusteringPlan(writeClient);
            Assertions.assertEquals(4, plan.getInputGroups().size());
            // The size assertion above guarantees this loop visits exactly groups 0..3.
            for (HoodieClusteringGroup group : plan.getInputGroups()) {
                Assertions.assertEquals(1, group.getSlices().size());
            }
        }
    }

    /**
     * Verifies that, with the default configuration of this test class, scheduling clustering
     * yields no clustering instant.
     *
     * @throws Exception if preparing the table, scheduling, or closing the write client fails
     */
    @Test
    public void testScheduleMergePlan() throws Exception {
        prepareData(setupTableEnv());

        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(getDefaultConfiguration())) {
            Assertions.assertFalse(writeClient.scheduleClustering(Option.empty()).isPresent());
        }
    }

    /**
     * Reloads the active timeline and reads the clustering plan attached to the last pending
     * replace instant.
     *
     * <p>Both {@code get()} calls are unchecked: the caller is expected to have scheduled a
     * clustering beforehand.
     *
     * @param hoodieFlinkWriteClient client whose table is inspected
     * @return the clustering plan of the latest pending replace instant
     */
    private HoodieClusteringPlan getLatestClusteringPlan(HoodieFlinkWriteClient<?> hoodieFlinkWriteClient) {
        HoodieFlinkTable<?> table = hoodieFlinkWriteClient.getHoodieTable();
        table.getMetaClient().reloadActiveTimeline();

        Option<HoodieInstant> lastPendingReplace =
            table.getMetaClient().getActiveTimeline().filterPendingReplaceTimeline().lastInstant();
        Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantAndPlan =
            ClusteringUtils.getClusteringPlan(table.getMetaClient(), lastPendingReplace.get());
        return instantAndPlan.get().getRight();
    }

    /**
     * Creates table {@code t1} with the consistent-hashing options, runs
     * {@link TestSQL#INSERT_T1} to completion and checks the written data.
     *
     * @param tableEnvironment environment used to execute the DDL and the insert
     * @throws Exception if the insert job fails or the wait is interrupted
     */
    private void prepareData(TableEnvironment tableEnvironment) throws Exception {
        String createTableDdl =
            TestConfigurations.getCreateHoodieTableDDL("t1", getDefaultConsistentHashingOption(), false, "");
        tableEnvironment.executeSql(createTableDdl);
        tableEnvironment.executeSql(TestSQL.INSERT_T1).await();
        // NOTE(review): the reason for this fixed 3-second pause is not evident from this file.
        TimeUnit.SECONDS.sleep(3L);
        TestData.checkWrittenData(this.tempFile, EXPECTED_AFTER_INITIAL_INSERT, 0);
    }

    /**
     * Builds a batch-mode table environment with default parallelism 4.
     *
     * @return the configured table environment
     */
    private TableEnvironment setupTableEnv() {
        EnvironmentSettings batchSettings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironmentImpl.create(batchSettings);
        tableEnv.getConfig().getConfiguration()
            .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        return tableEnv;
    }

    /**
     * Builds the Flink configuration for a write client on the table in {@link #tempFile}:
     * table name, record key and partition fields are copied from the existing table config, the
     * consistent-hashing options are applied on top, and the Avro schema is set last.
     *
     * <p>The table must already exist, because its meta client is created here.
     *
     * @return the configuration to pass to {@link FlinkWriteClients#createWriteClient}
     * @throws Exception if the meta client cannot be created or the schema cannot be resolved
     */
    private Configuration getDefaultConfiguration() throws Exception {
        FlinkClusteringConfig clusteringConfig = new FlinkClusteringConfig();
        clusteringConfig.path = this.tempFile.getAbsolutePath();
        Configuration conf = FlinkClusteringConfig.toFlinkConfig(clusteringConfig);

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());

        for (Map.Entry<String, String> option : getDefaultConsistentHashingOption().entrySet()) {
            conf.setString(option.getKey(), option.getValue());
        }

        CompactionUtil.setAvroSchema(conf, metaClient);
        return conf;
    }

    /**
     * Returns the table options shared by the DDL and the write-client configuration:
     * a {@code MERGE_ON_READ} upsert table at {@link #tempFile} using the consistent-hashing
     * bucket index with 4 buckets, with async compaction and clustering scheduling disabled.
     *
     * @return a fresh, mutable option map
     */
    private Map<String, String> getDefaultConsistentHashingOption() {
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.OPERATION.key(), WriteOperationType.UPSERT.name());
        options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
        options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name());
        options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "4");
        options.put(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS.key(), FlinkConsistentBucketClusteringPlanStrategy.class.getName());
        // NOTE(review): a Spark execution strategy class is named here inside a Flink test;
        // kept as-is -- confirm it is intentional.
        options.put(HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.key(), "org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy");
        options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        options.put(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED.key(), "false");
        return options;
    }
}
