/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelNode;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

public class HiveTableSourceITCase
extends BatchAbstractTestBase {
    // Hive catalog shared by every test in this class; assigned and opened in createCatalog()
    // and closed in closeCatalog().
    private static HiveCatalog hiveCatalog;
    // Table environment shared by the batch tests; assigned from createTableEnv() in
    // createCatalog().
    private static TableEnvironment batchTableEnv;

    /**
     * One-time fixture: obtains the Hive catalog from {@link HiveTestUtils}, opens it, and then
     * builds the shared batch table environment.
     *
     * <p>The catalog field is assigned before {@code open()} is called so that
     * {@link #closeCatalog()} can still close it if opening fails.
     */
    @BeforeClass
    public static void createCatalog() {
        hiveCatalog = HiveTestUtils.createHiveCatalog();
        hiveCatalog.open();
        // createTableEnv() is defined elsewhere in this class.
        batchTableEnv = createTableEnv();
    }

    /** One-time teardown: closes the shared Hive catalog if it was ever created. */
    @AfterClass
    public static void closeCatalog() {
        if (hiveCatalog == null) {
            // createCatalog() never got as far as assigning the catalog; nothing to release.
            return;
        }
        hiveCatalog.close();
    }

    /** Per-test fixture: makes sure the {@code source_db} database exists before a test runs. */
    @Before
    public void setupSourceDatabaseAndData() {
        final String createDatabaseDdl = "CREATE DATABASE IF NOT EXISTS source_db";
        batchTableEnv.executeSql(createDatabaseDdl);
    }

    /**
     * Writes four rows into a non-partitioned text table and checks that a {@code select *}
     * through the shared batch environment returns them in insertion order.
     *
     * @throws Exception if table creation, data insertion or query execution fails
     */
    @Test
    public void testReadNonPartitionedTable() throws Exception {
        final String dbName = "source_db";
        final String tblName = "test";
        batchTableEnv.executeSql(
                "CREATE TABLE source_db.test ( a INT, b INT, c STRING, d BIGINT, e DOUBLE)");
        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[] {1, 1, "a", 1000L, 1.11})
                .addRow(new Object[] {2, 2, "b", 2000L, 2.22})
                .addRow(new Object[] {3, 3, "c", 3000L, 3.33})
                .addRow(new Object[] {4, 4, "d", 4000L, 4.44})
                .commit();

        Table src = batchTableEnv.sqlQuery("select * from hive.source_db.test");
        List<Row> rows = CollectionUtil.iteratorToList(src.execute().collect());

        Assert.assertEquals(4, rows.size());
        Assert.assertEquals("+I[1, 1, a, 1000, 1.11]", rows.get(0).toString());
        Assert.assertEquals("+I[2, 2, b, 2000, 2.22]", rows.get(1).toString());
        Assert.assertEquals("+I[3, 3, c, 3000, 3.33]", rows.get(2).toString());
        Assert.assertEquals("+I[4, 4, d, 4000, 4.44]", rows.get(3).toString());
    }

    /**
     * Writes one row holding an array, a map and a struct column, reads it back and compares each
     * field with the value that was inserted.
     *
     * @throws Exception if table creation, data insertion or query execution fails
     */
    @Test
    public void testReadComplexDataType() throws Exception {
        final String dbName = "source_db";
        final String tblName = "complex_test";
        batchTableEnv.executeSql(
                "create table source_db.complex_test(a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>)");

        Integer[] array = new Integer[] {1, 2, 3};
        // LinkedHashMap keeps the insertion order of the two entries.
        Map<Integer, String> map = new LinkedHashMap<>();
        map.put(1, "a");
        map.put(2, "b");
        Object[] struct = new Object[] {3, 3L};
        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[] {array, map, struct})
                .commit();

        Table src = batchTableEnv.sqlQuery("select * from hive.source_db.complex_test");
        List<Row> rows = CollectionUtil.iteratorToList(src.execute().collect());

        Assert.assertEquals(1, rows.size());
        Row row = rows.get(0);
        Assert.assertArrayEquals(array, (Integer[]) row.getField(0));
        Assert.assertEquals(map, row.getField(1));
        // The struct column is compared against a Row built from the struct's two members.
        Assert.assertEquals(Row.of(struct[0], struct[1]), row.getField(2));
    }

    /**
     * Writes two rows into each of the partitions {@code pt=0} and {@code pt=1} and checks that a
     * full scan returns all four rows with the partition column appended.
     *
     * @throws Exception if table creation, data insertion or query execution fails
     */
    @Test
    public void testReadPartitionTable() throws Exception {
        final String dbName = "source_db";
        final String tblName = "test_table_pt";
        batchTableEnv.executeSql(
                "CREATE TABLE source_db.test_table_pt (`year` STRING, `value` INT) partitioned by (pt int)");
        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[] {"2014", 3})
                .addRow(new Object[] {"2014", 4})
                .commit("pt=0");
        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[] {"2015", 2})
                .addRow(new Object[] {"2015", 5})
                .commit("pt=1");

        Table src = batchTableEnv.sqlQuery("select * from hive.source_db.test_table_pt");
        List<Row> rows = CollectionUtil.iteratorToList(src.execute().collect());

        Assert.assertEquals(4, rows.size());
        // Rows are sorted by their string form before comparison, so result order is irrelevant.
        Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
        Assert.assertArrayEquals(
                new String[] {"+I[2014, 3, 0]", "+I[2014, 4, 0]", "+I[2015, 2, 1]", "+I[2015, 5, 1]"},
                rowStrings);
    }

    /**
     * Queries a two-partition table with the predicate {@code pt = 0} and checks both the plan
     * text (only partition {@code pt=0} is listed on the table scan) and the returned rows.
     *
     * @throws Exception if table creation, data insertion or query execution fails
     */
    @Test
    public void testPartitionPrunning() throws Exception {
        final String dbName = "source_db";
        final String tblName = "test_table_pt_1";
        batchTableEnv.executeSql(
                "CREATE TABLE source_db.test_table_pt_1 (`year` STRING, `value` INT) partitioned by (pt int)");
        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[] {"2014", 3})
                .addRow(new Object[] {"2014", 4})
                .commit("pt=0");
        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[] {"2015", 2})
                .addRow(new Object[] {"2015", 5})
                .commit("pt=1");

        Table src =
                batchTableEnv.sqlQuery("select * from hive.source_db.test_table_pt_1 where pt = 0");

        // Split the explain output on its "== ... ==" section headers; the plan inspected here
        // is the third element of the resulting array.
        String[] explain = src.explain().split("==.*==\n");
        Assert.assertEquals(4, explain.length);
        String optimizedLogicalPlan = explain[2];
        Assert.assertTrue(
                optimizedLogicalPlan,
                optimizedLogicalPlan.contains(
                        "table=[[hive, source_db, test_table_pt_1, partitions=[{pt=0}], project=[year, value]]]"));

        List<Row> rows = CollectionUtil.iteratorToList(src.execute().collect());
        Assert.assertEquals(2, rows.size());
        Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
        Assert.assertArrayEquals(new String[] {"+I[2014, 3, 0]", "+I[2014, 4, 0]"}, rowStrings);
    }

    /**
     * Runs a series of queries with partition predicates against a table partitioned by
     * {@code (p1 int, p2 string)}. For each query it checks that the test catalog's
     * {@code fallback} flag is not set, that the plan lists the expected partitions, and that the
     * expected rows are returned.
     *
     * <p>The database {@code db1} is dropped in a {@code finally} block so that a failing
     * assertion does not leave it behind.
     *
     * @throws Exception if DDL, data insertion or query execution fails
     */
    @Test
    public void testPartitionFilter() throws Exception {
        TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        TestPartitionFilterCatalog catalog =
                new TestPartitionFilterCatalog(
                        hiveCatalog.getName(),
                        hiveCatalog.getDefaultDatabase(),
                        hiveCatalog.getHiveConf(),
                        hiveCatalog.getHiveVersion());
        tableEnv.registerCatalog(catalog.getName(), (Catalog) catalog);
        tableEnv.useCatalog(catalog.getName());
        tableEnv.executeSql("create database db1");
        try {
            tableEnv.executeSql("create table db1.part(x int) partitioned by (p1 int,p2 string)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part")
                    .addRow(new Object[] {1})
                    .commit("p1=1,p2='a'");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part")
                    .addRow(new Object[] {2})
                    .commit("p1=2,p2='b'");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part")
                    .addRow(new Object[] {3})
                    .commit("p1=3,p2='c'");
            // A partition value that itself contains ':'.
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part")
                    .addRow(new Object[] {4})
                    .commit("p1=4,p2='c:2'");

            // OR of two comparisons.
            assertPartitionFilter(
                    tableEnv,
                    catalog,
                    "select x from db1.part where p1>1 or p2<>'a' order by x",
                    "table=[[test-catalog, db1, part, partitions=[{p1=2, p2=b}, {p1=3, p2=c}, {p1=4, p2=c:2}]",
                    "[+I[2], +I[3], +I[4]]");

            // AND that matches no partition.
            assertPartitionFilter(
                    tableEnv,
                    catalog,
                    "select x from db1.part where p1>2 and p2<='a' order by x",
                    "table=[[test-catalog, db1, part, partitions=[], project=[x]]]",
                    "[]");

            // IN list.
            assertPartitionFilter(
                    tableEnv,
                    catalog,
                    "select x from db1.part where p1 in (1,3,5) order by x",
                    "table=[[test-catalog, db1, part, partitions=[{p1=1, p2=a}, {p1=3, p2=c}], project=[x]]]",
                    "[+I[1], +I[3]]");

            // Nested AND/OR.
            assertPartitionFilter(
                    tableEnv,
                    catalog,
                    "select x from db1.part where (p1=1 and p2='a') or ((p1=2 and p2='b') or p2='d') order by x",
                    "table=[[test-catalog, db1, part, partitions=[{p1=1, p2=a}, {p1=2, p2=b}], project=[x]]]",
                    "[+I[1], +I[2]]");

            // Equality on the value containing ':'.
            assertPartitionFilter(
                    tableEnv,
                    catalog,
                    "select x from db1.part where p2 = 'c:2' order by x",
                    "table=[[test-catalog, db1, part, partitions=[{p1=4, p2=c:2}], project=[x]]]",
                    "[+I[4]]");

            // Empty-string literal on the left-hand side.
            assertPartitionFilter(
                    tableEnv,
                    catalog,
                    "select x from db1.part where '' = p2",
                    "table=[[test-catalog, db1, part, partitions=[], project=[x]]]",
                    "[]");
        } finally {
            tableEnv.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Plans and runs one query for {@link #testPartitionFilter()}: asserts that {@code catalog}
     * did not record a fallback, that the third section of the explain output contains
     * {@code expectedPlanFragment}, and that the collected rows print as {@code expectedRows}.
     */
    private void assertPartitionFilter(
            TableEnvironment tableEnv,
            TestPartitionFilterCatalog catalog,
            String sql,
            String expectedPlanFragment,
            String expectedRows)
            throws Exception {
        Table query = tableEnv.sqlQuery(sql);
        String[] explain = query.explain().split("==.*==\n");
        Assert.assertFalse(catalog.fallback);
        String optimizedPlan = explain[2];
        Assert.assertTrue(optimizedPlan, optimizedPlan.contains(expectedPlanFragment));
        List<Row> results = CollectionUtil.iteratorToList(query.execute().collect());
        Assert.assertEquals(expectedRows, results.toString());
    }

    /**
     * Filters a table partitioned by {@code (p1 date, p2 timestamp)}. For the first query the test
     * catalog's {@code fallback} flag is expected to be set, while the plan must still list only
     * the matching partition; the second query only checks the returned rows.
     *
     * <p>The database {@code db1} is dropped in a {@code finally} block so that a failing
     * assertion does not leave it behind.
     *
     * @throws Exception if DDL, data insertion or query execution fails
     */
    @Test
    public void testPartitionFilterDateTimestamp() throws Exception {
        TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        TestPartitionFilterCatalog catalog =
                new TestPartitionFilterCatalog(
                        hiveCatalog.getName(),
                        hiveCatalog.getDefaultDatabase(),
                        hiveCatalog.getHiveConf(),
                        hiveCatalog.getHiveVersion());
        tableEnv.registerCatalog(catalog.getName(), (Catalog) catalog);
        tableEnv.useCatalog(catalog.getName());
        tableEnv.executeSql("create database db1");
        try {
            tableEnv.executeSql(
                    "create table db1.part(x int) partitioned by (p1 date,p2 timestamp)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part")
                    .addRow(new Object[] {1})
                    .commit("p1='2018-08-08',p2='2018-08-08 08:08:08.1'");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part")
                    .addRow(new Object[] {2})
                    .commit("p1='2018-08-09',p2='2018-08-08 08:08:09.1'");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part")
                    .addRow(new Object[] {3})
                    .commit("p1='2018-08-10',p2='2018-08-08 08:08:10.1'");

            Table query =
                    tableEnv.sqlQuery(
                            "select x from db1.part where p1>cast('2018-08-09' as date) and p2<>cast('2018-08-08 08:08:09.1' as timestamp)");
            String[] explain = query.explain().split("==.*==\n");
            // Unlike testPartitionFilter, the fallback flag is expected to be set here.
            Assert.assertTrue(catalog.fallback);
            String optimizedPlan = explain[2];
            Assert.assertTrue(
                    optimizedPlan,
                    optimizedPlan.contains(
                            "table=[[test-catalog, db1, part, partitions=[{p1=2018-08-10, p2=2018-08-08 08:08:10.1}]"));
            List<Row> results = CollectionUtil.iteratorToList(query.execute().collect());
            Assert.assertEquals("[+I[3]]", results.toString());

            // Timestamp literal on the left-hand side of the comparison.
            query =
                    tableEnv.sqlQuery(
                            "select x from db1.part where timestamp '2018-08-08 08:08:09.1' = p2");
            results = CollectionUtil.iteratorToList(query.execute().collect());
            Assert.assertEquals("[+I[2]]", results.toString());
        } finally {
            tableEnv.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Aggregates over a partitioned table and checks that the plan's table scan carries
     * {@code project=[p1, y]} and that the per-partition counts are correct.
     *
     * <p>The table {@code src} is dropped in a {@code finally} block so that a failing assertion
     * does not leave it behind.
     *
     * @throws Exception if DDL, data insertion or query execution fails
     */
    @Test
    public void testProjectionPushDown() throws Exception {
        batchTableEnv.executeSql(
                "create table src(x int,y string) partitioned by (p1 bigint, p2 string)");
        try {
            HiveTestUtils.createTextTableInserter(hiveCatalog, "default", "src")
                    .addRow(new Object[] {1, "a"})
                    .addRow(new Object[] {2, "b"})
                    .commit("p1=2013, p2='2013'");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "default", "src")
                    .addRow(new Object[] {3, "c"})
                    .commit("p1=2014, p2='2014'");

            Table table =
                    batchTableEnv.sqlQuery(
                            "select p1, count(y) from hive.`default`.src group by p1");
            String[] explain = table.explain().split("==.*==\n");
            Assert.assertEquals(4, explain.length);
            String logicalPlan = explain[2];
            String expectedExplain = "table=[[hive, default, src, project=[p1, y]]]";
            Assert.assertTrue(logicalPlan, logicalPlan.contains(expectedExplain));

            List<Row> rows = CollectionUtil.iteratorToList(table.execute().collect());
            Assert.assertEquals(2, rows.size());
            Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
            Assert.assertArrayEquals(new String[] {"+I[2013, 2]", "+I[2014, 1]"}, rowStrings);
        } finally {
            batchTableEnv.executeSql("drop table src");
        }
    }

    /**
     * Runs a {@code limit 1} query over a four-row table and checks that the plan's table scan
     * carries {@code limit=[1]} and that exactly one row, {@code +I[a]}, is returned.
     *
     * <p>The table {@code src} is dropped in a {@code finally} block so that a failing assertion
     * does not leave it behind.
     *
     * @throws Exception if DDL, data insertion or query execution fails
     */
    @Test
    public void testLimitPushDown() throws Exception {
        batchTableEnv.executeSql("create table src (a string)");
        try {
            HiveTestUtils.createTextTableInserter(hiveCatalog, "default", "src")
                    .addRow(new Object[] {"a"})
                    .addRow(new Object[] {"b"})
                    .addRow(new Object[] {"c"})
                    .addRow(new Object[] {"d"})
                    .commit();

            Table table = batchTableEnv.sqlQuery("select * from hive.`default`.src limit 1");
            String[] explain = table.explain().split("==.*==\n");
            Assert.assertEquals(4, explain.length);
            String logicalPlan = explain[2];
            Assert.assertTrue(
                    logicalPlan,
                    logicalPlan.contains("table=[[hive, default, src, limit=[1]]]"));

            List<Row> rows = CollectionUtil.iteratorToList(table.execute().collect());
            Assert.assertEquals(1, rows.size());
            Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
            Assert.assertArrayEquals(new String[] {"+I[a]"}, rowStrings);
        } finally {
            batchTableEnv.executeSql("drop table src");
        }
    }

    /**
     * Creates a table with two single-file partitions and asserts that the translated root
     * transformation of a full scan has parallelism 2.
     *
     * @throws Exception if DDL or data insertion fails
     */
    @Test
    public void testParallelismSetting() throws Exception {
        final String dbName = "source_db";
        final String tblName = "test_parallelism";
        batchTableEnv.executeSql(
                "CREATE TABLE source_db.test_parallelism (`year` STRING, `value` INT) partitioned by (pt int)");
        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[] {"2014", 3})
                .addRow(new Object[] {"2014", 4})
                .commit("pt=0");
        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[] {"2015", 2})
                .addRow(new Object[] {"2015", 5})
                .commit("pt=1");

        Table table = batchTableEnv.sqlQuery("select * from hive.source_db.test_parallelism");
        testParallelismSettingTranslateAndAssert(2, table, batchTableEnv);
    }

    /**
     * Points an external table at a directory holding three CSV files and asserts that the
     * translated root transformation has parallelism 3, then parallelism 2 once
     * {@link HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX} is set to 2.
     *
     * @throws IOException if the temporary directory or its files cannot be created
     */
    @Test
    public void testParallelismSettingWithFileNum() throws IOException {
        // create test table with 3 files
        File dir = Files.createTempDirectory("testParallelismSettingWithFileNum").toFile();
        dir.deleteOnExit();
        for (int i = 0; i < 3; ++i) {
            File csv = new File(dir, "data" + i + ".csv");
            csv.createNewFile();
            FileUtils.writeFileUtf8(csv, "1|100\n2|200\n");
            // deleteOnExit() removes entries in reverse registration order and cannot delete a
            // non-empty directory, so each file must be registered (after the directory) for the
            // directory itself to be removable at JVM exit.
            csv.deleteOnExit();
        }

        TableEnvironment tEnv = createTableEnv();
        tEnv.executeSql(
                "CREATE EXTERNAL TABLE source_db.test_parallelism_setting_with_file_num (a INT, b INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LOCATION '"
                        + dir.toString()
                        + "'");

        Table table =
                tEnv.sqlQuery(
                        "select * from hive.source_db.test_parallelism_setting_with_file_num");
        testParallelismSettingTranslateAndAssert(3, table, tEnv);

        // Cap the inferred parallelism below the file count and translate the same query again.
        tEnv.getConfig()
                .getConfiguration()
                .setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 2);
        testParallelismSettingTranslateAndAssert(2, table, tEnv);
    }

    /**
     * Optimizes {@code table} with the planner of {@code tEnv}, translates the first root exec
     * node into a transformation and asserts that transformation's parallelism.
     *
     * @param expected the parallelism the root transformation must report
     * @param table the query to plan
     * @param tEnv the environment whose planner is used; it is cast to
     *     {@link TableEnvironmentImpl}, and its planner to {@link PlannerBase}
     */
    private void testParallelismSettingTranslateAndAssert(int expected, Table table, TableEnvironment tEnv) {
        TableEnvironmentImpl envImpl = (TableEnvironmentImpl) tEnv;
        PlannerBase planner = (PlannerBase) envImpl.getPlanner();
        RelNode optimized = planner.optimize(TableTestUtil.toRelNode(table));
        ExecNode<?> rootNode =
                (ExecNode<?>)
                        planner.translateToExecNodeGraph(
                                        JavaScalaConversionUtil.toScala(
                                                Collections.singletonList(optimized)))
                                .getRootNodes()
                                .get(0);
        int actualParallelism = rootNode.translateToPlan(planner).getParallelism();
        Assert.assertEquals(expected, actualParallelism);
    }

    /**
     * With source parallelism inference disabled and the default parallelism set to 2, plans a
     * {@code limit 1} query and asserts that the transformation two input levels below the root
     * has parallelism 2.
     *
     * @throws Exception if DDL or data insertion fails
     */
    @Test
    public void testParallelismOnLimitPushDown() throws Exception {
        final String dbName = "source_db";
        final String tblName = "test_parallelism_limit_pushdown";
        TableEnvironment tEnv = createTableEnv();
        tEnv.getConfig()
                .getConfiguration()
                .setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
        tEnv.getConfig()
                .getConfiguration()
                .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
        tEnv.executeSql(
                "CREATE TABLE source_db.test_parallelism_limit_pushdown (`year` STRING, `value` INT) partitioned by (pt int)");
        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[] {"2014", 3})
                .addRow(new Object[] {"2014", 4})
                .commit("pt=0");
        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[] {"2015", 2})
                .addRow(new Object[] {"2015", 5})
                .commit("pt=1");

        Table table =
                tEnv.sqlQuery(
                        "select * from hive.source_db.test_parallelism_limit_pushdown limit 1");
        PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
        RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
        ExecNode<?> execNode =
                (ExecNode<?>)
                        planner.translateToExecNodeGraph(
                                        JavaScalaConversionUtil.toScala(
                                                Collections.singletonList(relNode)))
                                .getRootNodes()
                                .get(0);
        // Walk down two input levels from the root transformation.
        Transformation<?> transformation =
                (Transformation<?>)
                        execNode.translateToPlan(planner).getInputs().get(0).getInputs().get(0);
        Assert.assertEquals(2, transformation.getParallelism());
    }

    /**
     * With source parallelism inference disabled and no explicit default parallelism, plans a
     * {@code limit 1} query and asserts that the transformation two input levels below the root
     * reports the default value of
     * {@link ExecutionConfigOptions#TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM}.
     *
     * @throws Exception if DDL or data insertion fails
     */
    @Test
    public void testParallelismWithoutParallelismInfer() throws Exception {
        final String dbName = "source_db";
        final String tblName = "test_parallelism_no_infer";
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.registerCatalog("hive", hiveCatalog);
        tEnv.useCatalog("hive");
        tEnv.getConfig()
                .getConfiguration()
                .setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
        tEnv.executeSql(
                "CREATE TABLE source_db.test_parallelism_no_infer (`year` STRING, `value` INT) partitioned by (pt int)");
        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[] {"2014", 3})
                .addRow(new Object[] {"2014", 4})
                .commit("pt=0");
        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[] {"2015", 2})
                .addRow(new Object[] {"2015", 5})
                .commit("pt=1");

        Table table =
                tEnv.sqlQuery("select * from hive.source_db.test_parallelism_no_infer limit 1");
        PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
        RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
        ExecNode<?> execNode =
                (ExecNode<?>)
                        planner.translateToExecNodeGraph(
                                        JavaScalaConversionUtil.toScala(
                                                Collections.singletonList(relNode)))
                                .getRootNodes()
                                .get(0);
        // Walk down two input levels from the root transformation.
        Transformation<?> transformation =
                (Transformation<?>)
                        execNode.translateToPlan(planner).getInputs().get(0).getInputs().get(0);
        Assert.assertEquals(
                ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM
                        .defaultValue()
                        .intValue(),
                transformation.getParallelism());
    }

    /**
     * Runs the two-argument {@code testSourceConfig} overload (defined elsewhere in this class)
     * with {@code (true, true)} and {@code (false, false)} against an ORC table holding two rows.
     *
     * <p>Skipped unless {@link HiveVersionTestUtil#HIVE_210_OR_LATER} is true. The environment map
     * captured at the start is handed back to {@link TestBaseUtils#setEnv} and {@code db1} is
     * dropped, whether or not the checks pass.
     *
     * @throws Exception if DDL, the insert job or one of the delegated checks fails
     */
    @Test
    public void testSourceConfig() throws Exception {
        Assume.assumeTrue(HiveVersionTestUtil.HIVE_210_OR_LATER);
        final Map<String, String> capturedEnv = System.getenv();
        batchTableEnv.executeSql("create database db1");
        try {
            batchTableEnv.executeSql("create table db1.src (x int,y string) stored as orc");
            TableResult insert =
                    batchTableEnv.executeSql("insert into db1.src values (1,'a'),(2,'b')");
            // Block until the insert job has finished before reading the table.
            insert.await();
            testSourceConfig(true, true);
            testSourceConfig(false, false);
        } finally {
            TestBaseUtils.setEnv(capturedEnv);
            batchTableEnv.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Streams a partitioned table whose {@code streaming-source.consume-start-offset} is a
     * partition name. Expects the row from the pre-existing {@code pt_year=2020} partition first
     * (the {@code pt_year=2019} row is not expected), then the two rows of each partition that is
     * added while the job runs.
     *
     * <p>The streaming job is cancelled in a {@code finally} block so that a failing assertion
     * does not leave it running.
     *
     * @throws Exception if DDL, data insertion or result fetching fails, or if the thread is
     *     interrupted while waiting between inserts
     */
    @Test(timeout = 120000L)
    public void testStreamPartitionReadByPartitionName() throws Exception {
        final String catalogName = "hive";
        final String dbName = "source_db";
        final String tblName = "stream_partition_name_test";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(100L);
        StreamTableEnvironment tEnv =
                HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
        tEnv.registerCatalog(catalogName, hiveCatalog);
        tEnv.useCatalog(catalogName);
        // NOTE(review): the start offset spells the second key "pt_month" while the partition
        // column is "pt_mon" - confirm whether this is intentional.
        tEnv.executeSql(
                "CREATE TABLE source_db.stream_partition_name_test (x int, y string, z int)"
                        + " PARTITIONED BY ( pt_year int, pt_mon string, pt_day string)"
                        + " TBLPROPERTIES("
                        + "'streaming-source.enable'='true',"
                        + "'streaming-source.monitor-interval'='1s',"
                        + "'streaming-source.consume-start-offset'='pt_year=2019/pt_month=09/pt_day=02')");

        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[] {0, "a", 11})
                .commit("pt_year='2019',pt_mon='09',pt_day='01'");
        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[] {1, "b", 12})
                .commit("pt_year='2020',pt_mon='09',pt_day='03'");

        TableResult result =
                tEnv.executeSql("select * from hive.source_db.stream_partition_name_test");
        try {
            CloseableIterator<Row> iter = result.collect();

            Assert.assertEquals(
                    Row.of(1, "b", "12", "2020", "09", "03").toString(),
                    fetchRows(iter, 1).get(0));

            for (int i = 2; i < 6; ++i) {
                // Wait one monitor interval (1s) before adding the next partition.
                Thread.sleep(1000L);
                HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                        .addRow(new Object[] {i, "new_add", 11 + i})
                        .addRow(new Object[] {i, "new_add_1", 11 + i})
                        .commit("pt_year='2020',pt_mon='10',pt_day='0" + i + "'");

                Assert.assertEquals(
                        Arrays.asList(
                                Row.of(i, "new_add", 11 + i, "2020", "10", "0" + i).toString(),
                                Row.of(i, "new_add_1", 11 + i, "2020", "10", "0" + i).toString()),
                        fetchRows(iter, 2));
            }
        } finally {
            // Always stop the unbounded job, including when an assertion above failed.
            result.getJobClient().ifPresent(JobClient::cancel);
        }
    }

    /**
     * Streams a partitioned table configured with {@code streaming-source.consume-order} set to
     * {@code create-time}. Expects the row of the pre-existing partition first, then the two rows
     * of each partition that is added while the job runs.
     *
     * <p>The streaming job is cancelled in a {@code finally} block so that a failing assertion
     * does not leave it running.
     *
     * @throws Exception if DDL, data insertion or result fetching fails, or if the thread is
     *     interrupted while waiting between inserts
     */
    @Test(timeout = 120000L)
    public void testStreamPartitionReadByCreateTime() throws Exception {
        final String catalogName = "hive";
        final String dbName = "source_db";
        final String tblName = "stream_create_time_test";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(100L);
        StreamTableEnvironment tEnv =
                HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
        tEnv.registerCatalog(catalogName, hiveCatalog);
        tEnv.useCatalog(catalogName);
        tEnv.executeSql(
                "CREATE TABLE source_db.stream_create_time_test (x int, y string, z int)"
                        + " PARTITIONED BY ( p1 string, p2 string, p3 string)"
                        + " TBLPROPERTIES("
                        + "'streaming-source.enable'='true',"
                        + "'streaming-source.partition-include'='all',"
                        + "'streaming-source.consume-order'='create-time',"
                        + "'streaming-source.monitor-interval'='1s',"
                        + "'streaming-source.consume-start-offset'='2020-10-02 00:00:00')");

        // The initial partition is written before the streaming query is started.
        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[] {0, "a", 11})
                .commit("p1='A1',p2='B1',p3='C1'");

        TableResult result =
                tEnv.executeSql("select * from hive.source_db.stream_create_time_test");
        try {
            CloseableIterator<Row> iter = result.collect();

            Assert.assertEquals(
                    Row.of(0, "a", "11", "A1", "B1", "C1").toString(),
                    fetchRows(iter, 1).get(0));

            for (int i = 1; i < 6; ++i) {
                // Wait one monitor interval (1s) before adding the next partition.
                Thread.sleep(1000L);
                HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                        .addRow(new Object[] {i, "new_add", 11 + i})
                        .addRow(new Object[] {i, "new_add_1", 11 + i})
                        .commit("p1='A',p2='B',p3='" + i + "'");

                Assert.assertEquals(
                        Arrays.asList(
                                Row.of(i, "new_add", 11 + i, "A", "B", i).toString(),
                                Row.of(i, "new_add_1", 11 + i, "A", "B", i).toString()),
                        fetchRows(iter, 2));
            }
        } finally {
            // Always stop the unbounded job, including when an assertion above failed.
            result.getJobClient().ifPresent(JobClient::cancel);
        }
    }

    /**
     * Streams a table partitioned by a timestamp-formatted string column, configured with
     * {@code streaming-source.consume-order} set to {@code partition-time}. Expects the row of the
     * pre-existing partition first, then the two rows of each partition that is added while the
     * job runs.
     *
     * <p>The streaming job is cancelled in a {@code finally} block so that a failing assertion
     * does not leave it running.
     *
     * @throws Exception if DDL, data insertion or result fetching fails, or if the thread is
     *     interrupted while waiting between inserts
     */
    @Test(timeout = 120000L)
    public void testStreamPartitionReadByPartitionTime() throws Exception {
        final String catalogName = "hive";
        final String dbName = "source_db";
        final String tblName = "stream_test";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(100L);
        StreamTableEnvironment tEnv =
                HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
        tEnv.registerCatalog(catalogName, hiveCatalog);
        tEnv.useCatalog(catalogName);
        tEnv.executeSql(
                "CREATE TABLE source_db.stream_test ( a INT, b STRING)"
                        + " PARTITIONED BY (ts STRING)"
                        + " TBLPROPERTIES ("
                        + "'streaming-source.enable'='true',"
                        + "'streaming-source.monitor-interval'='1s',"
                        + "'streaming-source.consume-order'='partition-time')");

        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[] {0, "0"})
                .commit("ts='2020-05-06 00:00:00'");

        TableResult result = tEnv.executeSql("select * from hive.source_db.stream_test");
        try {
            CloseableIterator<Row> iter = result.collect();

            Assert.assertEquals(
                    Row.of(0, "0", "2020-05-06 00:00:00").toString(),
                    fetchRows(iter, 1).get(0));

            for (int i = 1; i < 6; ++i) {
                // Wait one monitor interval (1s) before adding the next partition.
                Thread.sleep(1000L);
                // Each new partition is ten minutes later than the previous one.
                final String partitionTime = "2020-05-06 00:" + i + "0:00";
                HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                        .addRow(new Object[] {i, String.valueOf(i)})
                        .addRow(new Object[] {i, i + "_copy"})
                        .commit("ts='" + partitionTime + "'");

                Assert.assertEquals(
                        Arrays.asList(
                                Row.of(i, String.valueOf(i), partitionTime).toString(),
                                Row.of(i, i + "_copy", partitionTime).toString()),
                        fetchRows(iter, 2));
            }
        } finally {
            // Always stop the unbounded job, including when an assertion above failed.
            result.getJobClient().ifPresent(JobClient::cancel);
        }
    }

    /**
     * Pulls exactly {@code size} rows from {@code iter} and returns their string forms in
     * natural (lexicographic) order.
     *
     * <p>Before every pull the iterator is asserted to have another element, so the calling test
     * fails if fewer than {@code size} rows are available.
     *
     * @param iter source of rows
     * @param size number of rows to consume
     * @return the sorted {@code toString()} representations of the consumed rows
     */
    private static List<String> fetchRows(Iterator<Row> iter, int size) {
        List<String> rendered = new ArrayList<>(size);
        for (int remaining = size; remaining > 0; remaining--) {
            Assert.assertTrue(iter.hasNext());
            Row row = iter.next();
            rendered.add(row.toString());
        }
        Collections.sort(rendered);
        return rendered;
    }

    /**
     * Runs the non-partitioned streaming-source scenario against table
     * {@code test_mapred_reader} with the mapred-reader fallback option set to {@code true}.
     */
    @Test(timeout=30000L)
    public void testNonPartitionStreamingSourceWithMapredReader() throws Exception {
        final boolean useMapredReader = true;
        final String tableName = "test_mapred_reader";
        testNonPartitionStreamingSource(useMapredReader, tableName);
    }

    /**
     * Runs the non-partitioned streaming-source scenario against table
     * {@code test_vectorized_reader} with the mapred-reader fallback option set to {@code false}.
     */
    @Test(timeout=30000L)
    public void testNonPartitionStreamingSourceWithVectorizedReader() throws Exception {
        final boolean useMapredReader = false;
        final String tableName = "test_vectorized_reader";
        testNonPartitionStreamingSource(useMapredReader, tableName);
    }

    /**
     * Creates a non-partitioned parquet table with the streaming source enabled, starts a
     * streaming {@code select *} on it, and checks that rows inserted by two successive batch
     * inserts are delivered to the collecting iterator.
     *
     * @param useMapredReader value written to
     *     {@link HiveOptions#TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER} on the streaming environment
     * @param tblName name of the table to create in {@code source_db}
     * @throws Exception if table setup, either insert, or the pause between inserts fails
     */
    private void testNonPartitionStreamingSource(Boolean useMapredReader, String tblName) throws Exception {
        final String catalogName = "hive";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
        tEnv.getConfig().getConfiguration().setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, useMapredReader);
        tEnv.registerCatalog(catalogName, hiveCatalog);
        tEnv.useCatalog(catalogName);
        tEnv.executeSql(
                "CREATE TABLE source_db." + tblName + " ("
                        + "  a INT,"
                        + "  b CHAR(1) "
                        + ") stored as parquet TBLPROPERTIES ("
                        + "  'streaming-source.enable'='true',"
                        + "  'streaming-source.partition-order'='create-time',"
                        + "  'streaming-source.monitor-interval'='100ms'"
                        + ")");

        TableResult result = tEnv.executeSql("select * from hive.source_db." + tblName);
        try {
            CloseableIterator<Row> iter = result.collect();
            // Every insert writes the same two rows, so the expectation is loop-invariant.
            List<String> expected = Arrays.asList(Row.of(1, "a").toString(), Row.of(2, "b").toString());
            for (int i = 1; i < 3; ++i) {
                Thread.sleep(1000L);
                batchTableEnv.executeSql("insert into table source_db." + tblName + " values (1,'a'), (2,'b')").await();
                Assert.assertEquals(expected, HiveTableSourceITCase.fetchRows(iter, 2));
            }
        } finally {
            // Cancel the unbounded job even when an assertion above fails, so it does not keep running.
            result.getJobClient().ifPresent(JobClient::cancel);
        }
    }

    /**
     * Runs a batch query through a spied Hive catalog/table factory and checks that the
     * configuration reaching the factory carries the expected mapred-reader fallback flag.
     *
     * <p>The stubbed factory returns a {@link TestConfigSource}, which performs its own
     * parallelism assertion based on {@code inferParallelism}.
     *
     * @param fallbackMR value set for {@code TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER} and expected
     *     to be seen in the factory context
     * @param inferParallelism value set for {@code TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} and
     *     forwarded to {@link TestConfigSource}
     * @throws Exception if the query fails
     */
    private void testSourceConfig(boolean fallbackMR, boolean inferParallelism) throws Exception {
        // Spy on the factory returned by hiveCatalog.getFactory() so source creation can be intercepted.
        HiveDynamicTableFactory tableFactorySpy = (HiveDynamicTableFactory)Mockito.spy((Object)((HiveDynamicTableFactory)hiveCatalog.getFactory().get()));
        // Replace createDynamicTableSource: assert the fallback flag visible in the context, then
        // hand back a TestConfigSource built from that same context.
        // NOTE(review): the argument is cast to TableSourceFactory.Context although the stubbed
        // method is matched on DynamicTableFactory.Context — confirm the runtime context satisfies both.
        ((HiveDynamicTableFactory)Mockito.doAnswer(invocation -> {
            TableSourceFactory.Context context = (TableSourceFactory.Context)invocation.getArgument(0);
            Assert.assertEquals((Object)fallbackMR, (Object)context.getConfiguration().get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
            return new TestConfigSource(new JobConf((Configuration)hiveCatalog.getHiveConf()), context.getConfiguration(), context.getObjectIdentifier().toObjectPath(), context.getTable(), inferParallelism);
        }).when((Object)tableFactorySpy)).createDynamicTableSource((DynamicTableFactory.Context)ArgumentMatchers.any(DynamicTableFactory.Context.class));
        // Spy on the catalog so that getTableFactory() yields the spied factory above.
        HiveCatalog catalogSpy = (HiveCatalog)Mockito.spy((Object)hiveCatalog);
        ((HiveCatalog)Mockito.doReturn(Optional.of(tableFactorySpy)).when((Object)catalogSpy)).getTableFactory();
        TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode();
        tableEnv.getConfig().getConfiguration().setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, fallbackMR);
        tableEnv.getConfig().getConfiguration().setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, inferParallelism);
        // Default parallelism is fixed at 2; TestConfigSource expects 2 when inference is off and 1 when it is on.
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
        tableEnv.registerCatalog(catalogSpy.getName(), (Catalog)catalogSpy);
        tableEnv.useCatalog(catalogSpy.getName());
        // Presumably db1.src is created and populated elsewhere in this test class — verify against the setup code.
        List results = CollectionUtil.iteratorToList((Iterator)tableEnv.sqlQuery("select * from db1.src order by x").execute().collect());
        Assert.assertEquals((Object)"[+I[1, a], +I[2, b]]", (Object)results.toString());
    }

    /** Runs the column-name case-insensitivity scenario for the {@code parquet} format. */
    @Test
    public void testParquetCaseInsensitive() throws Exception {
        final String format = "parquet";
        testCaseInsensitive(format);
    }

    /**
     * Writes one row through a filesystem-connector table whose columns are declared in upper
     * case ({@code I}, {@code J}), then reads the same folder back through a Hive external table
     * whose columns are declared in lower case ({@code i}, {@code j}) and checks the row.
     *
     * @param format file format used both as the filesystem connector's {@code format} option and
     *     in the Hive {@code stored as} clause
     * @throws Exception if creating the folder or executing any statement fails
     */
    private void testCaseInsensitive(String format) throws Exception {
        final TableEnvironment tEnv = HiveTestUtils.createTableEnvWithHiveCatalog(hiveCatalog);
        final String folderURI = TEMPORARY_FOLDER.newFolder().toURI().toString();

        // Step 1: write the data with upper-case column names, then drop the table definition.
        final String createWriterTable = String.format("create table parquet_t (I int, J int) with ('connector'='filesystem','format'='%s','path'='%s')", format, folderURI);
        tEnv.executeSql(createWriterTable);
        tEnv.executeSql("insert into parquet_t select 1, 2").await();
        tEnv.executeSql("drop table parquet_t");

        // Step 2: switch to the Hive dialect and re-declare the table over the same location
        // with lower-case column names.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        final String createReaderTable = String.format("create external table parquet_t (i int, j int) stored as %s location '%s'", format, folderURI);
        tEnv.executeSql(createReaderTable);

        final Row expected = Row.of(1, 2);
        final Object actual = tEnv.executeSql("select * from parquet_t").collect().next();
        Assert.assertEquals(expected, actual);
    }

    /**
     * Reads a partitioned Hive table as a streaming source while selecting only two of its three
     * data columns ({@code x, y}) and filtering on {@code pt_year = '2020'}.
     *
     * <p>Two partitions are committed before the query starts (years 2019 and 2020); only the
     * 2020 row is expected. Four more 2020 partitions (two rows each) are then committed, one per
     * second, and each commit is verified before the next one.
     *
     * <p>The {@code streaming-source.consume-start-offset} value uses the table's actual
     * partition column names ({@code pt_year}, {@code pt_mon}, {@code pt_day}); it previously
     * named a non-existent {@code pt_month} column.
     *
     * @throws Exception if table setup, query execution or the pause between commits fails
     */
    @Test(timeout=120000L)
    public void testStreamReadWithProjectPushDown() throws Exception {
        final String catalogName = "hive";
        final String dbName = "source_db";
        final String tblName = "stream_project_pushdown_test";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(100L);
        StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
        tEnv.registerCatalog(catalogName, hiveCatalog);
        tEnv.useCatalog(catalogName);
        tEnv.executeSql(
                "CREATE TABLE source_db.stream_project_pushdown_test (x int, y string, z int)"
                        + " PARTITIONED BY ("
                        + " pt_year int, pt_mon string, pt_day string) TBLPROPERTIES("
                        + "'streaming-source.enable'='true',"
                        + "'streaming-source.monitor-interval'='1s',"
                        + "'streaming-source.consume-start-offset'='pt_year=2019/pt_mon=09/pt_day=02'"
                        + ")");
        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[]{0, "a", 11})
                .commit("pt_year='2019',pt_mon='09',pt_day='01'");
        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                .addRow(new Object[]{1, "b", 12})
                .commit("pt_year='2020',pt_mon='09',pt_day='03'");

        TableResult result = tEnv.executeSql("select x, y from hive.source_db.stream_project_pushdown_test where pt_year = '2020'");
        try {
            CloseableIterator<Row> iter = result.collect();
            Assert.assertEquals(
                    Row.of(1, "b").toString(),
                    HiveTableSourceITCase.fetchRows(iter, 1).get(0));

            for (int i = 2; i < 6; ++i) {
                // Pause for one second (the table's configured monitor interval) before the next commit.
                Thread.sleep(1000L);
                HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
                        .addRow(new Object[]{i, "new_add", 11 + i})
                        .addRow(new Object[]{i, "new_add_1", 11 + i})
                        .commit("pt_year='2020',pt_mon='10',pt_day='0" + i + "'");
                Assert.assertEquals(
                        Arrays.asList(
                                Row.of(i, "new_add").toString(),
                                Row.of(i, "new_add_1").toString()),
                        HiveTableSourceITCase.fetchRows(iter, 2));
            }
        } finally {
            // Cancel the unbounded job even when an assertion above fails, so it does not keep running.
            result.getJobClient().ifPresent(JobClient::cancel);
        }
    }

    /**
     * Builds a batch-mode table environment using the Hive SQL dialect, with the shared
     * {@code hiveCatalog} registered under the name {@code "hive"} and selected as the current
     * catalog.
     *
     * @return the configured table environment
     */
    private static TableEnvironment createTableEnv() {
        final String catalogName = "hive";
        final TableEnvironment env = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        env.registerCatalog(catalogName, hiveCatalog);
        env.useCatalog(catalogName);
        return env;
    }

    /**
     * {@link HiveCatalog} variant that records whether {@code listPartitions(ObjectPath)} has
     * been invoked on it.
     */
    private static class TestPartitionFilterCatalog extends HiveCatalog {
        /** Becomes {@code true} the first time {@link #listPartitions(ObjectPath)} is called. */
        private boolean fallback = false;

        TestPartitionFilterCatalog(String name, String defaultDb, @Nullable HiveConf conf, String version) {
            super(name, defaultDb, conf, version, true);
        }

        /**
         * Marks this catalog as having been asked for a full partition listing, then delegates
         * to the parent implementation.
         */
        public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) throws TableNotExistException, TableNotPartitionedException, CatalogException {
            // Set the flag before delegating so it is recorded even if the parent call throws.
            fallback = true;
            List<CatalogPartitionSpec> partitions = super.listPartitions(tablePath);
            return partitions;
        }
    }

    /**
     * {@link HiveTableSource} variant that checks the parallelism of the data stream produced by
     * the parent implementation before returning it.
     */
    private static class TestConfigSource extends HiveTableSource {
        /** Selects the expected parallelism: 1 when {@code true}, 2 when {@code false}. */
        private final boolean inferParallelism;

        TestConfigSource(JobConf jobConf, ReadableConfig flinkConf, ObjectPath tablePath, CatalogTable catalogTable, boolean inferParallelism) {
            super(jobConf, flinkConf, tablePath, catalogTable);
            this.inferParallelism = inferParallelism;
        }

        /**
         * Delegates to the parent, asserts the parallelism of the resulting transformation, and
         * returns the parent's stream unchanged.
         */
        public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
            DataStreamSource<RowData> source = (DataStreamSource<RowData>) super.getDataStream(execEnv);
            long actualParallelism = source.getTransformation().getParallelism();
            long expectedParallelism;
            if (inferParallelism) {
                expectedParallelism = 1L;
            } else {
                expectedParallelism = 2L;
            }
            Assert.assertEquals(expectedParallelism, actualParallelism);
            return source;
        }
    }
}

