/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.cluster;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Integration tests for scheduling clustering on a Hudi table that is written through
 * Flink SQL with the bucket index in {@code CONSISTENT_HASHING} mode.
 *
 * <p>Each test creates a fresh table under a JUnit-managed temporary directory, inserts
 * eight rows in batch mode, and then asks a {@link HoodieFlinkWriteClient} to schedule
 * clustering with a particular configuration.
 */
public class ITTestFlinkConsistentHashingClustering {
    private static final Map<String, String> EXPECTED_AFTER_INITIAL_INSERT = new HashMap<>();
    // NOTE(review): not referenced by any test in this class; kept for parity with the original.
    private static final Map<String, String> EXPECTED_AFTER_UPSERT = new HashMap<>();

    /** Batch insert of eight rows (id1..id8) into table {@code t1}. */
    private static final String INSERT_T1 = "insert into t1 values\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
            + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
            + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
            + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
            + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
            + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
            + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";

    static {
        // NOTE(review): unlike EXPECTED_AFTER_UPSERT, this value has no leading '[' and has a
        // space before the closing ']'. It is passed to checkWrittenData with an expected
        // partition count of 0, so it may never actually be compared -- confirm and normalise.
        EXPECTED_AFTER_INITIAL_INSERT.put("", "id1,,id1,Danny,23,1000,, id2,,id2,Stephen,33,2000,, id3,,id3,Julian,53,3000,, id4,,id4,Fabian,31,4000,, id5,,id5,Sophia,18,5000,, id6,,id6,Emma,20,6000,, id7,,id7,Bob,44,7000,, id8,,id8,Han,56,8000,, ]");
        EXPECTED_AFTER_UPSERT.put("", "[id1,,id1,Danny,24,1000,, id2,,id2,Stephen,34,2000,, id3,,id3,Julian,54,3000,, id4,,id4,Fabian,32,4000,, id5,,id5,Sophia,18,5000,, id6,,id6,Emma,20,6000,, id7,,id7,Bob,44,7000,, id8,,id8,Han,56,8000,, id9,,id9,Jane,19,6000,, id10,,id10,Ella,38,7000,, id11,,id11,Phoebe,52,8000,,]");
    }

    /** Base path of the table under test; injected by JUnit for every test method. */
    @TempDir
    File tempFile;

