/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.HiveLookupTableSource;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.internal.TableImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
import org.apache.flink.table.filesystem.FileSystemLookupFunction;
import org.apache.flink.table.filesystem.PartitionFetcher;
import org.apache.flink.table.filesystem.PartitionReader;
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class HiveLookupJoinITCase {
    private static TableEnvironment tableEnv;
    private static HiveCatalog hiveCatalog;

    @BeforeClass
    public static void setup() {
        tableEnv = TableEnvironment.create((EnvironmentSettings)EnvironmentSettings.inStreamingMode());
        hiveCatalog = HiveTestUtils.createHiveCatalog();
        tableEnv.registerCatalog(hiveCatalog.getName(), (Catalog)hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());
        TestCollectionTableFactory.initData(Arrays.asList(Row.of((Object[])new Object[]{1, "a"}), Row.of((Object[])new Object[]{1, "c"}), Row.of((Object[])new Object[]{2, "b"}), Row.of((Object[])new Object[]{2, "c"}), Row.of((Object[])new Object[]{3, "c"}), Row.of((Object[])new Object[]{4, "d"})));
        tableEnv.executeSql("create table default_catalog.default_database.probe (x int,y string, p as proctime()) with ('connector'='COLLECTION','is-bounded' = 'false')");
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql(String.format("create table bounded_table (x int, y string, z int) tblproperties ('%s'='5min')", FileSystemConnectorOptions.LOOKUP_JOIN_CACHE_TTL.key()));
        tableEnv.executeSql(String.format("create table bounded_partition_table (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ('%s'='5min')", FileSystemConnectorOptions.LOOKUP_JOIN_CACHE_TTL.key()));
        tableEnv.executeSql(String.format("create table partition_table (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ('%s' = 'true', '%s' = 'latest', '%s' = 'partition-name', '%s'='2h')", FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_ORDER.key(), FileSystemConnectorOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key()));
        tableEnv.executeSql(String.format("create table partition_table_1 (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ('%s' = 'true', '%s' = 'latest', '%s'='120min')", FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key()));
        tableEnv.executeSql(String.format("create table partition_table_2 (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ('%s' = 'true', '%s' = 'latest', '%s' = '12h', '%s' = 'partition-time',  '%s' = 'default', '%s' = '$pt_year-$pt_mon-$pt_day 00:00:00')", FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_ORDER.key(), FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_KIND.key(), FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key()));
        tableEnv.executeSql(String.format("create table partition_table_3 (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ( '%s' = 'true', '%s' = 'latest', '%s' = 'create-time')", FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_ORDER.key()));
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
    }

    @Test
    public void testLookupOptions() throws Exception {
        FileSystemLookupFunction<HiveTablePartition> lookupFunction1 = this.getLookupFunction("bounded_table");
        FileSystemLookupFunction<HiveTablePartition> lookupFunction2 = this.getLookupFunction("partition_table");
        lookupFunction1.open(null);
        lookupFunction2.open(null);
        Assert.assertEquals((Object)Duration.ofMinutes(5L), (Object)lookupFunction1.getReloadInterval());
        Assert.assertEquals((Object)Duration.ofMinutes(120L), (Object)lookupFunction2.getReloadInterval());
    }

    @Test
    public void testPartitionFetcherAndReader() throws Exception {
        RowData row;
        TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        batchEnv.registerCatalog(hiveCatalog.getName(), (Catalog)hiveCatalog);
        batchEnv.useCatalog(hiveCatalog.getName());
        batchEnv.executeSql("insert overwrite partition_table values (1,'a',08,2019,'08','01'),(1,'a',10,2020,'08','31'),(2,'a',21,2020,'08','31'),(2,'b',22,2020,'08','31'),(3,'c',33,2020,'09','31')").await();
        FileSystemLookupFunction<HiveTablePartition> lookupFunction = this.getLookupFunction("partition_table");
        lookupFunction.open(null);
        PartitionFetcher fetcher = lookupFunction.getPartitionFetcher();
        PartitionFetcher.Context context = lookupFunction.getFetcherContext();
        List partitions = fetcher.fetch(context);
        Assert.assertEquals((long)1L, (long)partitions.size());
        PartitionReader reader = lookupFunction.getPartitionReader();
        reader.open(partitions);
        ArrayList<Object> res = new ArrayList<Object>();
        ObjectIdentifier tableIdentifier = ObjectIdentifier.of((String)hiveCatalog.getName(), (String)"default", (String)"partition_table");
        CatalogTable catalogTable = (CatalogTable)hiveCatalog.getTable(tableIdentifier.toObjectPath());
        GenericRowData reuse = new GenericRowData(catalogTable.getSchema().getFieldCount());
        TypeSerializer serializer = InternalSerializers.create((LogicalType)catalogTable.getSchema().toRowDataType().getLogicalType());
        while ((row = (RowData)reader.read((Object)reuse)) != null) {
            res.add(serializer.copy((Object)row));
        }
        res.sort(Comparator.comparingInt(o -> o.getInt(0)));
        Assert.assertEquals((Object)"[+I(3,c,33,2020,09,31)]", (Object)((Object)res).toString());
    }

    @Test
    public void testLookupJoinBoundedTable() throws Exception {
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("insert into bounded_table values (1,'a',10),(2,'a',21),(2,'b',22),(3,'c',33)").await();
        TableImpl flinkTable = (TableImpl)tableEnv.sqlQuery("select p.x, p.y, b.z from  default_catalog.default_database.probe as p  join bounded_table for system_time as of p.p as b on p.x=b.x and p.y=b.y");
        List results = CollectionUtil.iteratorToList((Iterator)flinkTable.execute().collect());
        Assert.assertEquals((Object)"[+I[1, a, 10], +I[2, b, 22], +I[3, c, 33]]", (Object)results.toString());
    }

    @Test
    public void testLookupJoinBoundedPartitionedTable() throws Exception {
        TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        batchEnv.registerCatalog(hiveCatalog.getName(), (Catalog)hiveCatalog);
        batchEnv.useCatalog(hiveCatalog.getName());
        batchEnv.executeSql("insert overwrite bounded_partition_table values (1,'a',08,2019,'08','01'),(1,'a',10,2020,'08','31'),(2,'a',21,2020,'08','31'),(2,'b',22,2020,'08','31')").await();
        TableImpl flinkTable = (TableImpl)tableEnv.sqlQuery("select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day from  default_catalog.default_database.probe as p join bounded_partition_table for system_time as of p.p as b on p.x=b.x and p.y=b.y");
        List results = CollectionUtil.iteratorToList((Iterator)flinkTable.execute().collect());
        Assert.assertEquals((Object)"[+I[1, a, 8, 2019, 08, 01], +I[1, a, 10, 2020, 08, 31], +I[2, b, 22, 2020, 08, 31]]", (Object)results.toString());
    }

    @Test
    public void testLookupJoinPartitionedTable() throws Exception {
        TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        batchEnv.registerCatalog(hiveCatalog.getName(), (Catalog)hiveCatalog);
        batchEnv.useCatalog(hiveCatalog.getName());
        batchEnv.executeSql("insert overwrite partition_table_1 values (1,'a',08,2019,'09','01'),(1,'a',10,2020,'09','31'),(2,'a',21,2020,'09','31'),(2,'b',22,2020,'09','31'),(3,'c',33,2020,'09','31'),(1,'a',101,2020,'08','01'),(2,'a',121,2020,'08','01'),(2,'b',122,2020,'08','01')").await();
        TableImpl flinkTable = (TableImpl)tableEnv.sqlQuery("select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day from  default_catalog.default_database.probe as p join partition_table_1 for system_time as of p.p as b on p.x=b.x and p.y=b.y");
        List results = CollectionUtil.iteratorToList((Iterator)flinkTable.execute().collect());
        Assert.assertEquals((Object)"[+I[1, a, 10, 2020, 09, 31], +I[2, b, 22, 2020, 09, 31], +I[3, c, 33, 2020, 09, 31]]", (Object)results.toString());
    }

    @Test
    public void testLookupJoinPartitionedTableWithPartitionTime() throws Exception {
        TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        batchEnv.registerCatalog(hiveCatalog.getName(), (Catalog)hiveCatalog);
        batchEnv.useCatalog(hiveCatalog.getName());
        batchEnv.executeSql("insert overwrite partition_table_2 values (1,'a',08,2020,'08','01'),(1,'a',10,2020,'08','31'),(2,'a',21,2019,'08','31'),(2,'b',22,2020,'08','31'),(3,'c',33,2017,'08','31'),(1,'a',101,2017,'09','01'),(2,'a',121,2019,'09','01'),(2,'b',122,2019,'09','01')").await();
        TableImpl flinkTable = (TableImpl)tableEnv.sqlQuery("select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day from  default_catalog.default_database.probe as p join partition_table_2 for system_time as of p.p as b on p.x=b.x and p.y=b.y");
        List results = CollectionUtil.iteratorToList((Iterator)flinkTable.execute().collect());
        Assert.assertEquals((Object)"[+I[1, a, 10, 2020, 08, 31], +I[2, b, 22, 2020, 08, 31]]", (Object)results.toString());
    }

    @Test
    public void testLookupJoinPartitionedTableWithCreateTime() throws Exception {
        TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        batchEnv.registerCatalog(hiveCatalog.getName(), (Catalog)hiveCatalog);
        batchEnv.useCatalog(hiveCatalog.getName());
        batchEnv.executeSql("insert overwrite partition_table_3 values (1,'a',08,2020,'month1','01'),(1,'a',10,2020,'month2','02'),(2,'a',21,2020,'month1','02'),(2,'b',22,2020,'month3','20'),(3,'c',22,2020,'month3','20'),(3,'c',33,2017,'08','31'),(1,'a',101,2017,'09','01'),(2,'a',121,2019,'09','01'),(2,'b',122,2019,'09','01')").await();
        batchEnv.executeSql("insert overwrite partition_table_3 values (1,'a',101,2020,'08','01'),(2,'a',121,2020,'08','01'),(2,'b',122,2020,'08','01')").await();
        TableImpl flinkTable = (TableImpl)tableEnv.sqlQuery("select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day from  default_catalog.default_database.probe as p join partition_table_3 for system_time as of p.p as b on p.x=b.x and p.y=b.y");
        List results = CollectionUtil.iteratorToList((Iterator)flinkTable.execute().collect());
        Assert.assertEquals((Object)"[+I[1, a, 101, 2020, 08, 01], +I[2, b, 122, 2020, 08, 01]]", (Object)results.toString());
    }

    private FileSystemLookupFunction<HiveTablePartition> getLookupFunction(String tableName) throws Exception {
        TableEnvironmentInternal tableEnvInternal = (TableEnvironmentInternal)tableEnv;
        ObjectIdentifier tableIdentifier = ObjectIdentifier.of((String)hiveCatalog.getName(), (String)"default", (String)tableName);
        CatalogTable catalogTable = (CatalogTable)hiveCatalog.getTable(tableIdentifier.toObjectPath());
        HiveLookupTableSource hiveTableSource = (HiveLookupTableSource)FactoryUtil.createTableSource((Catalog)hiveCatalog, (ObjectIdentifier)tableIdentifier, (ResolvedCatalogTable)tableEnvInternal.getCatalogManager().resolveCatalogTable(catalogTable), (ReadableConfig)tableEnv.getConfig().getConfiguration(), (ClassLoader)Thread.currentThread().getContextClassLoader(), (boolean)false);
        FileSystemLookupFunction lookupFunction = (FileSystemLookupFunction)hiveTableSource.getLookupFunction((int[][])new int[][]{{0}});
        return lookupFunction;
    }

    @AfterClass
    public static void tearDown() {
        tableEnv.executeSql("drop table bounded_table");
        tableEnv.executeSql("drop table bounded_partition_table");
        tableEnv.executeSql("drop table partition_table");
        tableEnv.executeSql("drop table partition_table_1");
        tableEnv.executeSql("drop table partition_table_2");
        tableEnv.executeSql("drop table partition_table_3");
        if (hiveCatalog != null) {
            hiveCatalog.close();
        }
    }
}

