package org.apache.hudi.sink.compact;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.class */
public class ITTestHoodieFlinkCompactor {
    private static final Map<String, List<String>> EXPECTED = new HashMap();

    @TempDir
    File tempFile;

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHoodieFlinkCompactor(boolean z) throws Exception {
        TableEnvironmentImpl create = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        create.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        hashMap.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        hashMap.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        hashMap.put(FlinkOptions.CHANGELOG_ENABLED.key(), z + "");
        create.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", hashMap));
        create.executeSql(TestSQL.INSERT_T1).await();
        TimeUnit.SECONDS.sleep(3L);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkCompactionConfig flinkCompactionConfig = new FlinkCompactionConfig();
        flinkCompactionConfig.path = this.tempFile.getAbsolutePath();
        Configuration flinkConfig = FlinkCompactionConfig.toFlinkConfig(flinkCompactionConfig);
        flinkConfig.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
        HoodieTableMetaClient createMetaClient = StreamerUtil.createMetaClient(flinkConfig);
        flinkConfig.setString(FlinkOptions.TABLE_NAME, createMetaClient.getTableConfig().getTableName());
        CompactionUtil.setAvroSchema(flinkConfig, createMetaClient);
        CompactionUtil.inferChangelogMode(flinkConfig, createMetaClient);
        HoodieFlinkWriteClient createWriteClient = StreamerUtil.createWriteClient(flinkConfig);
        boolean z2 = false;
        Option compactionInstantTime = CompactionUtil.getCompactionInstantTime(createMetaClient);
        if (compactionInstantTime.isPresent()) {
            z2 = createWriteClient.scheduleCompactionAtInstant((String) compactionInstantTime.get(), Option.empty());
        }
        String str = (String) compactionInstantTime.get();
        Assertions.assertTrue(z2, "The compaction plan should be scheduled");
        HoodieFlinkTable hoodieTable = createWriteClient.getHoodieTable();
        HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(hoodieTable.getMetaClient(), str);
        hoodieTable.getActiveTimeline().transitionCompactionRequestedToInflight(HoodieTimeline.getCompactionRequestedInstant(str));
        executionEnvironment.addSource(new CompactionPlanSourceFunction(compactionPlan, str)).name("compaction_source").uid("uid_compaction_source").rebalance().transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new ProcessOperator(new CompactFunction(flinkConfig))).setParallelism(compactionPlan.getOperations().size()).addSink(new CompactionCommitSink(flinkConfig)).name("clean_commits").uid("uid_clean_commits").setParallelism(1);
        executionEnvironment.execute("flink_hudi_compaction");
        createWriteClient.close();
        TestData.checkWrittenFullData(this.tempFile, EXPECTED);
    }

    static {
        EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
        EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
        EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3"));
        EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4"));
    }
}