    /**
     * Schedules clustering with bucket bounds of 4..8, a max parquet file size of 1 and a
     * split threshold of 2^-20, and expects a pending plan with four input groups that
     * each contain exactly one file slice.
     *
     * @throws Exception if table setup, the insert job or the write client fails
     */
    @Test
    public void testScheduleSplitPlan() throws Exception {
        prepareData(setupTableEnv());

        Configuration conf = getDefaultConfiguration();
        conf.setString(HoodieIndexConfig.BUCKET_INDEX_MIN_NUM_BUCKETS.key(), "4");
        conf.setString(HoodieIndexConfig.BUCKET_INDEX_MAX_NUM_BUCKETS.key(), "8");
        // Minimal file size combined with a tiny threshold; presumably this makes every
        // bucket qualify for a split -- the assertions below expect one group per bucket.
        conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
        // 1 / 1024 / 1024 == 9.5367431640625E-7
        conf.setString(HoodieIndexConfig.BUCKET_SPLIT_THRESHOLD.key(), String.valueOf(1 / 1024.0 / 1024.0));

        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
            Assertions.assertTrue(writeClient.scheduleClustering(Option.empty()).isPresent());

            HoodieClusteringPlan clusteringPlan = getLatestClusteringPlan(writeClient);
            Assertions.assertEquals(4, clusteringPlan.getInputGroups().size());
            for (HoodieClusteringGroup group : clusteringPlan.getInputGroups()) {
                Assertions.assertEquals(1, group.getSlices().size());
            }
        }
    }

    /**
     * Schedules clustering with the default configuration and expects that no clustering
     * instant is produced.
     *
     * @throws Exception if table setup, the insert job or the write client fails
     */
    @Test
    public void testScheduleMergePlan() throws Exception {
        prepareData(setupTableEnv());

        Configuration conf = getDefaultConfiguration();
        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
            Assertions.assertFalse(writeClient.scheduleClustering(Option.empty()).isPresent());
        }
    }

    /**
     * Reloads the active timeline and returns the clustering plan attached to the last
     * pending clustering instant.
     *
     * @param writeClient client whose table meta client is used to read the timeline
     * @return the plan of the most recent pending clustering instant
     */
    private HoodieClusteringPlan getLatestClusteringPlan(HoodieFlinkWriteClient<?> writeClient) {
        HoodieTableMetaClient metaClient = writeClient.getHoodieTable().getMetaClient();
        metaClient.reloadActiveTimeline();

        Option<HoodieInstant> lastPending =
                metaClient.getActiveTimeline().filterPendingClusteringTimeline().lastInstant();
        // Fail with a readable message instead of an unchecked get() on an empty option.
        Assertions.assertTrue(lastPending.isPresent(), "Expected a pending clustering instant on the timeline");

        Option<Pair<HoodieInstant, HoodieClusteringPlan>> planOption =
                ClusteringUtils.getClusteringPlan(metaClient, lastPending.get());
        return planOption.get().getRight();
    }

    /**
     * Creates table {@code t1} with the default consistent-hashing options, inserts the
     * eight sample rows, waits for the job to finish and checks the written data.
     *
     * @param tableEnv table environment used to run the DDL and the insert
     * @throws Exception if the SQL job fails or the wait is interrupted
     */
    private void prepareData(TableEnvironment tableEnv) throws Exception {
        String createTableDdl =
                TestConfigurations.getCreateHoodieTableDDL("t1", getDefaultConsistentHashingOption(), false, "");
        tableEnv.executeSql(createTableDdl);
        tableEnv.executeSql(INSERT_T1).await();

        // NOTE(review): fixed 3s pause after the job completes; the reason is not visible
        // here (presumably to let the commit settle) -- confirm before shortening.
        TimeUnit.SECONDS.sleep(3L);
        TestData.checkWrittenData(tempFile, EXPECTED_AFTER_INITIAL_INSERT, 0);
    }

    /**
     * Builds a batch-mode table environment with default parallelism 4.
     *
     * @return the configured table environment
     */
    private TableEnvironment setupTableEnv() {
        EnvironmentSettings batchSettings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironmentImpl.create(batchSettings);
        tableEnv.getConfig().getConfiguration()
                .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        return tableEnv;
    }

    /**
     * Builds the write-client configuration for the table under {@link #tempFile}: table
     * name, record key and partition field are copied from the existing table config, the
     * default consistent-hashing options are applied on top, and the Avro schema is set
     * from the meta client.
     *
     * @return configuration for creating a write client against the test table
     * @throws Exception if the meta client or schema cannot be resolved
     */
    private Configuration getDefaultConfiguration() throws Exception {
        FlinkClusteringConfig clusteringConfig = new FlinkClusteringConfig();
        clusteringConfig.path = tempFile.getAbsolutePath();
        Configuration conf = FlinkClusteringConfig.toFlinkConfig(clusteringConfig);

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());

        for (Map.Entry<String, String> option : getDefaultConsistentHashingOption().entrySet()) {
            conf.setString(option.getKey(), option.getValue());
        }

        CompactionUtil.setAvroSchema(conf, metaClient);
        return conf;
    }

    /**
     * Returns the table options shared by the DDL and the write-client configuration: a
     * MERGE_ON_READ upsert table at {@link #tempFile} using the consistent-hashing bucket
     * index with 4 buckets, with async compaction and clustering scheduling disabled.
     *
     * @return a new mutable map of option key to value
     */
    private Map<String, String> getDefaultConsistentHashingOption() {
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.OPERATION.key(), WriteOperationType.UPSERT.name());
        options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
        options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name());
        options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "4");
        options.put(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS.key(), FlinkConsistentBucketClusteringPlanStrategy.class.getName());
        // NOTE(review): this names a Spark execution strategy in a Flink test; the tests here
        // only schedule plans, so it is presumably never instantiated -- confirm.
        options.put(HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.key(), "org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy");
        options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        options.put(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED.key(), "false");
        return options;
    }
}

