package org.apache.flink.connectors.hive;

import java.time.Duration;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.assertj.core.api.Assertions;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.class */
public class HiveDynamicTableFactoryTest {
    private static TableEnvironment tableEnv;
    private static HiveCatalog hiveCatalog;

    @BeforeClass
    public static void setup() {
        tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        hiveCatalog = HiveTestUtils.createHiveCatalog();
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    }

    @Test
    public void testHiveStreamingSourceOptions() throws Exception {
        tableEnv.executeSql(String.format("create table table1 (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ('%s' = 'true')", HiveOptions.STREAMING_SOURCE_ENABLE.key()));
        HiveTableSource tableSource = getTableSource("table1");
        Assertions.assertThat(tableSource).isNotInstanceOf(HiveLookupTableSource.class);
        HiveTableSource hiveTableSource = tableSource;
        Configuration configuration = new Configuration();
        Map options = hiveTableSource.catalogTable.getOptions();
        configuration.getClass();
        options.forEach(configuration::setString);
        Assertions.assertThat((Comparable) configuration.get(HiveOptions.STREAMING_SOURCE_PARTITION_ORDER)).isEqualTo(HiveOptions.PartitionOrder.PARTITION_NAME);
        tableEnv.executeSql(String.format("create table table2 (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ('%s' = 'true', '%s' = 'latest')", HiveOptions.STREAMING_SOURCE_ENABLE.key(), HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key()));
        Assertions.assertThat(getTableSource("table2")).isInstanceOf(HiveLookupTableSource.class);
        Assertions.assertThatThrownBy(() -> {
            tableEnv.executeSql("select * from table2");
        }).hasMessage("The only supported 'streaming-source.partition.include' is 'all' in hive table scan, but is 'latest'");
        tableEnv.executeSql(String.format("create table table3 (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ('%s' = 'true', '%s' = 'partition-name')", HiveOptions.STREAMING_SOURCE_ENABLE.key(), HiveOptions.STREAMING_SOURCE_PARTITION_ORDER.key()));
        HiveTableSource tableSource2 = getTableSource("table3");
        Assertions.assertThat(tableSource2).isInstanceOf(HiveTableSource.class);
        HiveTableSource hiveTableSource2 = tableSource2;
        Configuration configuration2 = new Configuration();
        Map options2 = hiveTableSource2.catalogTable.getOptions();
        configuration2.getClass();
        options2.forEach(configuration2::setString);
        Assertions.assertThat((HiveOptions.PartitionOrder) configuration2.get(HiveOptions.STREAMING_SOURCE_PARTITION_ORDER)).isEqualTo(HiveOptions.PartitionOrder.PARTITION_NAME);
        tableEnv.executeSql(String.format("create table table4 (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ('%s' = 'true', '%s' = 'partition-time')", HiveOptions.STREAMING_SOURCE_ENABLE.key(), "streaming-source.consume-order"));
        HiveTableSource tableSource3 = getTableSource("table4");
        Assertions.assertThat(tableSource3).isInstanceOf(HiveTableSource.class);
        HiveTableSource hiveTableSource3 = tableSource3;
        Configuration configuration3 = new Configuration();
        Map options3 = hiveTableSource3.catalogTable.getOptions();
        configuration3.getClass();
        options3.forEach(configuration3::setString);
        Assertions.assertThat((HiveOptions.PartitionOrder) configuration3.get(HiveOptions.STREAMING_SOURCE_PARTITION_ORDER)).isEqualTo(HiveOptions.PartitionOrder.PARTITION_TIME);
    }

