package org.apache.hudi.sink.bucket;

import java.io.File;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.TestLogger;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.JsonDeserializationFunction;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.source.ContinuousFileSource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith({FlinkMiniCluster.class})
/**
 * Integration tests for Flink writes to a MERGE_ON_READ table that uses the BUCKET index with the
 * CONSISTENT_HASHING engine.
 */
public class ITTestConsistentBucketStreamWrite extends TestLogger {

    /** Expected rows per partition (keys {@code par1}..{@code par4}) after writing {@code test_source.data}. */
    private static final Map<String, String> EXPECTED = new HashMap<>();

    static {
        EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
        EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
        EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]");
        EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]");
    }

    @TempDir
    File tempFile;

    @Test
    public void testWriteMOR() throws Exception {
        Configuration conf = consistentBucketMorConf();
        testWriteToHoodie(conf, "mor_write", 1, EXPECTED);
    }

    @Test
    public void testWriteMORWithResizePlan() throws Exception {
        Configuration conf = consistentBucketMorConf();
        // Allow up to 8 buckets (initially 4) so a resize plan has room to grow the bucket count.
        conf.setString(HoodieIndexConfig.BUCKET_INDEX_MAX_NUM_BUCKETS.key(), "8");
        conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
        conf.setString(HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key(), "1");
        // Very small file-size limit and split threshold (1 / 1024 / 1024 == 9.5367431640625E-7),
        // presumably so that buckets qualify for splitting with the tiny test data set.
        conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
        conf.setString(HoodieIndexConfig.BUCKET_SPLIT_THRESHOLD.key(), String.valueOf(1.0 / 1024 / 1024));
        testWriteToHoodie(conf, "mor_write", 1, EXPECTED);
    }

    @Test
    public void testBulkInsert() {
        Configuration conf = consistentBucketMorConf();
        conf.setString(FlinkOptions.OPERATION, "bulk_insert");
        Assertions.assertThrows(HoodieException.class,
            () -> testWriteToHoodie(conf, "bulk_insert", 1, EXPECTED));
    }

    @Test
    public void testOverwrite() {
        Configuration conf = consistentBucketMorConf();
        conf.setString(FlinkOptions.OPERATION, "INSERT_OVERWRITE");
        Assertions.assertThrows(HoodieException.class,
            () -> testWriteToHoodie(conf, "overwrite", 1, EXPECTED));
    }

    /**
     * Creates the configuration shared by every test in this class.
     *
     * <p>The configuration is a MERGE_ON_READ table rooted at {@link #tempFile}. It uses the BUCKET
     * index with the CONSISTENT_HASHING engine and 4 initial buckets.
     *
     * @return a fresh, mutable configuration that callers may extend
     */
    private Configuration consistentBucketMorConf() {
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.toURI().toString());
        conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
        conf.setString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE, "CONSISTENT_HASHING");
        conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
        conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
        return conf;
    }

    /**
     * Builds and runs a Flink write pipeline over {@code test_source.data}, then checks the written data.
     *
     * <p>Source selection depends on the table type:
     * <ul>
     *   <li>MERGE_ON_READ: the file is read with {@link FileProcessingMode#PROCESS_CONTINUOUSLY}.</li>
     *   <li>Any other type: the file is read through {@link ContinuousFileSource.BoundedSourceFunction}.</li>
     * </ul>
     *
     * <p>Bulk-insert operations go through {@code Pipelines.bulkInsert}; all other operations go
     * through {@code Pipelines.hoodieStreamWrite}. Unless the job has already failed, it is left
     * running for 30 seconds and then cancelled on a best-effort basis.
     *
     * @param conf        table and write configuration
     * @param jobName     name of the submitted Flink job
     * @param checkpoints forwarded to {@code BoundedSourceFunction}, which is used for non-MOR tables
     *                    only; presumably the number of checkpoints to spread the data over
     * @param expected    expected rows per partition, passed to {@code TestData.checkWrittenDataMOR}
     * @throws Exception if building or submitting the pipeline, or the data check, fails
     */
    // Raw stream types: the element types of the Pipelines API are not pinned down here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void testWriteToHoodie(Configuration conf, String jobName, int checkpoints, Map<String, String> expected) throws Exception {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        execEnv.getConfig().disableObjectReuse();
        execEnv.setParallelism(4);
        execEnv.enableCheckpointing(4000L, CheckpointingMode.EXACTLY_ONCE);
        execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // DataType#getLogicalType() returns the general LogicalType; narrow it to the RowType the pipeline needs.
        RowType rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
        URL sourceUrl = Objects.requireNonNull(
            Thread.currentThread().getContextClassLoader().getResource("test_source.data"),
            "test_source.data not found on the test classpath");
        String sourcePath = sourceUrl.toString();

        boolean isMor = HoodieTableType.MERGE_ON_READ.name().equals(conf.getString(FlinkOptions.TABLE_TYPE));
        SingleOutputStreamOperator dataStream;
        if (isMor) {
            TextInputFormat format = new TextInputFormat(new Path(sourcePath));
            format.setFilesFilter(FilePathFilter.createDefaultFilter());
            format.setCharsetName("UTF-8");
            // Continuous monitoring (every 1000 ms) keeps the source running, presumably so checkpoints get triggered.
            dataStream = execEnv
                .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L, BasicTypeInfo.STRING_TYPE_INFO)
                .map(JsonDeserializationFunction.getInstance(rowType))
                .setParallelism(1);
        } else {
            dataStream = execEnv
                .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
                .name("continuous_file_source")
                .setParallelism(1)
                .map(JsonDeserializationFunction.getInstance(rowType))
                .setParallelism(4);
        }

        OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
        DataStream pipeline = Pipelines.bootstrap(conf, rowType, dataStream);
        if (OptionsResolver.isBulkInsertOperation(conf)) {
            Pipelines.bulkInsert(conf, rowType, dataStream);
        } else {
            execEnv.addOperator(Pipelines.hoodieStreamWrite(conf, pipeline).getTransformation());
        }

        JobClient client = execEnv.executeAsync(jobName);
        if (client.getJobStatus().get() != JobStatus.FAILED) {
            try {
                TimeUnit.SECONDS.sleep(30L);
                client.cancel();
            } catch (InterruptedException e) {
                // Do not discard the interrupt: restore the flag so callers and later I/O can observe it.
                Thread.currentThread().interrupt();
            } catch (RuntimeException ignored) {
                // Best-effort cancellation (e.g. the job may already have terminated); the data check below decides the outcome.
            }
        }

        // NOTE(review): the trailing 4 presumably is the number of partitions to verify (EXPECTED has 4 entries); confirm against TestData.
        TestData.checkWrittenDataMOR(FSUtils.getFs(this.tempFile.getAbsolutePath(), new org.apache.hadoop.conf.Configuration()), this.tempFile, expected, 4);
    }
}
