package org.apache.hudi.table;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.SchemaBuilder;
import org.apache.hudi.utils.TestConfigurations;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/hudi/table/TestHoodieTableFactory.class */
public class TestHoodieTableFactory {
    private static final String AVRO_SCHEMA_FILE_PATH = ((URL) Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc"))).toString();
    private static final String INFERRED_SCHEMA = "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"uuid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
    private Configuration conf;

    @TempDir
    File tempFile;

    /* loaded from: input_file:org/apache/hudi/table/TestHoodieTableFactory$MockContext.class */
    private static class MockContext implements DynamicTableFactory.Context {
        private final Configuration conf;
        private final ResolvedSchema schema;
        private final List<String> partitions;

        private MockContext(Configuration configuration, ResolvedSchema resolvedSchema, List<String> list) {
            this.conf = configuration;
            this.schema = resolvedSchema;
            this.partitions = list;
        }

        static MockContext getInstance(Configuration configuration) {
            return getInstance(configuration, TestConfigurations.TABLE_SCHEMA, (List<String>) Collections.singletonList("partition"));
        }

        static MockContext getInstance(Configuration configuration, ResolvedSchema resolvedSchema, String str) {
            return getInstance(configuration, resolvedSchema, (List<String>) Collections.singletonList(str));
        }

        static MockContext getInstance(Configuration configuration, ResolvedSchema resolvedSchema, List<String> list) {
            return new MockContext(configuration, resolvedSchema, list);
        }

        public ObjectIdentifier getObjectIdentifier() {
            return ObjectIdentifier.of("hudi", "db1", "t1");
        }

        public ResolvedCatalogTable getCatalogTable() {
            return new ResolvedCatalogTable(CatalogTable.of(Schema.newBuilder().fromResolvedSchema(this.schema).build(), "mock source table", this.partitions, this.conf.toMap()), this.schema);
        }

        public ReadableConfig getConfiguration() {
            return this.conf;
        }

        public ClassLoader getClassLoader() {
            return null;
        }

        public boolean isTemporary() {
            return false;
        }
    }

    @BeforeEach
    void beforeEach() throws IOException {
        this.conf = new Configuration();
        this.conf.setString(FlinkOptions.PATH, this.tempFile.getAbsolutePath());
        this.conf.setString(FlinkOptions.TABLE_NAME, "t1");
        StreamerUtil.initTableIfNotExists(this.conf);
    }

    @Test
    void testRequiredOptionsForSource() {
        MockContext mockContext = MockContext.getInstance(this.conf, SchemaBuilder.instance().field("f0", (DataType) DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR(20)).field("f2", DataTypes.TIMESTAMP(3)).build(), "f2");
        Assertions.assertThrows(HoodieValidationException.class, () -> {
            new HoodieTableFactory().createDynamicTableSource(mockContext);
        });
        Assertions.assertThrows(HoodieValidationException.class, () -> {
            new HoodieTableFactory().createDynamicTableSink(mockContext);
        });
        ResolvedSchema build = SchemaBuilder.instance().field("f0", (DataType) DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR(20)).field("f2", DataTypes.TIMESTAMP(3)).build();
        this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, "non_exist_field");
        MockContext mockContext2 = MockContext.getInstance(this.conf, build, "f2");
        Assertions.assertThrows(HoodieValidationException.class, () -> {
            new HoodieTableFactory().createDynamicTableSource(mockContext2);
        });
        Assertions.assertThrows(HoodieValidationException.class, () -> {
            new HoodieTableFactory().createDynamicTableSink(mockContext2);
        });
        this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, (String) FlinkOptions.PRECOMBINE_FIELD.defaultValue());
        ResolvedSchema build2 = SchemaBuilder.instance().field("f0", (DataType) DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR(20)).field("f2", DataTypes.TIMESTAMP(3)).primaryKey("f0").build();
        MockContext mockContext3 = MockContext.getInstance(this.conf, build2, "f2");
        HoodieTableSource createDynamicTableSource = new HoodieTableFactory().createDynamicTableSource(mockContext3);
        HoodieTableSink createDynamicTableSink = new HoodieTableFactory().createDynamicTableSink(mockContext3);
        MatcherAssert.assertThat(createDynamicTableSource.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), CoreMatchers.is("no_precombine"));
        MatcherAssert.assertThat(createDynamicTableSink.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), CoreMatchers.is("no_precombine"));
        MatcherAssert.assertThat(createDynamicTableSource.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), CoreMatchers.is(FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue()));
        MatcherAssert.assertThat(createDynamicTableSink.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), CoreMatchers.is(FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue()));
        this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName());
        MockContext mockContext4 = MockContext.getInstance(this.conf, build2, "f2");
        Assertions.assertThrows(HoodieValidationException.class, () -> {
            new HoodieTableFactory().createDynamicTableSource(mockContext4);
        });
        Assertions.assertThrows(HoodieValidationException.class, () -> {
            new HoodieTableFactory().createDynamicTableSink(mockContext4);
        });
        this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, (String) FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue());
        MockContext mockContext5 = MockContext.getInstance(this.conf, SchemaBuilder.instance().field("f0", (DataType) DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR(20)).field("f2", DataTypes.TIMESTAMP(3)).field("ts", DataTypes.TIMESTAMP(3)).primaryKey("f0").build(), "f2");
        Assertions.assertDoesNotThrow(() -> {
            return new HoodieTableFactory().createDynamicTableSource(mockContext5);
        });
        Assertions.assertDoesNotThrow(() -> {
            return new HoodieTableFactory().createDynamicTableSink(mockContext5);
        });
        HoodieTableSource createDynamicTableSource2 = new HoodieTableFactory().createDynamicTableSource(mockContext5);
        HoodieTableSink createDynamicTableSink2 = new HoodieTableFactory().createDynamicTableSink(mockContext5);
        MatcherAssert.assertThat(createDynamicTableSource2.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), CoreMatchers.is(EventTimeAvroPayload.class.getName()));
        MatcherAssert.assertThat(createDynamicTableSink2.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), CoreMatchers.is(EventTimeAvroPayload.class.getName()));
        ResolvedSchema build3 = SchemaBuilder.instance().field("f0", (DataType) DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR(20)).field("f2", DataTypes.TIMESTAMP(3)).field("ts", DataTypes.TIMESTAMP(3)).primaryKey("f0").build();
        this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, "no_precombine");
        MockContext mockContext6 = MockContext.getInstance(this.conf, build3, "f2");
        Assertions.assertDoesNotThrow(() -> {
            return new HoodieTableFactory().createDynamicTableSource(mockContext6);
        });
        Assertions.assertDoesNotThrow(() -> {
            return new HoodieTableFactory().createDynamicTableSink(mockContext6);
        });
    }

    @Test
    void testInferAvroSchemaForSource() {
        MatcherAssert.assertThat(new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf)).getConf().get(FlinkOptions.SOURCE_AVRO_SCHEMA), CoreMatchers.is(INFERRED_SCHEMA));
        this.conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
        Assertions.assertNull(new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf)).getConf().get(FlinkOptions.SOURCE_AVRO_SCHEMA), "expect schema string as null");
    }

    @Test
    void testSetupHoodieKeyOptionsForSource() {
        this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
        this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, "dummyKeyGenClass");
        Configuration conf = new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf, SchemaBuilder.instance().field("f0", (DataType) DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR(20)).field("f2", DataTypes.BIGINT()).field("ts", DataTypes.TIMESTAMP(3)).primaryKey("f0").build(), "f2")).getConf();
        MatcherAssert.assertThat(conf.get(FlinkOptions.RECORD_KEY_FIELD), CoreMatchers.is("f0"));
        MatcherAssert.assertThat(conf.get(FlinkOptions.KEYGEN_CLASS_NAME), CoreMatchers.is("dummyKeyGenClass"));
        this.conf.removeConfig(FlinkOptions.KEYGEN_CLASS_NAME);
        ResolvedSchema build = SchemaBuilder.instance().field("f0", (DataType) DataTypes.INT().notNull()).field("f1", (DataType) DataTypes.VARCHAR(20).notNull()).field("f2", DataTypes.TIMESTAMP(3)).field("ts", DataTypes.TIMESTAMP(3)).primaryKey("f0", "f1").build();
        Configuration conf2 = new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf, build, "f2")).getConf();
        MatcherAssert.assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), CoreMatchers.is("f0,f1"));
        MatcherAssert.assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS_NAME), CoreMatchers.is(ComplexAvroKeyGenerator.class.getName()));
        this.conf.removeConfig(FlinkOptions.KEYGEN_CLASS_NAME);
        Configuration conf3 = new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf, build, "")).getConf();
        MatcherAssert.assertThat(conf3.get(FlinkOptions.RECORD_KEY_FIELD), CoreMatchers.is("f0,f1"));
        MatcherAssert.assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS_NAME), CoreMatchers.is(NonpartitionedAvroKeyGenerator.class.getName()));
    }

    @Test
    void testSetupHiveOptionsForSource() {
        ResolvedSchema build = SchemaBuilder.instance().field("f0", (DataType) DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR(20)).field("f2", DataTypes.TIMESTAMP(3)).field("ts", DataTypes.TIMESTAMP(3)).primaryKey("f0").build();
        Configuration conf = new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf, build, "f2")).getConf();
        MatcherAssert.assertThat(conf.getString(FlinkOptions.HIVE_SYNC_DB), CoreMatchers.is("db1"));
        MatcherAssert.assertThat(conf.getString(FlinkOptions.HIVE_SYNC_TABLE), CoreMatchers.is("t1"));
        MatcherAssert.assertThat(conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), CoreMatchers.is(MultiPartKeysValueExtractor.class.getName()));
        this.conf.setString(FlinkOptions.HIVE_SYNC_DB, "db2");
        this.conf.setString(FlinkOptions.HIVE_SYNC_TABLE, "t2");
        this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
        Configuration conf2 = new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf, build, "f2")).getConf();
        MatcherAssert.assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_DB), CoreMatchers.is("db2"));
        MatcherAssert.assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_TABLE), CoreMatchers.is("t2"));
        MatcherAssert.assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), CoreMatchers.is(MultiPartKeysValueExtractor.class.getName()));
    }

    @Test
    void testSetupCleaningOptionsForSource() {
        ResolvedSchema build = SchemaBuilder.instance().field("f0", (DataType) DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR(20)).field("f2", DataTypes.TIMESTAMP(3)).field("ts", DataTypes.TIMESTAMP(3)).primaryKey("f0").build();
        this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "11");
        Configuration conf = new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf, build, "f2")).getConf();
        MatcherAssert.assertThat(Integer.valueOf(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS)), CoreMatchers.is(FlinkOptions.ARCHIVE_MIN_COMMITS.defaultValue()));
        MatcherAssert.assertThat(Integer.valueOf(conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS)), CoreMatchers.is(FlinkOptions.ARCHIVE_MAX_COMMITS.defaultValue()));
        int intValue = ((Integer) FlinkOptions.ARCHIVE_MIN_COMMITS.defaultValue()).intValue() + 5;
        this.conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), intValue);
        Configuration conf2 = new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf, build, "f2")).getConf();
        MatcherAssert.assertThat(Integer.valueOf(conf2.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS)), CoreMatchers.is(Integer.valueOf(intValue + 10)));
        MatcherAssert.assertThat(Integer.valueOf(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS)), CoreMatchers.is(Integer.valueOf(intValue + 20)));
    }

    @Test
    void testSetupReadOptionsForSource() {
        ResolvedSchema build = SchemaBuilder.instance().field("f0", (DataType) DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR(20)).field("f2", DataTypes.TIMESTAMP(3)).field("ts", DataTypes.TIMESTAMP(3)).primaryKey("f0").build();
        this.conf.setString(FlinkOptions.READ_END_COMMIT, "123");
        MatcherAssert.assertThat(new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf, build, "f2")).getConf().getString(FlinkOptions.QUERY_TYPE), CoreMatchers.is("incremental"));
        this.conf.removeConfig(FlinkOptions.READ_END_COMMIT);
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "123");
        MatcherAssert.assertThat(new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf, build, "f2")).getConf().getString(FlinkOptions.QUERY_TYPE), CoreMatchers.is("incremental"));
    }

    @Test
    void testBucketIndexOptionForSink() {
        ResolvedSchema build = SchemaBuilder.instance().field("f0", (DataType) DataTypes.INT().notNull()).field("f1", (DataType) DataTypes.VARCHAR(20).notNull()).field("f2", DataTypes.TIMESTAMP(3)).primaryKey("f0", "f1").build();
        this.conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
        MatcherAssert.assertThat(new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf, build, "f2")).getConf().getString(FlinkOptions.INDEX_KEY_FIELD), CoreMatchers.is("f0,f1"));
        this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f0");
        MatcherAssert.assertThat(new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf, build, "f2")).getConf().getString(FlinkOptions.INDEX_KEY_FIELD), CoreMatchers.is("f0"));
        this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f1");
        MatcherAssert.assertThat(new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf, build, "f2")).getConf().getString(FlinkOptions.INDEX_KEY_FIELD), CoreMatchers.is("f1"));
        this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f0,f1");
        MatcherAssert.assertThat(new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf, build, "f2")).getConf().getString(FlinkOptions.INDEX_KEY_FIELD), CoreMatchers.is("f0,f1"));
        this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f2");
        MockContext mockContext = MockContext.getInstance(this.conf, build, "f2");
        Assertions.assertThrows(HoodieValidationException.class, () -> {
            new HoodieTableFactory().createDynamicTableSource(mockContext);
        });
    }

    @Test
    void testInferAvroSchemaForSink() {
        MatcherAssert.assertThat(new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf)).getConf().get(FlinkOptions.SOURCE_AVRO_SCHEMA), CoreMatchers.is(INFERRED_SCHEMA));
        this.conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
        Assertions.assertNull(new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf)).getConf().get(FlinkOptions.SOURCE_AVRO_SCHEMA), "expect schema string as null");
    }

    @Test
    void testSetupHoodieKeyOptionsForSink() {
        this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
        this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, "dummyKeyGenClass");
        Configuration conf = new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf, SchemaBuilder.instance().field("f0", (DataType) DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR(20)).field("f2", DataTypes.BIGINT()).field("ts", DataTypes.TIMESTAMP(3)).primaryKey("f0").build(), "f2")).getConf();
        MatcherAssert.assertThat(conf.get(FlinkOptions.RECORD_KEY_FIELD), CoreMatchers.is("f0"));
        MatcherAssert.assertThat(conf.get(FlinkOptions.KEYGEN_CLASS_NAME), CoreMatchers.is("dummyKeyGenClass"));
        this.conf.removeConfig(FlinkOptions.KEYGEN_CLASS_NAME);
        ResolvedSchema build = SchemaBuilder.instance().field("f0", (DataType) DataTypes.INT().notNull()).field("f1", (DataType) DataTypes.VARCHAR(20).notNull()).field("f2", DataTypes.TIMESTAMP(3)).field("ts", DataTypes.TIMESTAMP(3)).primaryKey("f0", "f1").build();
        Configuration conf2 = new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf, build, "f2")).getConf();
        MatcherAssert.assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), CoreMatchers.is("f0,f1"));
        MatcherAssert.assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS_NAME), CoreMatchers.is(ComplexAvroKeyGenerator.class.getName()));
        this.conf.removeConfig(FlinkOptions.KEYGEN_CLASS_NAME);
        Configuration conf3 = new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf, build, "")).getConf();
        MatcherAssert.assertThat(conf3.get(FlinkOptions.RECORD_KEY_FIELD), CoreMatchers.is("f0,f1"));
        MatcherAssert.assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS_NAME), CoreMatchers.is(NonpartitionedAvroKeyGenerator.class.getName()));
        this.conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
        Configuration conf4 = new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf, build, "")).getConf();
        MatcherAssert.assertThat(conf4.get(FlinkOptions.RECORD_KEY_FIELD), CoreMatchers.is("f0,f1"));
        MatcherAssert.assertThat(conf4.get(FlinkOptions.INDEX_KEY_FIELD), CoreMatchers.is("f0,f1"));
        MatcherAssert.assertThat(conf4.get(FlinkOptions.INDEX_TYPE), CoreMatchers.is(HoodieIndex.IndexType.BUCKET.name()));
        MatcherAssert.assertThat(conf4.get(FlinkOptions.KEYGEN_CLASS_NAME), CoreMatchers.is(NonpartitionedAvroKeyGenerator.class.getName()));
    }

    @Test
    void testSetupHiveOptionsForSink() {
        ResolvedSchema build = SchemaBuilder.instance().field("f0", (DataType) DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR(20)).field("f2", DataTypes.TIMESTAMP(3)).field("ts", DataTypes.TIMESTAMP(3)).primaryKey("f0").build();
        Configuration conf = new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf, build, "f2")).getConf();
        MatcherAssert.assertThat(conf.getString(FlinkOptions.HIVE_SYNC_DB), CoreMatchers.is("db1"));
        MatcherAssert.assertThat(conf.getString(FlinkOptions.HIVE_SYNC_TABLE), CoreMatchers.is("t1"));
        MatcherAssert.assertThat(conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), CoreMatchers.is(MultiPartKeysValueExtractor.class.getName()));
        this.conf.setString(FlinkOptions.HIVE_SYNC_DB, "db2");
        this.conf.setString(FlinkOptions.HIVE_SYNC_TABLE, "t2");
        this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
        Configuration conf2 = new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf, build, "f2")).getConf();
        MatcherAssert.assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_DB), CoreMatchers.is("db2"));
        MatcherAssert.assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_TABLE), CoreMatchers.is("t2"));
        MatcherAssert.assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), CoreMatchers.is(MultiPartKeysValueExtractor.class.getName()));
    }

    @Test
    void testSetupCleaningOptionsForSink() {
        ResolvedSchema build = SchemaBuilder.instance().field("f0", (DataType) DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR(20)).field("f2", DataTypes.TIMESTAMP(3)).field("ts", DataTypes.TIMESTAMP(3)).primaryKey("f0").build();
        this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "11");
        Configuration conf = new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf, build, "f2")).getConf();
        MatcherAssert.assertThat(Integer.valueOf(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS)), CoreMatchers.is(FlinkOptions.ARCHIVE_MIN_COMMITS.defaultValue()));
        MatcherAssert.assertThat(Integer.valueOf(conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS)), CoreMatchers.is(FlinkOptions.ARCHIVE_MAX_COMMITS.defaultValue()));
        int intValue = ((Integer) FlinkOptions.ARCHIVE_MIN_COMMITS.defaultValue()).intValue() + 5;
        this.conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), intValue);
        Configuration conf2 = new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf, build, "f2")).getConf();
        MatcherAssert.assertThat(Integer.valueOf(conf2.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS)), CoreMatchers.is(Integer.valueOf(intValue + 10)));
        MatcherAssert.assertThat(Integer.valueOf(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS)), CoreMatchers.is(Integer.valueOf(intValue + 20)));
    }

    @Test
    void testSetupTimestampBasedKeyGenForSink() {
        this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
        Configuration conf = new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf, SchemaBuilder.instance().field("f0", (DataType) DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR(20)).field("f2", DataTypes.TIMESTAMP(3)).field("ts", DataTypes.TIMESTAMP(3)).primaryKey("f0").build(), "ts")).getConf();
        MatcherAssert.assertThat(conf.get(FlinkOptions.RECORD_KEY_FIELD), CoreMatchers.is("f0"));
        MatcherAssert.assertThat(conf.get(FlinkOptions.KEYGEN_CLASS_NAME), CoreMatchers.is(TimestampBasedAvroKeyGenerator.class.getName()));
        MatcherAssert.assertThat(conf.getString("hoodie.deltastreamer.keygen.timebased.timestamp.type", "dummy"), CoreMatchers.is("EPOCHMILLISECONDS"));
        MatcherAssert.assertThat(conf.getString("hoodie.deltastreamer.keygen.timebased.output.dateformat", "dummy"), CoreMatchers.is("yyyyMMddHH"));
        MatcherAssert.assertThat(conf.getString("hoodie.deltastreamer.keygen.timebased.output.timezone", "dummy"), CoreMatchers.is("UTC"));
    }

    @Test
    void testSetupWriteOptionsForSink() {
        MatcherAssert.assertThat(new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf)).getConf().get(FlinkOptions.PRE_COMBINE), CoreMatchers.is(true));
        this.conf.setString(FlinkOptions.OPERATION, "insert");
        MatcherAssert.assertThat(new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf)).getConf().get(FlinkOptions.PRE_COMBINE), CoreMatchers.is(false));
    }
}