    /* JADX WARN: Type inference failed for: r1v9, types: [int[], int[][]] */
    @Test
    public void testHiveLookupSourceOptions() throws Exception {
        tableEnv.executeSql(String.format("create table table5 (x int, y string, z int) tblproperties ('%s'='5min')", HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));
        Assertions.assertThat(getTableSource("table5")).isInstanceOf(HiveLookupTableSource.class);
        tableEnv.executeSql(String.format("create table table6 (x int, y string, z int) tblproperties ('%s' = 'true', '%s' = 'latest')", HiveOptions.STREAMING_SOURCE_ENABLE.key(), HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key()));
        HiveLookupTableSource tableSource = getTableSource("table6");
        Assertions.assertThat(tableSource).isInstanceOf(HiveLookupTableSource.class);
        Assertions.assertThat(tableSource.getLookupFunction((int[][]) new int[]{new int[]{0}}).getReloadInterval()).isEqualTo(Duration.ofHours(1L));
        HiveLookupTableSource hiveLookupTableSource = tableSource;
        Configuration configuration = new Configuration();
        Map options = hiveLookupTableSource.catalogTable.getOptions();
        configuration.getClass();
        options.forEach(configuration::setString);
        Assertions.assertThat(HiveOptions.PartitionOrder.PARTITION_NAME).isEqualTo(configuration.get(HiveOptions.STREAMING_SOURCE_PARTITION_ORDER));
        tableEnv.executeSql(String.format("create table table7 (x int, y string, z int) tblproperties ('%s' = 'true', '%s' = 'latest', '%s' = '120min', '%s' = 'partition-time',  '%s' = 'custom', '%s' = 'path.to..TimeExtractor')", HiveOptions.STREAMING_SOURCE_ENABLE.key(), HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key(), HiveOptions.STREAMING_SOURCE_PARTITION_ORDER.key(), FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_KIND.key(), FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_CLASS.key()));
        HiveLookupTableSource tableSource2 = getTableSource("table7");
        Assertions.assertThat(tableSource2).isInstanceOf(HiveLookupTableSource.class);
        HiveLookupTableSource hiveLookupTableSource2 = tableSource2;
        Configuration configuration2 = new Configuration();
        Map options2 = hiveLookupTableSource2.catalogTable.getOptions();
        configuration2.getClass();
        options2.forEach(configuration2::setString);
        Assertions.assertThat(HiveOptions.PartitionOrder.PARTITION_TIME).isEqualTo(configuration2.get(HiveOptions.STREAMING_SOURCE_PARTITION_ORDER));
        Assertions.assertThat("custom").isEqualTo((String) configuration2.get(FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_KIND));
        Assertions.assertThat("path.to..TimeExtractor").isEqualTo((String) configuration2.get(FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_CLASS));
        tableEnv.executeSql(String.format("create table table8 (x int, y string, z int) tblproperties ('%s' = 'true', '%s' = 'latest', '%s' = '5min')", HiveOptions.STREAMING_SOURCE_ENABLE.key(), HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key()));
        HiveLookupTableSource tableSource3 = getTableSource("table8");
        Assertions.assertThat(tableSource3).isInstanceOf(HiveLookupTableSource.class);
        HiveLookupTableSource hiveLookupTableSource3 = tableSource3;
        Configuration configuration3 = new Configuration();
        Map options3 = hiveLookupTableSource3.catalogTable.getOptions();
        configuration3.getClass();
        options3.forEach(configuration3::setString);
        Assertions.assertThat(Duration.ofMinutes(5L)).isEqualTo(configuration3.get(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL));
    }

    @Test
    public void testInvalidOptions() {
        tableEnv.executeSql(String.format("create table table9 (x int, y string, z int) tblproperties ('%s' = 'true', '%s' = 'latest', '%s' = '120min', '%s' = '1970-00-01')", HiveOptions.STREAMING_SOURCE_ENABLE.key(), HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key(), HiveOptions.STREAMING_SOURCE_CONSUME_START_OFFSET.key()));
        Assertions.assertThatThrownBy(() -> {
            getTableSource("table9");
        }).rootCause().hasMessage("The 'streaming-source.consume-start-offset' is not supported when set 'streaming-source.partition.include' to 'latest'");
    }

    @Test
    public void testJobConfWithCredentials() throws Exception {
        Text text = new Text("HDFS_DELEGATION_TOKEN");
        Text text2 = new Text("ha-hdfs:hadoop-namespace");
        Credentials credentials = new Credentials();
        credentials.addToken(text2, new Token(new byte[4], new byte[4], text, text2));
        UserGroupInformation.getCurrentUser().addCredentials(credentials);
        tableEnv.executeSql(String.format("create table table10 (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string)", new Object[0]));
        Token token = getTableSource("table10").getJobConf().getCredentials().getToken(text2);
        Assertions.assertThat(token).isNotNull();
        Assertions.assertThat(token.getKind()).isEqualTo(text);
        Assertions.assertThat(token.getService()).isEqualTo(text2);
        Token token2 = getTableSink("table10").getJobConf().getCredentials().getToken(text2);
        Assertions.assertThat(token2).isNotNull();
        Assertions.assertThat(token2.getKind()).isEqualTo(text);
        Assertions.assertThat(token2.getService()).isEqualTo(text2);
    }

    private DynamicTableSource getTableSource(String str) throws Exception {
        TableEnvironmentInternal tableEnvironmentInternal = tableEnv;
        ObjectIdentifier of = ObjectIdentifier.of(hiveCatalog.getName(), "default", str);
        return FactoryUtil.createDynamicTableSource((DynamicTableSourceFactory) hiveCatalog.getFactory().orElseThrow(IllegalStateException::new), of, tableEnvironmentInternal.getCatalogManager().resolveCatalogTable(hiveCatalog.getTable(of.toObjectPath())), tableEnv.getConfig(), Thread.currentThread().getContextClassLoader(), false);
    }

    private DynamicTableSink getTableSink(String str) throws Exception {
        TableEnvironmentInternal tableEnvironmentInternal = tableEnv;
        ObjectIdentifier of = ObjectIdentifier.of(hiveCatalog.getName(), "default", str);
        return FactoryUtil.createDynamicTableSink((DynamicTableSinkFactory) hiveCatalog.getFactory().orElseThrow(IllegalStateException::new), of, tableEnvironmentInternal.getCatalogManager().resolveCatalogTable(hiveCatalog.getTable(of.toObjectPath())), tableEnv.getConfig(), Thread.currentThread().getContextClassLoader(), false);
    }
}
