/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.catalog.hive;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileReader;
import java.io.PrintStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Integration tests that run Flink SQL and Table API programs against a {@link HiveCatalog}.
 *
 * <p>A single catalog instance is created by {@link HiveTestUtils#createHiveCatalog()} and shared
 * by every test in this class. Tests therefore remove the catalog objects they create in a
 * {@code finally} block, so that a failing assertion in one test does not leave tables behind
 * that would make unrelated tests fail as well.
 */
public class HiveCatalogITCase {
    private static final String SOURCE_TABLE_NAME = "csv_source";
    private static final String SINK_TABLE_NAME = "csv_sink";

    /** Catalog shared by all tests; opened once before the class and closed afterwards. */
    private static HiveCatalog hiveCatalog;

    /** Per-test scratch directory for sink files. */
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();

    @BeforeClass
    public static void createCatalog() {
        hiveCatalog = HiveTestUtils.createHiveCatalog();
        hiveCatalog.open();
    }

    @AfterClass
    public static void closeCatalog() {
        if (hiveCatalog != null) {
            hiveCatalog.close();
        }
    }

    /** Creates a CSV table through DDL, queries it, renames it and queries it again. */
    @Test
    public void testCsvTableViaSQL() {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");
        String path = this.getClass().getResource("/csv/test.csv").getPath();
        try {
            tableEnv.executeSql("create table test2 (name String, age Int) with (\n   'connector.type' = 'filesystem',\n   'connector.path' = 'file://" + path + "',\n   'format.type' = 'csv'\n)");
            assertQueryReturnsCsvRows(tableEnv, "SELECT * FROM myhive.`default`.test2");

            tableEnv.executeSql("ALTER TABLE test2 RENAME TO newtable");
            assertQueryReturnsCsvRows(tableEnv, "SELECT * FROM myhive.`default`.newtable");
        } finally {
            // Depending on where a failure happened the table exists under either name.
            tableEnv.executeSql("DROP TABLE IF EXISTS test2");
            tableEnv.executeSql("DROP TABLE IF EXISTS newtable");
        }
    }

    /**
     * Registers CSV source and sink tables directly through the catalog API, reads the source
     * with SQL and copies it into the sink, then checks the written file line by line.
     */
    @Test
    public void testCsvTableViaAPI() throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tableEnv.getConfig().addConfiguration(new Configuration().set(CoreOptions.DEFAULT_PARALLELISM, 1));
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        TableSchema schema = TableSchema.builder()
                .field("name", DataTypes.STRING())
                .field("age", DataTypes.INT())
                .build();

        HashMap<String, String> sourceOptions = new HashMap<>();
        sourceOptions.put("connector.type", "filesystem");
        sourceOptions.put("connector.path", this.getClass().getResource("/csv/test.csv").getPath());
        sourceOptions.put("format.type", "csv");
        CatalogTableImpl source = new CatalogTableImpl(schema, sourceOptions, "Comment.");

        Path sinkFile = Paths.get(this.tempFolder.newFolder().getAbsolutePath(), "test.csv");
        HashMap<String, String> sinkOptions = new HashMap<>();
        sinkOptions.put("connector.type", "filesystem");
        sinkOptions.put("connector.path", sinkFile.toAbsolutePath().toString());
        sinkOptions.put("format.type", "csv");
        CatalogTableImpl sink = new CatalogTableImpl(schema, sinkOptions, "Comment.");

        try {
            hiveCatalog.createTable(new ObjectPath("default", SOURCE_TABLE_NAME), source, false);
            hiveCatalog.createTable(new ObjectPath("default", SINK_TABLE_NAME), sink, false);

            Table t = tableEnv.sqlQuery(String.format("select * from myhive.`default`.%s", SOURCE_TABLE_NAME));
            List<Row> result = CollectionUtil.iteratorToList(t.execute().collect());
            // Sort by string form so the comparison does not depend on read order.
            result.sort(Comparator.comparing(String::valueOf));
            Assert.assertEquals(expectedCsvRows(), result);

            tableEnv.executeSql(String.format("insert into myhive.`default`.%s select * from myhive.`default`.%s", SINK_TABLE_NAME, SOURCE_TABLE_NAME)).await();

            // try-with-resources: the reader must be closed even when an assertion fails.
            try (BufferedReader reader = Files.newBufferedReader(sinkFile)) {
                for (int i = 1; i <= 3; i++) {
                    Assert.assertEquals(String.format("%d,%d", i, i), reader.readLine());
                }
                Assert.assertNull(reader.readLine());
            }
        } finally {
            tableEnv.executeSql(String.format("DROP TABLE IF EXISTS %s", SOURCE_TABLE_NAME));
            tableEnv.executeSql(String.format("DROP TABLE IF EXISTS %s", SINK_TABLE_NAME));
        }
    }

    /**
     * Runs a tumbling-window aggregation from a CSV source into a CSV sink in streaming mode
     * and compares the sink file content with the expected text.
     */
    @Test
    public void testReadWriteCsv() throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        String srcPath = this.getClass().getResource("/csv/test3.csv").getPath();
        String sinkPath = new File(this.tempFolder.newFolder(), "csv-order-sink").toURI().toString();
        try {
            tableEnv.executeSql("CREATE TABLE src (price DECIMAL(10, 2),currency STRING,ts6 TIMESTAMP(6),ts AS CAST(ts6 AS TIMESTAMP(3)),WATERMARK FOR ts AS ts) " + String.format("WITH ('connector.type' = 'filesystem','connector.path' = 'file://%s','format.type' = 'csv')", srcPath));
            tableEnv.executeSql("CREATE TABLE sink (window_end TIMESTAMP(3),max_ts TIMESTAMP(6),counter BIGINT,total_price DECIMAL(10, 2)) " + String.format("WITH ('connector.type' = 'filesystem','connector.path' = '%s','format.type' = 'csv')", sinkPath));
            tableEnv.executeSql("INSERT INTO sink SELECT TUMBLE_END(ts, INTERVAL '5' SECOND),MAX(ts6),COUNT(*),MAX(price) FROM src GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)").await();

            String expected = "2019-12-12 00:00:05.0,2019-12-12 00:00:04.004001,3,50.00\n2019-12-12 00:00:10.0,2019-12-12 00:00:06.006001,2,5.33\n";
            Assert.assertEquals(expected, org.apache.flink.util.FileUtils.readFileUtf8(new File(new URI(sinkPath))));
        } finally {
            // These tables live in the shared catalog; do not leak them into other tests.
            tableEnv.executeSql("DROP TABLE IF EXISTS src");
            tableEnv.executeSql("DROP TABLE IF EXISTS sink");
        }
    }

    @Test
    public void testBatchReadWriteCsvWithProctime() {
        this.testReadWriteCsvWithProctime(false);
    }

    @Test
    public void testStreamReadWriteCsvWithProctime() {
        this.testReadWriteCsvWithProctime(true);
    }

    /**
     * Selects everything from {@code proctime_src} with SQL and expects five rows.
     *
     * @param isStreaming whether to run in streaming mode ({@code true}) or batch mode
     */
    private void testReadWriteCsvWithProctime(boolean isStreaming) {
        TableEnvironment tableEnv = this.prepareTable(isStreaming);
        try {
            List<Row> rows = CollectionUtil.iteratorToList(tableEnv.executeSql("SELECT * FROM proctime_src").collect());
            Assert.assertEquals(5, rows.size());
        } finally {
            tableEnv.executeSql("DROP TABLE proctime_src");
        }
    }

    @Test
    public void testTableApiWithProctimeForBatch() {
        this.testTableApiWithProctime(false);
    }

    @Test
    public void testTableApiWithProctimeForStreaming() {
        this.testTableApiWithProctime(true);
    }

    /**
     * Projects three columns of {@code proctime_src} with the Table API and expects five rows.
     *
     * @param isStreaming whether to run in streaming mode ({@code true}) or batch mode
     */
    private void testTableApiWithProctime(boolean isStreaming) {
        TableEnvironment tableEnv = this.prepareTable(isStreaming);
        try {
            Table projection = tableEnv.from("proctime_src")
                    .select(Expressions.$("price"), Expressions.$("ts"), Expressions.$("l_proctime"));
            List<Row> rows = CollectionUtil.iteratorToList(projection.execute().collect());
            Assert.assertEquals(5, rows.size());
        } finally {
            tableEnv.executeSql("DROP TABLE proctime_src");
        }
    }

    /**
     * Creates a table environment that uses the shared catalog and defines the
     * {@code proctime_src} table in it. The caller is responsible for dropping the table.
     *
     * @param isStreaming whether to create a streaming ({@code true}) or batch environment
     * @return the configured table environment
     */
    private TableEnvironment prepareTable(boolean isStreaming) {
        EnvironmentSettings settings = isStreaming ? EnvironmentSettings.inStreamingMode() : EnvironmentSettings.inBatchMode();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");
        String srcPath = this.getClass().getResource("/csv/test3.csv").getPath();
        tableEnv.executeSql("CREATE TABLE proctime_src (price DECIMAL(10, 2),currency STRING,ts6 TIMESTAMP(6),ts AS CAST(ts6 AS TIMESTAMP(3)),WATERMARK FOR ts AS ts,l_proctime AS PROCTIME( )) " + String.format("WITH ('connector.type' = 'filesystem','connector.path' = 'file://%s','format.type' = 'csv')", srcPath));
        return tableEnv;
    }

    /** Checks that a primary key declared in DDL can be read back from the stored table schema. */
    @Test
    public void testTableWithPrimaryKey() {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        tableEnv.registerCatalog("catalog1", hiveCatalog);
        tableEnv.useCatalog("catalog1");

        String createTable = "CREATE TABLE pk_src (\n  uuid varchar(40) not null,\n  price DECIMAL(10, 2),\n  currency STRING,\n  ts6 TIMESTAMP(6),\n  ts AS CAST(ts6 AS TIMESTAMP(3)),\n  WATERMARK FOR ts AS ts,\n  constraint ct1 PRIMARY KEY(uuid) NOT ENFORCED)\n  WITH (\n    'connector.type' = 'filesystem',    'connector.path' = 'file://fakePath',    'format.type' = 'csv')";
        tableEnv.executeSql(createTable);
        try {
            TableSchema tableSchema = tableEnv.getCatalog(tableEnv.getCurrentCatalog()).map(catalog -> {
                try {
                    ObjectPath tablePath = new ObjectPath(catalog.getDefaultDatabase(), "pk_src");
                    return catalog.getTable(tablePath).getSchema();
                } catch (TableNotExistException ignored) {
                    // A missing table is reported by the assertNotNull below.
                    return null;
                }
            }).orElse(null);

            Assert.assertNotNull(tableSchema);
            Assert.assertEquals(
                    Optional.of(UniqueConstraint.primaryKey("ct1", Collections.singletonList("uuid"))),
                    tableSchema.getPrimaryKey());
        } finally {
            tableEnv.executeSql("DROP TABLE pk_src");
        }
    }

    /**
     * Copies a CSV table into a {@code print} sink and compares what was written to
     * {@code System.out} with the expected change-log lines.
     */
    @Test
    public void testNewTableFactory() throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tEnv.registerCatalog("myhive", hiveCatalog);
        tEnv.useCatalog("myhive");
        tEnv.getConfig().getConfiguration().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        String path = this.getClass().getResource("/csv/test.csv").getPath();

        // Redirect System.out for the duration of the job so the print sink output can be checked.
        PrintStream originalSystemOut = System.out;
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        PrintStream capturingOut = new PrintStream(captured);
        System.setOut(capturingOut);
        try {
            tEnv.executeSql("create table csv_table (name String, age Int) with ('connector.type' = 'filesystem','connector.path' = 'file://" + path + "','format.type' = 'csv')");
            tEnv.executeSql("create table print_table (name String, age Int) with ('connector' = 'print')");
            tEnv.executeSql("insert into print_table select * from csv_table").await();
            Assert.assertEquals("+I[1, 1]\n+I[2, 2]\n+I[3, 3]\n", captured.toString());
        } finally {
            System.setOut(originalSystemOut);
            capturingOut.close();
            // IF EXISTS: a failed CREATE must not be masked by a failing DROP.
            tEnv.executeSql("DROP TABLE IF EXISTS csv_table");
            tEnv.executeSql("DROP TABLE IF EXISTS print_table");
        }
    }

    /** Calls {@code listDatabases()} on the shared catalog from several threads at once. */
    @Test
    public void testConcurrentAccessHiveCatalog() throws Exception {
        int numThreads = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
        try {
            Callable<List<String>> listDBCallable = () -> hiveCatalog.listDatabases();
            List<Future<List<String>>> listDBFutures = new ArrayList<>();
            for (int i = 0; i < numThreads; i++) {
                listDBFutures.add(executorService.submit(listDBCallable));
            }
            for (Future<List<String>> future : listDBFutures) {
                future.get(5L, TimeUnit.SECONDS);
            }
        } finally {
            // Also stops workers that are still running after a timeout or failure.
            executorService.shutdownNow();
        }
    }

    /** Runs two insert jobs that only involve temporary tables while the Hive catalog is current. */
    @Test
    public void testTemporaryGenericTable() throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(Arrays.asList(Row.of(1), Row.of(2)));
        tableEnv.executeSql("create temporary table src(x int) with ('connector'='COLLECTION','is-bounded' = 'false')");

        // The TemporaryFolder rule removes the directory after the test.
        File tempDir = this.tempFolder.newFolder();
        tableEnv.executeSql("create temporary table dest(x int) with ('connector' = 'filesystem'," + String.format("'path' = 'file://%s/1.csv',", tempDir.getAbsolutePath()) + "'format' = 'csv')");
        tableEnv.executeSql("insert into dest select * from src").await();

        tableEnv.executeSql("create temporary table datagen(i int) with ('connector'='datagen','rows-per-second'='5','fields.i.kind'='sequence','fields.i.start'='1','fields.i.end'='10')");
        tableEnv.executeSql("create temporary table blackhole(i int) with ('connector'='blackhole')");
        tableEnv.executeSql("insert into blackhole select * from datagen").await();
    }

    /**
     * Creates a table in the Hive catalog, copies its definition into the built-in catalog with
     * {@code CREATE TABLE ... LIKE} and checks the options and schema of the copy.
     */
    @Test
    public void testCreateTableLike() throws Exception {
        TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode();
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());
        try {
            tableEnv.executeSql("create table generic_table (x int) with ('connector'='COLLECTION')");
            tableEnv.useCatalog("default_catalog");
            tableEnv.executeSql(String.format("create table copy like `%s`.`default`.generic_table", hiveCatalog.getName()));

            Catalog builtInCat = tableEnv.getCatalog("default_catalog")
                    .orElseThrow(() -> new AssertionError("default_catalog is not registered"));
            CatalogBaseTable catalogTable = builtInCat.getTable(new ObjectPath("default_database", "copy"));
            Assert.assertEquals(1, catalogTable.getOptions().size());
            Assert.assertEquals("COLLECTION", catalogTable.getOptions().get(FactoryUtil.CONNECTOR.key()));
            Assert.assertEquals(1, catalogTable.getSchema().getFieldCount());
            Assert.assertEquals("x", catalogTable.getSchema().getFieldNames()[0]);
            Assert.assertEquals(DataTypes.INT(), catalogTable.getSchema().getFieldDataTypes()[0]);
        } finally {
            // The source table was created in the shared catalog; remove it again.
            hiveCatalog.dropTable(new ObjectPath("default", "generic_table"), true);
        }
    }

    /**
     * Creates two views in a dedicated database and checks their stored schema, comment and
     * query results. The database is dropped with {@code cascade} afterwards.
     */
    @Test
    public void testViewSchema() throws Exception {
        TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());
        tableEnv.executeSql("create database db1");
        try {
            tableEnv.useDatabase("db1");
            tableEnv.executeSql("create table src(x int,ts timestamp(3)) with ('connector'='datagen','number-of-rows'='10')");

            tableEnv.executeSql("create view v1 as select x,ts from src order by x limit 3");
            CatalogView catalogView = (CatalogView) hiveCatalog.getTable(new ObjectPath("db1", "v1"));
            Schema expectedV1 = Schema.newBuilder()
                    .fromFields(new String[] {"x", "ts"}, new AbstractDataType<?>[] {DataTypes.INT(), DataTypes.TIMESTAMP(3)})
                    .build();
            Assert.assertEquals(expectedV1, catalogView.getUnresolvedSchema());
            List<Row> results = CollectionUtil.iteratorToList(tableEnv.executeSql("select x from v1").collect());
            Assert.assertEquals(3, results.size());

            tableEnv.executeSql("create view v2 (v2_x,v2_ts) comment 'v2 comment' as select x,cast(ts as timestamp_ltz(3)) from v1");
            catalogView = (CatalogView) hiveCatalog.getTable(new ObjectPath("db1", "v2"));
            Schema expectedV2 = Schema.newBuilder()
                    .fromFields(new String[] {"v2_x", "v2_ts"}, new AbstractDataType<?>[] {DataTypes.INT(), DataTypes.TIMESTAMP_LTZ(3)})
                    .build();
            Assert.assertEquals(expectedV2, catalogView.getUnresolvedSchema());
            Assert.assertEquals("v2 comment", catalogView.getComment());
            results = CollectionUtil.iteratorToList(tableEnv.executeSql("select * from v2").collect());
            Assert.assertEquals(3, results.size());
        } finally {
            tableEnv.executeSql("drop database db1 cascade");
        }
    }

    /** Rows that the tests expect to read from {@code /csv/test.csv}. */
    private static List<Row> expectedCsvRows() {
        return Arrays.asList(Row.of("1", 1), Row.of("2", 2), Row.of("3", 3));
    }

    /** Runs {@code query} and asserts, ignoring order, that it returns the expected CSV rows. */
    private static void assertQueryReturnsCsvRows(TableEnvironment tableEnv, String query) {
        List<Row> result = CollectionUtil.iteratorToList(tableEnv.sqlQuery(query).execute().collect());
        Assert.assertEquals(new HashSet<>(expectedCsvRows()), new HashSet<>(result));
    }
}

