/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive;

import java.time.Duration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.HiveLookupTableSource;
import org.apache.flink.connectors.hive.HiveTableSink;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
import org.apache.flink.table.filesystem.FileSystemLookupFunction;
import org.apache.flink.util.ExceptionUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class HiveDynamicTableFactoryTest {
    private static TableEnvironment tableEnv;
    private static HiveCatalog hiveCatalog;

    @BeforeClass
    public static void setup() {
        tableEnv = TableEnvironment.create((EnvironmentSettings)EnvironmentSettings.inStreamingMode());
        hiveCatalog = HiveTestUtils.createHiveCatalog();
        tableEnv.registerCatalog(hiveCatalog.getName(), (Catalog)hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    }

    @Test
    public void testHiveStreamingSourceOptions() throws Exception {
        tableEnv.executeSql(String.format("create table table1 (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ('%s' = 'true')", FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key()));
        DynamicTableSource tableSource1 = this.getTableSource("table1");
        Assert.assertFalse((boolean)(tableSource1 instanceof HiveLookupTableSource));
        HiveTableSource tableSource = (HiveTableSource)tableSource1;
        Configuration configuration = new Configuration();
        tableSource.catalogTable.getOptions().forEach((arg_0, arg_1) -> ((Configuration)configuration).setString(arg_0, arg_1));
        Assert.assertEquals((Object)FileSystemConnectorOptions.PartitionOrder.PARTITION_NAME, (Object)configuration.get(FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_ORDER));
        tableEnv.executeSql(String.format("create table table2 (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ('%s' = 'true', '%s' = 'latest')", FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key()));
        DynamicTableSource tableSource2 = this.getTableSource("table2");
        Assert.assertTrue((boolean)(tableSource2 instanceof HiveLookupTableSource));
        try {
            tableEnv.executeSql("select * from table2");
        }
        catch (Throwable t) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)t, (String)"The only supported 'streaming-source.partition.include' is 'all' in hive table scan, but is 'latest'").isPresent());
        }
        tableEnv.executeSql(String.format("create table table3 (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ('%s' = 'true', '%s' = 'partition-name')", FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_ORDER.key()));
        DynamicTableSource tableSource3 = this.getTableSource("table3");
        Assert.assertTrue((boolean)(tableSource3 instanceof HiveTableSource));
        HiveTableSource hiveTableSource3 = (HiveTableSource)tableSource3;
        Configuration configuration1 = new Configuration();
        hiveTableSource3.catalogTable.getOptions().forEach((arg_0, arg_1) -> ((Configuration)configuration1).setString(arg_0, arg_1));
        FileSystemConnectorOptions.PartitionOrder partitionOrder1 = (FileSystemConnectorOptions.PartitionOrder)configuration1.get(FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_ORDER);
        Assert.assertEquals((Object)FileSystemConnectorOptions.PartitionOrder.PARTITION_NAME, (Object)partitionOrder1);
        tableEnv.executeSql(String.format("create table table4 (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ('%s' = 'true', '%s' = 'partition-time')", FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key(), "streaming-source.consume-order"));
        DynamicTableSource tableSource4 = this.getTableSource("table4");
        Assert.assertTrue((boolean)(tableSource4 instanceof HiveTableSource));
        HiveTableSource hiveTableSource = (HiveTableSource)tableSource4;
        Configuration configuration2 = new Configuration();
        hiveTableSource.catalogTable.getOptions().forEach((arg_0, arg_1) -> ((Configuration)configuration2).setString(arg_0, arg_1));
        FileSystemConnectorOptions.PartitionOrder partitionOrder2 = (FileSystemConnectorOptions.PartitionOrder)configuration2.get(FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_ORDER);
        Assert.assertEquals((Object)FileSystemConnectorOptions.PartitionOrder.PARTITION_TIME, (Object)partitionOrder2);
    }

    @Test
    public void testHiveLookupSourceOptions() throws Exception {
        tableEnv.executeSql(String.format("create table table5 (x int, y string, z int) tblproperties ('%s'='5min')", FileSystemConnectorOptions.LOOKUP_JOIN_CACHE_TTL.key()));
        DynamicTableSource tableSource1 = this.getTableSource("table5");
        Assert.assertTrue((boolean)(tableSource1 instanceof HiveLookupTableSource));
        tableEnv.executeSql(String.format("create table table6 (x int, y string, z int) tblproperties ('%s' = 'true', '%s' = 'latest')", FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key()));
        DynamicTableSource tableSource2 = this.getTableSource("table6");
        Assert.assertTrue((boolean)(tableSource2 instanceof HiveLookupTableSource));
        FileSystemLookupFunction lookupFunction = (FileSystemLookupFunction)((HiveLookupTableSource)tableSource2).getLookupFunction((int[][])new int[][]{{0}});
        Assert.assertEquals((Object)Duration.ofHours(1L), (Object)lookupFunction.getReloadInterval());
        HiveLookupTableSource lookupTableSource = (HiveLookupTableSource)tableSource2;
        Configuration configuration = new Configuration();
        lookupTableSource.catalogTable.getOptions().forEach((arg_0, arg_1) -> ((Configuration)configuration).setString(arg_0, arg_1));
        Assert.assertEquals((Object)configuration.get(FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_ORDER), (Object)FileSystemConnectorOptions.PartitionOrder.PARTITION_NAME);
        tableEnv.executeSql(String.format("create table table7 (x int, y string, z int) tblproperties ('%s' = 'true', '%s' = 'latest', '%s' = '120min', '%s' = 'partition-time',  '%s' = 'custom', '%s' = 'path.to..TimeExtractor')", FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_ORDER.key(), FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_KIND.key(), FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_CLASS.key()));
        DynamicTableSource tableSource3 = this.getTableSource("table7");
        Assert.assertTrue((boolean)(tableSource3 instanceof HiveLookupTableSource));
        HiveLookupTableSource tableSource = (HiveLookupTableSource)tableSource3;
        Configuration configuration1 = new Configuration();
        tableSource.catalogTable.getOptions().forEach((arg_0, arg_1) -> ((Configuration)configuration1).setString(arg_0, arg_1));
        Assert.assertEquals((Object)configuration1.get(FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_ORDER), (Object)FileSystemConnectorOptions.PartitionOrder.PARTITION_TIME);
        Assert.assertEquals((Object)configuration1.get(FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_KIND), (Object)"custom");
        Assert.assertEquals((Object)configuration1.get(FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_CLASS), (Object)"path.to..TimeExtractor");
        tableEnv.executeSql(String.format("create table table8 (x int, y string, z int) tblproperties ('%s' = 'true', '%s' = 'latest', '%s' = '5min')", FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key()));
        DynamicTableSource tableSource4 = this.getTableSource("table8");
        Assert.assertTrue((boolean)(tableSource4 instanceof HiveLookupTableSource));
        HiveLookupTableSource lookupTableSource4 = (HiveLookupTableSource)tableSource4;
        Configuration configuration4 = new Configuration();
        lookupTableSource4.catalogTable.getOptions().forEach((arg_0, arg_1) -> ((Configuration)configuration4).setString(arg_0, arg_1));
        Assert.assertEquals((Object)configuration4.get(FileSystemConnectorOptions.STREAMING_SOURCE_MONITOR_INTERVAL), (Object)Duration.ofMinutes(5L));
    }

    @Test
    public void testInvalidOptions() throws Exception {
        tableEnv.executeSql(String.format("create table table9 (x int, y string, z int) tblproperties ('%s' = 'true', '%s' = 'latest', '%s' = '120min', '%s' = '1970-00-01')", FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key(), FileSystemConnectorOptions.STREAMING_SOURCE_CONSUME_START_OFFSET.key()));
        try {
            this.getTableSource("table9");
        }
        catch (Throwable t) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)t, (String)"The 'streaming-source.consume-start-offset' is not supported when set 'streaming-source.partition.include' to 'latest'").isPresent());
        }
    }

    @Test
    public void testJobConfWithCredentials() throws Exception {
        Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
        Text hdfsDelegationTokenService = new Text("ha-hdfs:hadoop-namespace");
        Credentials credentials = new Credentials();
        credentials.addToken(hdfsDelegationTokenService, new Token(new byte[4], new byte[4], hdfsDelegationTokenKind, hdfsDelegationTokenService));
        UserGroupInformation.getCurrentUser().addCredentials(credentials);
        tableEnv.executeSql(String.format("create table table10 (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string)", new Object[0]));
        DynamicTableSource tableSource1 = this.getTableSource("table10");
        HiveTableSource tableSource = (HiveTableSource)tableSource1;
        Token token = tableSource.getJobConf().getCredentials().getToken(hdfsDelegationTokenService);
        Assert.assertNotNull((Object)token);
        Assert.assertEquals((Object)hdfsDelegationTokenKind, (Object)token.getKind());
        Assert.assertEquals((Object)hdfsDelegationTokenService, (Object)token.getService());
        DynamicTableSink tableSink1 = this.getTableSink("table10");
        HiveTableSink tableSink = (HiveTableSink)tableSink1;
        token = tableSink.getJobConf().getCredentials().getToken(hdfsDelegationTokenService);
        Assert.assertNotNull((Object)token);
        Assert.assertEquals((Object)hdfsDelegationTokenKind, (Object)token.getKind());
        Assert.assertEquals((Object)hdfsDelegationTokenService, (Object)token.getService());
    }

    private DynamicTableSource getTableSource(String tableName) throws Exception {
        TableEnvironmentInternal tableEnvInternal = (TableEnvironmentInternal)tableEnv;
        ObjectIdentifier tableIdentifier = ObjectIdentifier.of((String)hiveCatalog.getName(), (String)"default", (String)tableName);
        CatalogTable catalogTable = (CatalogTable)hiveCatalog.getTable(tableIdentifier.toObjectPath());
        return FactoryUtil.createTableSource((Catalog)hiveCatalog, (ObjectIdentifier)tableIdentifier, (ResolvedCatalogTable)tableEnvInternal.getCatalogManager().resolveCatalogTable(catalogTable), (ReadableConfig)tableEnv.getConfig().getConfiguration(), (ClassLoader)Thread.currentThread().getContextClassLoader(), (boolean)false);
    }

    private DynamicTableSink getTableSink(String tableName) throws Exception {
        TableEnvironmentInternal tableEnvInternal = (TableEnvironmentInternal)tableEnv;
        ObjectIdentifier tableIdentifier = ObjectIdentifier.of((String)hiveCatalog.getName(), (String)"default", (String)tableName);
        CatalogTable catalogTable = (CatalogTable)hiveCatalog.getTable(tableIdentifier.toObjectPath());
        return FactoryUtil.createTableSink((Catalog)hiveCatalog, (ObjectIdentifier)tableIdentifier, (ResolvedCatalogTable)tableEnvInternal.getCatalogManager().resolveCatalogTable(catalogTable), (ReadableConfig)tableEnv.getConfig().getConfiguration(), (ClassLoader)Thread.currentThread().getContextClassLoader(), (boolean)false);
    }
}

