/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.config.TimestampKeyGeneratorConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
import org.apache.hudi.table.HoodieTableFactory;
import org.apache.hudi.table.HoodieTableSink;
import org.apache.hudi.table.HoodieTableSource;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.SchemaBuilder;
import org.apache.hudi.utils.TestConfigurations;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestHoodieTableFactory {
    private static final String AVRO_SCHEMA_FILE_PATH = Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString();
    private static final String INFERRED_SCHEMA = "{\"type\":\"record\",\"name\":\"t1_record\",\"namespace\":\"hoodie.t1\",\"fields\":[{\"name\":\"uuid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
    private Configuration conf;
    @TempDir
    File tempFile;

    @BeforeEach
    void beforeEach() throws IOException {
        this.conf = new Configuration();
        this.conf.setString(FlinkOptions.PATH, this.tempFile.getAbsolutePath());
        this.conf.setString(FlinkOptions.TABLE_NAME, "t1");
        StreamerUtil.initTableIfNotExists((Configuration)this.conf);
    }

    @Test
    void testRequiredOptions() {
        ResolvedSchema schema1 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.TIMESTAMP((int)3)).build();
        MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
        Assertions.assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext1));
        Assertions.assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext1));
        this.conf.set(FlinkOptions.OPERATION, (Object)"insert");
        MockContext sourceContext11 = MockContext.getInstance(this.conf, schema1, "f2");
        Assertions.assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext11));
        Assertions.assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext11));
        HoodieTableSink tableSink11 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext11);
        MatcherAssert.assertThat((Object)tableSink11.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), (Matcher)CoreMatchers.is((Object)"no_precombine"));
        this.conf.set(FlinkOptions.OPERATION, FlinkOptions.OPERATION.defaultValue());
        ResolvedSchema schema2 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.TIMESTAMP((int)3)).build();
        this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, "non_exist_field");
        MockContext sourceContext2 = MockContext.getInstance(this.conf, schema2, "f2");
        Assertions.assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext2));
        Assertions.assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext2));
        this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, (String)FlinkOptions.PRECOMBINE_FIELD.defaultValue());
        ResolvedSchema schema3 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.TIMESTAMP((int)3)).primaryKey("f0").build();
        MockContext sourceContext3 = MockContext.getInstance(this.conf, schema3, "f2");
        HoodieTableSource tableSource = (HoodieTableSource)new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext3);
        HoodieTableSink tableSink = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext3);
        MatcherAssert.assertThat((Object)tableSink.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), (Matcher)CoreMatchers.is((Object)"no_precombine"));
        MatcherAssert.assertThat((Object)tableSource.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), (Matcher)CoreMatchers.is((Object)FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue()));
        MatcherAssert.assertThat((Object)tableSink.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), (Matcher)CoreMatchers.is((Object)FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue()));
        this.conf.set(FlinkOptions.OPERATION, (Object)"insert");
        HoodieTableSink tableSink3 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext3);
        MatcherAssert.assertThat((Object)tableSink3.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), (Matcher)CoreMatchers.is((Object)"no_precombine"));
        this.conf.set(FlinkOptions.OPERATION, FlinkOptions.OPERATION.defaultValue());
        this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName());
        MockContext sourceContext4 = MockContext.getInstance(this.conf, schema3, "f2");
        Assertions.assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext4));
        Assertions.assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext4));
        this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, (String)FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue());
        ResolvedSchema schema4 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.TIMESTAMP((int)3)).field("ts", DataTypes.TIMESTAMP((int)3)).primaryKey("f0").build();
        MockContext sourceContext5 = MockContext.getInstance(this.conf, schema4, "f2");
        Assertions.assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext5));
        Assertions.assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext5));
        HoodieTableSource tableSource5 = (HoodieTableSource)new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext5);
        HoodieTableSink tableSink5 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext5);
        MatcherAssert.assertThat((Object)tableSource5.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), (Matcher)CoreMatchers.is((Object)EventTimeAvroPayload.class.getName()));
        MatcherAssert.assertThat((Object)tableSink5.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), (Matcher)CoreMatchers.is((Object)EventTimeAvroPayload.class.getName()));
        ResolvedSchema schema5 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.TIMESTAMP((int)3)).field("ts", DataTypes.TIMESTAMP((int)3)).primaryKey("f0").build();
        this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, "no_precombine");
        MockContext sourceContext6 = MockContext.getInstance(this.conf, schema5, "f2");
        Assertions.assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext6));
        Assertions.assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext6));
    }

    @Test
    void testIndexTypeCheck() {
        ResolvedSchema schema = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.TIMESTAMP((int)3)).field("ts", DataTypes.TIMESTAMP((int)3)).primaryKey("f0").build();
        MockContext sourceContext1 = MockContext.getInstance(this.conf, schema, "f2");
        Assertions.assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext1));
        this.conf.set(FlinkOptions.INDEX_TYPE, (Object)"BUCKET_AA");
        MockContext sourceContext2 = MockContext.getInstance(this.conf, schema, "f2");
        Assertions.assertThrows(IllegalArgumentException.class, () -> new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext2));
        this.conf.set(FlinkOptions.INDEX_TYPE, (Object)"BUCKET");
        MockContext sourceContext3 = MockContext.getInstance(this.conf, schema, "f2");
        Assertions.assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext3));
    }

    @Test
    void testTableTypeCheck() {
        ResolvedSchema schema = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.TIMESTAMP((int)3)).field("ts", DataTypes.TIMESTAMP((int)3)).primaryKey("f0").build();
        MockContext sourceContext1 = MockContext.getInstance(this.conf, schema, "f2");
        Assertions.assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext1));
        this.conf.setString(FlinkOptions.PATH, this.tempFile.getAbsolutePath() + "_NOT_EXIST_TABLE_PATH");
        this.conf.set(FlinkOptions.TABLE_TYPE, (Object)"INVALID_TABLE_TYPE");
        MockContext sourceContext2 = MockContext.getInstance(this.conf, schema, "f2");
        Assertions.assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext2));
        this.conf.setString(FlinkOptions.PATH, this.tempFile.getAbsolutePath());
        this.conf.set(FlinkOptions.TABLE_TYPE, (Object)"INVALID_TABLE_TYPE");
        MockContext sourceContext3 = MockContext.getInstance(this.conf, schema, "f2");
        Assertions.assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext3));
        this.conf.set(FlinkOptions.TABLE_TYPE, (Object)"MERGE_ON_READ");
        MockContext sourceContext4 = MockContext.getInstance(this.conf, schema, "f2");
        Assertions.assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext4));
        HoodieTableSink hoodieTableSink = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext4);
        MatcherAssert.assertThat((Object)hoodieTableSink.getConf().get(FlinkOptions.TABLE_TYPE), (Matcher)CoreMatchers.is((Object)"COPY_ON_WRITE"));
        this.conf.set(FlinkOptions.TABLE_TYPE, (Object)"COPY_ON_WRITE");
        MockContext sourceContext5 = MockContext.getInstance(this.conf, schema, "f2");
        Assertions.assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext5));
    }

    @Test
    void testSupplementTableConfig() throws Exception {
        String tablePath = new File(this.tempFile.getAbsolutePath(), "dummy").getAbsolutePath();
        Configuration tableConf = new Configuration();
        tableConf.setString(FlinkOptions.PATH, tablePath);
        tableConf.setString(FlinkOptions.TABLE_NAME, "t2");
        tableConf.setString(FlinkOptions.RECORD_KEY_FIELD, "f0,f1");
        tableConf.setString(FlinkOptions.PRECOMBINE_FIELD, "f2");
        tableConf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        tableConf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, "my_payload");
        tableConf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
        StreamerUtil.initTableIfNotExists((Configuration)tableConf);
        Configuration writeConf = new Configuration();
        writeConf.set(FlinkOptions.PATH, (Object)tablePath);
        writeConf.set(FlinkOptions.TABLE_NAME, (Object)"t2");
        ResolvedSchema schema1 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.TIMESTAMP((int)3)).field("ts", DataTypes.TIMESTAMP((int)3)).build();
        MockContext sourceContext1 = MockContext.getInstance(writeConf, schema1, "f2");
        HoodieTableSource source1 = (HoodieTableSource)new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext1);
        HoodieTableSink sink1 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext1);
        MatcherAssert.assertThat((String)"pk not provided, fallback to table config", (Object)source1.getConf().get(FlinkOptions.RECORD_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f0,f1"));
        MatcherAssert.assertThat((String)"pk not provided, fallback to table config", (Object)sink1.getConf().get(FlinkOptions.RECORD_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f0,f1"));
        MatcherAssert.assertThat((String)"pre-combine key not provided, fallback to table config", (Object)source1.getConf().get(FlinkOptions.PRECOMBINE_FIELD), (Matcher)CoreMatchers.is((Object)"f2"));
        MatcherAssert.assertThat((String)"pre-combine key not provided, fallback to table config", (Object)sink1.getConf().get(FlinkOptions.PRECOMBINE_FIELD), (Matcher)CoreMatchers.is((Object)"f2"));
        MatcherAssert.assertThat((String)"table type not provided, fallback to table config", (Object)source1.getConf().get(FlinkOptions.TABLE_TYPE), (Matcher)CoreMatchers.is((Object)FlinkOptions.TABLE_TYPE_MERGE_ON_READ));
        MatcherAssert.assertThat((String)"table type not provided, fallback to table config", (Object)sink1.getConf().get(FlinkOptions.TABLE_TYPE), (Matcher)CoreMatchers.is((Object)FlinkOptions.TABLE_TYPE_MERGE_ON_READ));
        MatcherAssert.assertThat((String)"payload class not provided, fallback to table config", (Object)source1.getConf().get(FlinkOptions.PAYLOAD_CLASS_NAME), (Matcher)CoreMatchers.is((Object)"my_payload"));
        MatcherAssert.assertThat((String)"payload class not provided, fallback to table config", (Object)sink1.getConf().get(FlinkOptions.PAYLOAD_CLASS_NAME), (Matcher)CoreMatchers.is((Object)"my_payload"));
        writeConf.setString(FlinkOptions.RECORD_KEY_FIELD, "f0");
        writeConf.setString(FlinkOptions.PRECOMBINE_FIELD, "f1");
        MockContext sourceContext2 = MockContext.getInstance(writeConf, schema1, "f2");
        HoodieTableSource source2 = (HoodieTableSource)new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext2);
        HoodieTableSink sink2 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext2);
        MatcherAssert.assertThat((String)"choose pk from write config", (Object)source2.getConf().get(FlinkOptions.RECORD_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f0"));
        MatcherAssert.assertThat((String)"choose pk from write config", (Object)sink2.getConf().get(FlinkOptions.RECORD_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f0"));
        MatcherAssert.assertThat((String)"choose preCombine key from write config", (Object)source2.getConf().get(FlinkOptions.PRECOMBINE_FIELD), (Matcher)CoreMatchers.is((Object)"f1"));
        MatcherAssert.assertThat((String)"choose preCombine pk from write config", (Object)sink2.getConf().get(FlinkOptions.PRECOMBINE_FIELD), (Matcher)CoreMatchers.is((Object)"f1"));
        writeConf.removeConfig(FlinkOptions.RECORD_KEY_FIELD);
        writeConf.removeConfig(FlinkOptions.PRECOMBINE_FIELD);
        ResolvedSchema schema2 = SchemaBuilder.instance().field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.TIMESTAMP((int)3)).field("ts", DataTypes.TIMESTAMP((int)3)).build();
        MockContext sourceContext3 = MockContext.getInstance(writeConf, schema2, "f2");
        Assertions.assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext3), (String)"createDynamicTableSource won't call sanity check");
        Assertions.assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sourceContext3), (String)"f0 is in table config as record key, but missing in input schema");
    }

    @Test
    void testInferAvroSchemaForSource() {
        HoodieTableSource tableSource1 = (HoodieTableSource)new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)MockContext.getInstance(this.conf));
        Configuration conf1 = tableSource1.getConf();
        MatcherAssert.assertThat((Object)conf1.get(FlinkOptions.SOURCE_AVRO_SCHEMA), (Matcher)CoreMatchers.is((Object)INFERRED_SCHEMA));
        this.conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
        HoodieTableSource tableSource2 = (HoodieTableSource)new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)MockContext.getInstance(this.conf));
        Configuration conf2 = tableSource2.getConf();
        Assertions.assertNull((Object)conf2.get(FlinkOptions.SOURCE_AVRO_SCHEMA), (String)"expect schema string as null");
        this.conf.removeConfig(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH);
        ResolvedSchema schema3 = SchemaBuilder.instance().field("f_decimal", (DataType)DataTypes.DECIMAL((int)3, (int)2).notNull()).field("f_map", DataTypes.MAP((DataType)DataTypes.VARCHAR((int)20), (DataType)DataTypes.VARCHAR((int)10))).field("f_array", DataTypes.ARRAY((DataType)DataTypes.VARCHAR((int)10))).field("f_record", DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"r1", (DataType)DataTypes.VARCHAR((int)10)), DataTypes.FIELD((String)"r2", (DataType)DataTypes.INT())})).primaryKey("f_decimal").build();
        HoodieTableSink tableSink3 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)MockContext.getInstance(this.conf, schema3, ""));
        Configuration conf3 = tableSink3.getConf();
        String expected = AvroSchemaConverter.convertToSchema((LogicalType)schema3.toSourceRowDataType().getLogicalType(), (String)AvroSchemaUtils.getAvroRecordQualifiedName((String)"t1")).toString();
        MatcherAssert.assertThat((Object)conf3.get(FlinkOptions.SOURCE_AVRO_SCHEMA), (Matcher)CoreMatchers.is((Object)expected));
    }

    @Test
    void testSetupHoodieKeyOptionsForSource() {
        this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
        this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, "dummyKeyGenClass");
        ResolvedSchema schema1 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.BIGINT()).field("ts", DataTypes.TIMESTAMP((int)3)).primaryKey("f0").build();
        MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
        HoodieTableSource tableSource1 = (HoodieTableSource)new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext1);
        Configuration conf1 = tableSource1.getConf();
        MatcherAssert.assertThat((Object)conf1.get(FlinkOptions.RECORD_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f0"));
        MatcherAssert.assertThat((Object)conf1.get(FlinkOptions.KEYGEN_CLASS_NAME), (Matcher)CoreMatchers.is((Object)"dummyKeyGenClass"));
        this.conf.removeConfig(FlinkOptions.KEYGEN_CLASS_NAME);
        ResolvedSchema schema2 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", (DataType)DataTypes.VARCHAR((int)20).notNull()).field("f2", DataTypes.TIMESTAMP((int)3)).field("ts", DataTypes.TIMESTAMP((int)3)).primaryKey("f0", "f1").build();
        MockContext sourceContext2 = MockContext.getInstance(this.conf, schema2, "f2");
        HoodieTableSource tableSource2 = (HoodieTableSource)new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext2);
        Configuration conf2 = tableSource2.getConf();
        MatcherAssert.assertThat((Object)conf2.get(FlinkOptions.RECORD_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f0,f1"));
        MatcherAssert.assertThat((Object)conf2.get(FlinkOptions.KEYGEN_CLASS_NAME), (Matcher)CoreMatchers.is((Object)ComplexAvroKeyGenerator.class.getName()));
        this.conf.removeConfig(FlinkOptions.KEYGEN_CLASS_NAME);
        MockContext sourceContext3 = MockContext.getInstance(this.conf, schema2, "");
        HoodieTableSource tableSource3 = (HoodieTableSource)new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext3);
        Configuration conf3 = tableSource3.getConf();
        MatcherAssert.assertThat((Object)conf3.get(FlinkOptions.RECORD_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f0,f1"));
        MatcherAssert.assertThat((Object)conf3.get(FlinkOptions.KEYGEN_CLASS_NAME), (Matcher)CoreMatchers.is((Object)NonpartitionedAvroKeyGenerator.class.getName()));
    }

    @Test
    void testSetupHiveOptionsForSource() {
        ResolvedSchema schema1 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.TIMESTAMP((int)3)).field("ts", DataTypes.TIMESTAMP((int)3)).primaryKey("f0").build();
        MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
        HoodieTableSource tableSource1 = (HoodieTableSource)new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext1);
        Configuration conf1 = tableSource1.getConf();
        MatcherAssert.assertThat((Object)conf1.getString(FlinkOptions.HIVE_SYNC_DB), (Matcher)CoreMatchers.is((Object)"db1"));
        MatcherAssert.assertThat((Object)conf1.getString(FlinkOptions.HIVE_SYNC_TABLE), (Matcher)CoreMatchers.is((Object)"t1"));
        MatcherAssert.assertThat((Object)conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), (Matcher)CoreMatchers.is((Object)MultiPartKeysValueExtractor.class.getName()));
        this.conf.setString(FlinkOptions.HIVE_SYNC_DB, "db2");
        this.conf.setString(FlinkOptions.HIVE_SYNC_TABLE, "t2");
        this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
        MockContext sourceContext2 = MockContext.getInstance(this.conf, schema1, "f2");
        HoodieTableSource tableSource2 = (HoodieTableSource)new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext2);
        Configuration conf2 = tableSource2.getConf();
        MatcherAssert.assertThat((Object)conf2.getString(FlinkOptions.HIVE_SYNC_DB), (Matcher)CoreMatchers.is((Object)"db2"));
        MatcherAssert.assertThat((Object)conf2.getString(FlinkOptions.HIVE_SYNC_TABLE), (Matcher)CoreMatchers.is((Object)"t2"));
        MatcherAssert.assertThat((Object)conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), (Matcher)CoreMatchers.is((Object)MultiPartKeysValueExtractor.class.getName()));
    }

    @Test
    void testSetupCleaningOptionsForSource() {
        ResolvedSchema schema1 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.TIMESTAMP((int)3)).field("ts", DataTypes.TIMESTAMP((int)3)).primaryKey("f0").build();
        this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "11");
        MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
        HoodieTableSource tableSource1 = (HoodieTableSource)new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext1);
        Configuration conf1 = tableSource1.getConf();
        MatcherAssert.assertThat((Object)conf1.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), (Matcher)CoreMatchers.is((Object)FlinkOptions.ARCHIVE_MIN_COMMITS.defaultValue()));
        MatcherAssert.assertThat((Object)conf1.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), (Matcher)CoreMatchers.is((Object)FlinkOptions.ARCHIVE_MAX_COMMITS.defaultValue()));
        int retainCommits = (Integer)FlinkOptions.ARCHIVE_MIN_COMMITS.defaultValue() + 5;
        this.conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), retainCommits);
        MockContext sourceContext2 = MockContext.getInstance(this.conf, schema1, "f2");
        HoodieTableSource tableSource2 = (HoodieTableSource)new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext2);
        Configuration conf2 = tableSource2.getConf();
        MatcherAssert.assertThat((Object)conf2.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), (Matcher)CoreMatchers.is((Object)(retainCommits + 10)));
        MatcherAssert.assertThat((Object)conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), (Matcher)CoreMatchers.is((Object)(retainCommits + 20)));
    }

    @Test
    void testSetupReadOptionsForSource() {
        ResolvedSchema schema1 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.TIMESTAMP((int)3)).field("ts", DataTypes.TIMESTAMP((int)3)).primaryKey("f0").build();
        this.conf.setString(FlinkOptions.READ_END_COMMIT, "123");
        MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
        HoodieTableSource tableSource1 = (HoodieTableSource)new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext1);
        Configuration conf1 = tableSource1.getConf();
        MatcherAssert.assertThat((Object)conf1.getString(FlinkOptions.QUERY_TYPE), (Matcher)CoreMatchers.is((Object)"incremental"));
        this.conf.removeConfig(FlinkOptions.READ_END_COMMIT);
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "123");
        MockContext sourceContext2 = MockContext.getInstance(this.conf, schema1, "f2");
        HoodieTableSource tableSource2 = (HoodieTableSource)new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext2);
        Configuration conf2 = tableSource2.getConf();
        MatcherAssert.assertThat((Object)conf2.getString(FlinkOptions.QUERY_TYPE), (Matcher)CoreMatchers.is((Object)"incremental"));
    }

    @Test
    void testBucketIndexOptionForSink() {
        ResolvedSchema schema1 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", (DataType)DataTypes.VARCHAR((int)20).notNull()).field("f2", DataTypes.TIMESTAMP((int)3)).primaryKey("f0", "f1").build();
        this.conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
        MockContext context = MockContext.getInstance(this.conf, schema1, "f2");
        HoodieTableSink tableSink = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)context);
        Configuration conf = tableSink.getConf();
        MatcherAssert.assertThat((Object)conf.getString(FlinkOptions.INDEX_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f0,f1"));
        this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f0");
        MockContext context2 = MockContext.getInstance(this.conf, schema1, "f2");
        HoodieTableSink tableSink2 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)context2);
        Configuration conf2 = tableSink2.getConf();
        MatcherAssert.assertThat((Object)conf2.getString(FlinkOptions.INDEX_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f0"));
        this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f1");
        MockContext context3 = MockContext.getInstance(this.conf, schema1, "f2");
        HoodieTableSink tableSink3 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)context3);
        Configuration conf3 = tableSink3.getConf();
        MatcherAssert.assertThat((Object)conf3.getString(FlinkOptions.INDEX_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f1"));
        this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f0,f1");
        MockContext context4 = MockContext.getInstance(this.conf, schema1, "f2");
        HoodieTableSink tableSink4 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)context4);
        Configuration conf4 = tableSink4.getConf();
        MatcherAssert.assertThat((Object)conf4.getString(FlinkOptions.INDEX_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f0,f1"));
        this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f2");
        MockContext context5 = MockContext.getInstance(this.conf, schema1, "f2");
        Assertions.assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)context5));
    }

    @Test
    void testInferAvroSchemaForSink() {
        HoodieTableSink tableSink1 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)MockContext.getInstance(this.conf));
        Configuration conf1 = tableSink1.getConf();
        MatcherAssert.assertThat((Object)conf1.get(FlinkOptions.SOURCE_AVRO_SCHEMA), (Matcher)CoreMatchers.is((Object)INFERRED_SCHEMA));
        this.conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
        HoodieTableSink tableSink2 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)MockContext.getInstance(this.conf));
        Configuration conf2 = tableSink2.getConf();
        Assertions.assertNull((Object)conf2.get(FlinkOptions.SOURCE_AVRO_SCHEMA), (String)"expect schema string as null");
        this.conf.removeConfig(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH);
        ResolvedSchema schema3 = SchemaBuilder.instance().field("f_decimal", (DataType)DataTypes.DECIMAL((int)3, (int)2).notNull()).field("f_map", DataTypes.MAP((DataType)DataTypes.VARCHAR((int)20), (DataType)DataTypes.VARCHAR((int)10))).field("f_array", DataTypes.ARRAY((DataType)DataTypes.VARCHAR((int)10))).field("f_record", DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"r1", (DataType)DataTypes.VARCHAR((int)10)), DataTypes.FIELD((String)"r2", (DataType)DataTypes.INT())})).primaryKey("f_decimal").build();
        HoodieTableSink tableSink3 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)MockContext.getInstance(this.conf, schema3, ""));
        Configuration conf3 = tableSink3.getConf();
        String expected = AvroSchemaConverter.convertToSchema((LogicalType)schema3.toSinkRowDataType().getLogicalType(), (String)AvroSchemaUtils.getAvroRecordQualifiedName((String)"t1")).toString();
        MatcherAssert.assertThat((Object)conf3.get(FlinkOptions.SOURCE_AVRO_SCHEMA), (Matcher)CoreMatchers.is((Object)expected));
    }

    @Test
    void testSetupHoodieKeyOptionsForSink() {
        this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
        this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, "dummyKeyGenClass");
        ResolvedSchema schema1 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.BIGINT()).field("ts", DataTypes.TIMESTAMP((int)3)).primaryKey("f0").build();
        MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2");
        HoodieTableSink tableSink1 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sinkContext1);
        Configuration conf1 = tableSink1.getConf();
        MatcherAssert.assertThat((Object)conf1.get(FlinkOptions.RECORD_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f0"));
        MatcherAssert.assertThat((Object)conf1.get(FlinkOptions.KEYGEN_CLASS_NAME), (Matcher)CoreMatchers.is((Object)"dummyKeyGenClass"));
        this.conf.removeConfig(FlinkOptions.KEYGEN_CLASS_NAME);
        ResolvedSchema schema2 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", (DataType)DataTypes.VARCHAR((int)20).notNull()).field("f2", DataTypes.TIMESTAMP((int)3)).field("ts", DataTypes.TIMESTAMP((int)3)).primaryKey("f0", "f1").build();
        MockContext sinkContext2 = MockContext.getInstance(this.conf, schema2, "f2");
        HoodieTableSink tableSink2 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sinkContext2);
        Configuration conf2 = tableSink2.getConf();
        MatcherAssert.assertThat((Object)conf2.get(FlinkOptions.RECORD_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f0,f1"));
        MatcherAssert.assertThat((Object)conf2.get(FlinkOptions.KEYGEN_CLASS_NAME), (Matcher)CoreMatchers.is((Object)ComplexAvroKeyGenerator.class.getName()));
        this.conf.removeConfig(FlinkOptions.KEYGEN_CLASS_NAME);
        MockContext sinkContext3 = MockContext.getInstance(this.conf, schema2, "");
        HoodieTableSink tableSink3 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sinkContext3);
        Configuration conf3 = tableSink3.getConf();
        MatcherAssert.assertThat((Object)conf3.get(FlinkOptions.RECORD_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f0,f1"));
        MatcherAssert.assertThat((Object)conf3.get(FlinkOptions.KEYGEN_CLASS_NAME), (Matcher)CoreMatchers.is((Object)NonpartitionedAvroKeyGenerator.class.getName()));
        this.conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
        MockContext sinkContext4 = MockContext.getInstance(this.conf, schema2, "");
        HoodieTableSink tableSink4 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sinkContext4);
        Configuration conf4 = tableSink4.getConf();
        MatcherAssert.assertThat((Object)conf4.get(FlinkOptions.RECORD_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f0,f1"));
        MatcherAssert.assertThat((Object)conf4.get(FlinkOptions.INDEX_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f0,f1"));
        MatcherAssert.assertThat((Object)conf4.get(FlinkOptions.INDEX_TYPE), (Matcher)CoreMatchers.is((Object)HoodieIndex.IndexType.BUCKET.name()));
        MatcherAssert.assertThat((Object)conf4.get(FlinkOptions.KEYGEN_CLASS_NAME), (Matcher)CoreMatchers.is((Object)NonpartitionedAvroKeyGenerator.class.getName()));
    }

    @Test
    void testSetupHiveOptionsForSink() {
        ResolvedSchema schema1 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.TIMESTAMP((int)3)).field("ts", DataTypes.TIMESTAMP((int)3)).primaryKey("f0").build();
        MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2");
        HoodieTableSink tableSink1 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sinkContext1);
        Configuration conf1 = tableSink1.getConf();
        MatcherAssert.assertThat((Object)conf1.getString(FlinkOptions.HIVE_SYNC_DB), (Matcher)CoreMatchers.is((Object)"db1"));
        MatcherAssert.assertThat((Object)conf1.getString(FlinkOptions.HIVE_SYNC_TABLE), (Matcher)CoreMatchers.is((Object)"t1"));
        MatcherAssert.assertThat((Object)conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), (Matcher)CoreMatchers.is((Object)MultiPartKeysValueExtractor.class.getName()));
        this.conf.setString(FlinkOptions.HIVE_SYNC_DB, "db2");
        this.conf.setString(FlinkOptions.HIVE_SYNC_TABLE, "t2");
        this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
        MockContext sinkContext2 = MockContext.getInstance(this.conf, schema1, "f2");
        HoodieTableSink tableSink2 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sinkContext2);
        Configuration conf2 = tableSink2.getConf();
        MatcherAssert.assertThat((Object)conf2.getString(FlinkOptions.HIVE_SYNC_DB), (Matcher)CoreMatchers.is((Object)"db2"));
        MatcherAssert.assertThat((Object)conf2.getString(FlinkOptions.HIVE_SYNC_TABLE), (Matcher)CoreMatchers.is((Object)"t2"));
        MatcherAssert.assertThat((Object)conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), (Matcher)CoreMatchers.is((Object)MultiPartKeysValueExtractor.class.getName()));
    }

    @Test
    void testSetupCleaningOptionsForSink() {
        ResolvedSchema schema1 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.TIMESTAMP((int)3)).field("ts", DataTypes.TIMESTAMP((int)3)).primaryKey("f0").build();
        this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "11");
        MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2");
        HoodieTableSink tableSink1 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sinkContext1);
        Configuration conf1 = tableSink1.getConf();
        MatcherAssert.assertThat((Object)conf1.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), (Matcher)CoreMatchers.is((Object)FlinkOptions.ARCHIVE_MIN_COMMITS.defaultValue()));
        MatcherAssert.assertThat((Object)conf1.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), (Matcher)CoreMatchers.is((Object)FlinkOptions.ARCHIVE_MAX_COMMITS.defaultValue()));
        int retainCommits = (Integer)FlinkOptions.ARCHIVE_MIN_COMMITS.defaultValue() + 5;
        this.conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), retainCommits);
        MockContext sinkContext2 = MockContext.getInstance(this.conf, schema1, "f2");
        HoodieTableSink tableSink2 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)sinkContext2);
        Configuration conf2 = tableSink2.getConf();
        MatcherAssert.assertThat((Object)conf2.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), (Matcher)CoreMatchers.is((Object)(retainCommits + 10)));
        MatcherAssert.assertThat((Object)conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), (Matcher)CoreMatchers.is((Object)(retainCommits + 20)));
    }

    @Test
    void testSetupTimestampBasedKeyGenForSink() {
        this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
        ResolvedSchema schema1 = SchemaBuilder.instance().field("f0", (DataType)DataTypes.INT().notNull()).field("f1", DataTypes.VARCHAR((int)20)).field("f2", DataTypes.TIMESTAMP((int)3)).field("ts", DataTypes.TIMESTAMP((int)3)).primaryKey("f0").build();
        MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "ts");
        HoodieTableSource tableSource1 = (HoodieTableSource)new HoodieTableFactory().createDynamicTableSource((DynamicTableFactory.Context)sourceContext1);
        Configuration conf1 = tableSource1.getConf();
        MatcherAssert.assertThat((Object)conf1.get(FlinkOptions.RECORD_KEY_FIELD), (Matcher)CoreMatchers.is((Object)"f0"));
        MatcherAssert.assertThat((Object)conf1.get(FlinkOptions.KEYGEN_CLASS_NAME), (Matcher)CoreMatchers.is((Object)TimestampBasedAvroKeyGenerator.class.getName()));
        MatcherAssert.assertThat((Object)conf1.getString(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key(), "dummy"), (Matcher)CoreMatchers.is((Object)"EPOCHMILLISECONDS"));
        MatcherAssert.assertThat((Object)conf1.getString(TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_DATE_FORMAT.key(), "dummy"), (Matcher)CoreMatchers.is((Object)"yyyyMMddHH"));
        MatcherAssert.assertThat((Object)conf1.getString(TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT.key(), "dummy"), (Matcher)CoreMatchers.is((Object)"UTC"));
    }

    @Test
    void testSetupWriteOptionsForSink() {
        HoodieTableSink tableSink1 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)MockContext.getInstance(this.conf));
        Configuration conf1 = tableSink1.getConf();
        MatcherAssert.assertThat((Object)conf1.get(FlinkOptions.PRE_COMBINE), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)conf1.get(FlinkOptions.TABLE_NAME), (Matcher)CoreMatchers.is((Object)"t1"));
        MatcherAssert.assertThat((Object)conf1.get(FlinkOptions.DATABASE_NAME), (Matcher)CoreMatchers.is((Object)"db1"));
        this.conf.setString(FlinkOptions.OPERATION, "insert");
        HoodieTableSink tableSink2 = (HoodieTableSink)new HoodieTableFactory().createDynamicTableSink((DynamicTableFactory.Context)MockContext.getInstance(this.conf));
        Configuration conf2 = tableSink2.getConf();
        MatcherAssert.assertThat((Object)conf2.get(FlinkOptions.PRE_COMBINE), (Matcher)CoreMatchers.is((Object)false));
    }

    private static class MockContext
    implements DynamicTableFactory.Context {
        private final Configuration conf;
        private final ResolvedSchema schema;
        private final List<String> partitions;

        private MockContext(Configuration conf, ResolvedSchema schema, List<String> partitions) {
            this.conf = conf;
            this.schema = schema;
            this.partitions = partitions;
        }

        static MockContext getInstance(Configuration conf) {
            return MockContext.getInstance(conf, TestConfigurations.TABLE_SCHEMA, Collections.singletonList("partition"));
        }

        static MockContext getInstance(Configuration conf, ResolvedSchema schema, String partition) {
            return MockContext.getInstance(conf, schema, Collections.singletonList(partition));
        }

        static MockContext getInstance(Configuration conf, ResolvedSchema schema, List<String> partitions) {
            return new MockContext(conf, schema, partitions);
        }

        public ObjectIdentifier getObjectIdentifier() {
            return ObjectIdentifier.of((String)"hudi", (String)"db1", (String)"t1");
        }

        public ResolvedCatalogTable getCatalogTable() {
            CatalogTable catalogTable = CatalogTable.of((Schema)Schema.newBuilder().fromResolvedSchema(this.schema).build(), (String)"mock source table", this.partitions, (Map)this.conf.toMap());
            return new ResolvedCatalogTable(catalogTable, this.schema);
        }

        public ReadableConfig getConfiguration() {
            return this.conf;
        }

        public ClassLoader getClassLoader() {
            return null;
        }

        public boolean isTemporary() {
            return false;
        }
    }
}

