/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive;

import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveRunnerSetup;
import com.klarna.hiverunner.annotations.HiveSQL;
import com.klarna.hiverunner.config.HiveRunnerConfig;
import java.io.IOException;
import java.io.Serializable;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.connectors.hive.FlinkEmbeddedHiveRunner;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.ArrayUtils;
import org.apache.flink.util.CollectionUtil;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(value=FlinkEmbeddedHiveRunner.class)
public class HiveRunnerITCase {
    // Hive shell used to run statements directly against Hive; never assigned in this class, so it is
    // presumably injected by the test runner via the @HiveSQL annotation (no script files are listed).
    @HiveSQL(files={})
    private static HiveShell hiveShell;
    // Runner configuration; its value is assigned in the static initializer at the bottom of this class.
    @HiveRunnerSetup
    private static final HiveRunnerConfig CONFIG;
    // Catalog shared by every test; opened in createCatalog() and closed in closeCatalog().
    private static HiveCatalog hiveCatalog;

    /**
     * Creates the shared {@link HiveCatalog} from the Hive shell's configuration and opens it
     * before the tests of this class run.
     *
     * @throws IOException declared by the original signature
     */
    @BeforeClass
    public static void createCatalog() throws IOException {
        hiveCatalog = HiveTestUtils.createHiveCatalog(hiveShell.getHiveConf());
        hiveCatalog.open();
    }

    /** Closes the shared catalog after all tests have run; a no-op when it was never created. */
    @AfterClass
    public static void closeCatalog() {
        if (hiveCatalog == null) {
            return;
        }
        hiveCatalog.close();
    }

    /**
     * Copies generated records from an in-memory collection source into a non-partitioned Hive
     * table and checks, through the Hive shell, that every record arrived.
     */
    @Test
    public void testInsertIntoNonPartitionTable() throws Exception {
        final List<Row> records = generateRecords(5);
        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(records);

        final TableEnvironment env = HiveTestUtils.createTableEnvWithHiveCatalog(hiveCatalog);
        env.executeSql("create table default_catalog.default_database.src (i int,l bigint,d double,s string) with ('connector'='COLLECTION','is-bounded' = 'true')");

        // The destination table is created with the Hive dialect.
        env.getConfig().setSqlDialect(SqlDialect.HIVE);
        env.executeSql("create table dest (i int,l bigint,d double,s string)");
        try {
            // Back to the default dialect for the insert itself.
            env.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            env.executeSql("insert into dest select * from default_catalog.default_database.src").await();
            List<String> written = hiveShell.executeQuery("select * from dest");
            verifyWrittenData(records, written);
        } finally {
            env.executeSql("drop table dest");
        }
    }

    /**
     * Writes a single row holding an array, a map and a struct into a Hive table and checks the
     * textual form Hive reports for it.
     */
    @Test
    public void testWriteComplexType() throws Exception {
        final TableEnvironment env = HiveTestUtils.createTableEnvWithHiveCatalog(hiveCatalog);

        // Build the three complex values of the single source row.
        Object[] numbers = new Object[] {1, 2, 3};
        HashMap<Integer, String> letters = new HashMap<>();
        letters.put(1, "a");
        letters.put(2, "b");
        Row nested = Row.of(3, "c");
        Row source = Row.of(numbers, letters, nested);

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(Collections.singletonList(source));
        env.executeSql("create table default_catalog.default_database.complexSrc (a array<int>,m map<int, string>,s row<f1 int,f2 string>) with ('connector'='COLLECTION','is-bounded' = 'true')");

        env.getConfig().setSqlDialect(SqlDialect.HIVE);
        env.executeSql("create table dest (a array<int>,m map<int, string>,s struct<f1:int,f2:string>)");
        try {
            env.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            env.executeSql("insert into dest select * from default_catalog.default_database.complexSrc").await();

            List<String> written = hiveShell.executeQuery("select * from dest");
            Assert.assertEquals(1, written.size());
            Assert.assertEquals("[1,2,3]\t{1:\"a\",2:\"b\"}\t{\"f1\":3,\"f2\":\"c\"}", written.get(0));
        } finally {
            env.executeSql("drop table dest");
        }
    }

    /**
     * Writes a row whose only column is an array of structs into a Hive table and checks the
     * textual form Hive reports for it.
     */
    @Test
    public void testWriteNestedComplexType() throws Exception {
        final TableEnvironment env = HiveTestUtils.createTableEnvWithHiveCatalog(hiveCatalog);

        // Three structs: (1, "a"), (2, "b"), (3, "c").
        Object[] structs = new Object[3];
        for (int idx = 0; idx < structs.length; idx++) {
            structs[idx] = Row.of(1 + idx, String.valueOf((char) ('a' + idx)));
        }
        Row source = new Row(1);
        source.setField(0, structs);

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(Collections.singletonList(source));
        env.executeSql("create table default_catalog.default_database.nestedSrc (a array<row<f1 int,f2 string>>) with ('connector'='COLLECTION','is-bounded' = 'true')");

        env.getConfig().setSqlDialect(SqlDialect.HIVE);
        env.executeSql("create table dest (a array<struct<f1:int,f2:string>>)");
        try {
            env.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            env.executeSql("insert into dest select * from default_catalog.default_database.nestedSrc").await();

            List<String> written = hiveShell.executeQuery("select * from dest");
            Assert.assertEquals(1, written.size());
            Assert.assertEquals("[{\"f1\":1,\"f2\":\"a\"},{\"f1\":2,\"f2\":\"b\"},{\"f1\":3,\"f2\":\"c\"}]", written.get(0));
        } finally {
            env.executeSql("drop table dest");
        }
    }

    /**
     * Copies a row consisting solely of nulls (one per supported column type) from one Hive table
     * to another and checks that Hive reports {@code NULL} in each of the 17 columns.
     */
    @Test
    public void testWriteNullValues() throws Exception {
        final TableEnvironment env = getTableEnvWithHiveCatalog();
        env.executeSql("create database db1");
        try {
            env.executeSql("create table db1.src(t tinyint,s smallint,i int,b bigint,f float,d double,de decimal(10,5),ts timestamp,dt date,str string,ch char(5),vch varchar(8),bl boolean,bin binary,arr array<int>,mp map<int,string>,strt struct<f1:int,f2:string>)");
            // A freshly allocated Object[17] holds 17 nulls, one for each column of db1.src.
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src")
                    .addRow(new Object[17])
                    .commit();
            hiveShell.execute("create table db1.dest like db1.src");

            env.executeSql("insert into db1.dest select * from db1.src").await();

            List<String> rows = hiveShell.executeQuery("select * from db1.dest");
            Assert.assertEquals(1, rows.size());
            String[] columns = rows.get(0).split("\t");
            Assert.assertEquals(17, columns.length);
            Assert.assertEquals("NULL", columns[0]);
            // A single distinct value means every column equals the first one, i.e. "NULL".
            Assert.assertEquals(1, new HashSet<String>(Arrays.asList(columns)).size());
        } finally {
            env.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Runs the read/write round trip for each storage format; avro is skipped when the Hive
     * version is older than 1.1.0.
     */
    @Test
    public void testDifferentFormats() throws Exception {
        for (String format : new String[] {"orc", "parquet", "sequencefile", "csv", "avro"}) {
            boolean avroUnsupported = format.equals("avro") && !HiveVersionTestUtil.HIVE_110_OR_LATER;
            if (!avroUnsupported) {
                readWriteFormat(format);
            }
        }
    }

    /**
     * Checks that decimal values written by Flink match those written by Hive, both for literal
     * values and for a table-to-table copy.
     */
    @Test
    public void testDecimal() throws Exception {
        final TableEnvironment env = getTableEnvWithHiveCatalog();
        env.executeSql("create database db1");
        try {
            env.executeSql("create table db1.src1 (x decimal(10,2))");
            env.executeSql("create table db1.src2 (x decimal(10,2))");
            env.executeSql("create table db1.dest (x decimal(10,2))");

            // Same literals inserted once through Hive (src1) and once through Flink (src2).
            hiveShell.execute("insert into table db1.src1 values (1.0),(2.12),(5.123),(5.456),(123456789.12)");
            env.executeSql("insert into db1.src2 values (1.0),(2.12),(5.123),(5.456),(123456789.12)").await();
            List<String> hiveWritten = hiveShell.executeQuery("select * from db1.src1");
            verifyHiveQueryResult("select * from db1.src2", hiveWritten);

            // Copy through Flink and compare against the Hive-written source again.
            env.executeSql("insert into db1.dest select * from db1.src1").await();
            List<String> hiveWrittenAfterCopy = hiveShell.executeQuery("select * from db1.src1");
            verifyHiveQueryResult("select * from db1.dest", hiveWrittenAfterCopy);
        } finally {
            env.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Exercises {@code insert overwrite} against a non-partitioned table, a single static
     * partition, and partitions taken from the inserted values.
     */
    @Test
    public void testInsertOverwrite() throws Exception {
        TableEnvironment env = getTableEnvWithHiveCatalog();
        env.executeSql("create database db1");
        try {
            // Non-partitioned table: the overwrite replaces both seeded rows.
            env.executeSql("create table db1.dest (x int, y string)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "dest")
                    .addRow(new Object[] {1, "a"})
                    .addRow(new Object[] {2, "b"})
                    .commit();
            verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta", "2\tb"));
            env.executeSql("insert overwrite db1.dest values (3, 'c')").await();
            verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("3\tc"));

            // Partitioned table seeded with one row in each of y=1 and y=2.
            env.executeSql("create table db1.part(x int) partitioned by (y int)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part")
                    .addRow(new Object[] {1})
                    .commit("y=1");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part")
                    .addRow(new Object[] {2})
                    .commit("y=2");

            // Overwriting the static partition y=1 leaves y=2 untouched.
            env = getTableEnvWithHiveCatalog();
            env.executeSql("insert overwrite db1.part partition (y=1) select 100").await();
            verifyHiveQueryResult("select * from db1.part", Arrays.asList("100\t1", "2\t2"));

            // Overwriting by value touches y=2 and adds y=3; y=1 keeps its content.
            env = getTableEnvWithHiveCatalog();
            env.executeSql("insert overwrite db1.part values (200,2),(3,3)").await();
            verifyHiveQueryResult("select * from db1.part", Arrays.asList("100\t1", "200\t2", "3\t3"));
        } finally {
            env.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Inserts into a fully specified static partition (including a partition value that contains
     * a single quote) and checks that exactly one partition with both rows results.
     */
    @Test
    public void testStaticPartition() throws Exception {
        final TableEnvironment env = getTableEnvWithHiveCatalog();
        env.executeSql("create database db1");
        try {
            env.executeSql("create table db1.src (x int)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src")
                    .addRow(new Object[] {1})
                    .addRow(new Object[] {2})
                    .commit();
            env.executeSql("create table db1.dest (x int) partitioned by (p1 string, p2 double)");

            env.executeSql("insert into db1.dest partition (p1='1\\'1', p2=1.1) select x from db1.src").await();

            ObjectPath destPath = new ObjectPath("db1", "dest");
            Assert.assertEquals(1, hiveCatalog.listPartitions(destPath).size());
            verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\t1'1\t1.1", "2\t1'1\t1.1"));
        } finally {
            env.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Inserts three rows without a partition spec and checks that each distinct (p1, p2) pair
     * taken from the data became its own partition.
     */
    @Test
    public void testDynamicPartition() throws Exception {
        final TableEnvironment env = getTableEnvWithHiveCatalog();
        env.executeSql("create database db1");
        try {
            env.executeSql("create table db1.src (x int, y string, z double)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src")
                    .addRow(new Object[] {1, "a", 1.1})
                    .addRow(new Object[] {2, "a", 2.2})
                    .addRow(new Object[] {3, "b", 3.3})
                    .commit();
            env.executeSql("create table db1.dest (x int) partitioned by (p1 string, p2 double)");

            env.executeSql("insert into db1.dest select * from db1.src").await();

            ObjectPath destPath = new ObjectPath("db1", "dest");
            Assert.assertEquals(3, hiveCatalog.listPartitions(destPath).size());
            verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta\t1.1", "2\ta\t2.2", "3\tb\t3.3"));
        } finally {
            env.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Inserts with a static value for the first partition column and a dynamic second column,
     * then checks that two partitions were created.
     */
    @Test
    public void testPartialDynamicPartition() throws Exception {
        final TableEnvironment env = getTableEnvWithHiveCatalog();
        env.executeSql("create database db1");
        try {
            env.executeSql("create table db1.src (x int, y string)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src")
                    .addRow(new Object[] {1, "a"})
                    .addRow(new Object[] {2, "b"})
                    .commit();
            env.executeSql("create table db1.dest (x int) partitioned by (p1 double, p2 string)");

            env.executeSql("insert into db1.dest partition (p1=1.1,p2) select x,y from db1.src").await();

            ObjectPath destPath = new ObjectPath("db1", "dest");
            Assert.assertEquals(2, hiveCatalog.listPartitions(destPath).size());
            verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\t1.1\ta", "2\t1.1\tb"));
        } finally {
            env.executeSql("drop database db1 cascade");
        }
    }

    /** Runs the compressed text table scenario with a batch table environment. */
    @Test
    public void testBatchCompressTextTable() throws Exception {
        final boolean batch = true;
        testCompressTextTable(batch);
    }

    /** Runs the compressed text table scenario with a streaming table environment. */
    @Test
    public void testStreamCompressTextTable() throws Exception {
        final boolean batch = false;
        testCompressTextTable(batch);
    }

    /**
     * Reads timestamps (including one with nanosecond precision) through Flink and writes the
     * maximum back to Hive, checking the value on both paths.
     */
    @Test
    public void testTimestamp() throws Exception {
        final TableEnvironment env = getTableEnvWithHiveCatalog();
        env.executeSql("create database db1");
        try {
            env.executeSql("create table db1.src (ts timestamp)");
            env.executeSql("create table db1.dest (ts timestamp)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src")
                    .addRow(new Object[] {Timestamp.valueOf("2019-11-11 00:00:00")})
                    .addRow(new Object[] {Timestamp.valueOf("2019-12-03 15:43:32.123456789")})
                    .commit();

            // Read path: Flink returns the values as LocalDateTime.
            List<Row> rows = CollectionUtil.iteratorToList(env.sqlQuery("select * from db1.src").execute().collect());
            Assert.assertEquals(2, rows.size());
            Assert.assertEquals(LocalDateTime.of(2019, 11, 11, 0, 0), rows.get(0).getField(0));
            Assert.assertEquals(LocalDateTime.of(2019, 12, 3, 15, 43, 32, 123456789), rows.get(1).getField(0));

            // Write path: the later timestamp must arrive in Hive with full precision.
            env.executeSql("insert into db1.dest select max(ts) from db1.src").await();
            verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("2019-12-03 15:43:32.123456789"));
        } finally {
            env.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Reads dates through Flink and writes the maximum back to Hive, checking the value on both
     * paths.
     */
    @Test
    public void testDate() throws Exception {
        final TableEnvironment env = getTableEnvWithHiveCatalog();
        env.executeSql("create database db1");
        try {
            env.executeSql("create table db1.src (dt date)");
            env.executeSql("create table db1.dest (dt date)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src")
                    .addRow(new Object[] {Date.valueOf("2019-12-09")})
                    .addRow(new Object[] {Date.valueOf("2019-12-12")})
                    .commit();

            // Read path: Flink returns the values as LocalDate.
            List<Row> rows = CollectionUtil.iteratorToList(env.sqlQuery("select * from db1.src").execute().collect());
            Assert.assertEquals(2, rows.size());
            Assert.assertEquals(LocalDate.of(2019, 12, 9), rows.get(0).getField(0));
            Assert.assertEquals(LocalDate.of(2019, 12, 12), rows.get(1).getField(0));

            // Write path: the later date must arrive in Hive.
            env.executeSql("insert into db1.dest select max(dt) from db1.src").await();
            verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("2019-12-12"));
        } finally {
            env.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Creates three views through the Hive shell (limit, group-by/having, join) and checks that
     * Flink can query each of them.
     */
    @Test
    public void testViews() throws Exception {
        final TableEnvironment env = getTableEnvWithHiveCatalog();
        env.executeSql("create database db1");
        try {
            env.executeSql("create table db1.src (key int,val string)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src")
                    .addRow(new Object[] {1, "a"})
                    .addRow(new Object[] {1, "aa"})
                    .addRow(new Object[] {1, "aaa"})
                    .addRow(new Object[] {2, "b"})
                    .addRow(new Object[] {3, "c"})
                    .addRow(new Object[] {3, "ccc"})
                    .commit();
            env.executeSql("create table db1.keys (key int,name string)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "keys")
                    .addRow(new Object[] {1, "key1"})
                    .addRow(new Object[] {2, "key2"})
                    .addRow(new Object[] {3, "key3"})
                    .addRow(new Object[] {4, "key4"})
                    .commit();

            hiveShell.execute("create view db1.v1 as select key as k,val as v from db1.src limit 2");
            hiveShell.execute("create view db1.v2 as select key,count(*) from db1.src group by key having count(*)>1 order by key");
            hiveShell.execute("create view db1.v3 as select k.key,k.name,count(*) from db1.src s join db1.keys k on s.key=k.key group by k.key,k.name order by k.key");

            List<Row> limited = CollectionUtil.iteratorToList(env.sqlQuery("select count(v) from db1.v1").execute().collect());
            Assert.assertEquals("[+I[2]]", limited.toString());

            List<Row> grouped = CollectionUtil.iteratorToList(env.sqlQuery("select * from db1.v2").execute().collect());
            Assert.assertEquals("[+I[1, 3], +I[3, 2]]", grouped.toString());

            List<Row> joined = CollectionUtil.iteratorToList(env.sqlQuery("select * from db1.v3").execute().collect());
            Assert.assertEquals("[+I[1, key1, 3], +I[2, key2, 1], +I[3, key3, 2]]", joined.toString());
        } finally {
            env.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Inserts rows whose dynamic partition values consist of or end in whitespace and checks the
     * partition names Hive lists afterwards.
     */
    @Test
    public void testWhitespacePartValue() throws Exception {
        final TableEnvironment env = getTableEnvWithHiveCatalog();
        env.executeSql("create database db1");
        try {
            env.executeSql("create table db1.dest (x int) partitioned by (p string)");

            // Both inserts are submitted together as one statement set.
            StatementSet inserts = env.createStatementSet();
            inserts.addInsertSql("insert into db1.dest select 1,'  '");
            inserts.addInsertSql("insert into db1.dest select 2,'a \t'");
            inserts.execute().await();

            List<String> partitions = hiveShell.executeQuery("show partitions db1.dest");
            Assert.assertEquals("[p=  , p=a %09]", partitions.toString());
        } finally {
            env.executeSql("drop database db1 cascade");
        }
    }

    /** Runs the transactional (ACID) table scenario with a batch table environment. */
    @Test
    public void testBatchTransactionalTable() {
        final boolean batch = true;
        testTransactionalTable(batch);
    }

    /** Runs the transactional (ACID) table scenario with a streaming table environment. */
    @Test
    public void testStreamTransactionalTable() {
        final boolean batch = false;
        testTransactionalTable(batch);
    }

    /**
     * Changes the column types of an ORC table after data was written and checks that the rows
     * remain readable with the mapred reader fallback enabled. Only runs on Hive 2.1.0 or later.
     */
    @Test
    public void testOrcSchemaEvol() throws Exception {
        Assume.assumeTrue(HiveVersionTestUtil.HIVE_210_OR_LATER);

        final String expectedRows = "[+I[1, 100], +I[2, 200]]";
        final TableEnvironment env = getTableEnvWithHiveCatalog();
        env.executeSql("create database db1");
        try {
            env.executeSql("create table db1.src (x smallint,y int) stored as orc");
            hiveShell.execute("insert into table db1.src values (1,100),(2,200)");
            env.getConfig().getConfiguration().setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);

            // smallint -> int
            env.executeSql("alter table db1.src change x x int");
            List<Row> afterFirstChange = CollectionUtil.iteratorToList(env.sqlQuery("select * from db1.src").execute().collect());
            Assert.assertEquals(expectedRows, afterFirstChange.toString());

            // int -> string
            env.executeSql("alter table db1.src change y y string");
            List<Row> afterSecondChange = CollectionUtil.iteratorToList(env.sqlQuery("select * from db1.src").execute().collect());
            Assert.assertEquals(expectedRows, afterSecondChange.toString());
        } finally {
            env.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Checks that both reading from and writing to a transactional (ACID) Hive table fail with a
     * {@link FlinkHiveException} carrying the expected message.
     *
     * @param batch {@code true} to use a batch table environment, {@code false} for streaming
     */
    private void testTransactionalTable(boolean batch) {
        final TableEnvironment env;
        if (batch) {
            env = getTableEnvWithHiveCatalog();
        } else {
            env = getStreamTableEnvWithHiveCatalog();
        }
        env.executeSql("create database db1");
        try {
            env.executeSql("create table db1.src (x string,y string)");
            hiveShell.execute("create table db1.dest (x string,y string) clustered by (x) into 3 buckets stored as orc tblproperties ('transactional'='true')");

            // First statement reads the ACID table, second one writes to it; both must fail.
            String[] statements = {
                "insert into db1.src select * from db1.dest",
                "insert into db1.dest select * from db1.src"
            };
            List<Exception> failures = new ArrayList<>();
            for (String statement : statements) {
                try {
                    env.executeSql(statement).await();
                } catch (Exception failure) {
                    failures.add(failure);
                }
            }

            Assert.assertEquals(2, failures.size());
            for (Exception failure : failures) {
                Assert.assertTrue(failure instanceof FlinkHiveException);
                Assert.assertEquals("Reading or writing ACID table db1.dest is not supported.", failure.getMessage());
            }
        } finally {
            env.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Writes to a text table with Hive's {@code COMPRESSRESULT} option switched on and checks
     * that the data can be read back through both Hive and Flink.
     *
     * <p>The option is set on the configuration of the catalog shared by all tests in this class,
     * so its previous value is captured up front and restored afterwards; otherwise the setting
     * would leak into whichever tests run later.
     *
     * @param batch {@code true} to use a batch table environment, {@code false} for streaming
     * @throws Exception if any statement or verification fails
     */
    private void testCompressTextTable(boolean batch) throws Exception {
        TableEnvironment tableEnv = batch ? getTableEnvWithHiveCatalog() : getStreamTableEnvWithHiveCatalog();
        HiveConf sharedConf = hiveCatalog.getHiveConf();
        boolean previousCompress = sharedConf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT);
        tableEnv.executeSql("create database db1");
        try {
            tableEnv.executeSql("create table db1.src (x string,y string)");
            hiveShell.execute("create table db1.dest like db1.src");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src")
                    .addRow(new Object[] {"a", "b"})
                    .addRow(new Object[] {"c", "d"})
                    .commit();
            sharedConf.setBoolVar(HiveConf.ConfVars.COMPRESSRESULT, true);
            tableEnv.executeSql("insert into db1.dest select * from db1.src").await();
            List<String> expected = Arrays.asList("a\tb", "c\td");
            verifyHiveQueryResult("select * from db1.dest", expected);
            verifyFlinkQueryResult(tableEnv.sqlQuery("select * from db1.dest"), expected);
        } finally {
            // Undo the change to the shared configuration before cleaning up the database.
            sharedConf.setBoolVar(HiveConf.ConfVars.COMPRESSRESULT, previousCompress);
            tableEnv.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Creates a batch-mode table environment using the Hive dialect, with the shared Hive
     * catalog registered and selected as the current catalog.
     *
     * @return the configured table environment
     */
    private static TableEnvironment getTableEnvWithHiveCatalog() {
        TableEnvironment env = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        String catalogName = hiveCatalog.getName();
        env.registerCatalog(catalogName, hiveCatalog);
        env.useCatalog(catalogName);
        return env;
    }

    /**
     * Creates a streaming-mode table environment using the Hive dialect, with the shared Hive
     * catalog registered and selected as the current catalog.
     *
     * @return the configured table environment
     */
    private TableEnvironment getStreamTableEnvWithHiveCatalog() {
        StreamTableEnvironment streamEnv = HiveTestUtils.createTableEnvInStreamingMode(
                StreamExecutionEnvironment.getExecutionEnvironment(), SqlDialect.HIVE);
        String catalogName = hiveCatalog.getName();
        streamEnv.registerCatalog(catalogName, hiveCatalog);
        streamEnv.useCatalog(catalogName);
        return streamEnv;
    }

    /**
     * Round-trips two rows through partitioned tables stored in the given format: the rows are
     * inserted via the Hive shell, read back with Flink, copied into a second table with Flink and
     * finally verified through Hive.
     *
     * <p>A {@code date} column is part of the schema unless the format is parquet on a Hive
     * version older than 1.2.0. The Flink write step is skipped for orc on Hive 2.0.x.
     *
     * <p>The database is dropped in a {@code finally} block so that a failure for one format does
     * not leave {@code db1} behind and break the next format or later tests.
     *
     * @param format storage format name; {@code "csv"} selects the OpenCSV serde, any other value
     *     is used in a {@code stored as} clause
     * @throws Exception if any statement or verification fails
     */
    private void readWriteFormat(String format) throws Exception {
        TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
        tableEnv.executeSql("create database db1");
        try {
            String suffix = format.equals("csv")
                    ? "row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'"
                    : "stored as " + format;

            List<Object> row1 = new ArrayList<Object>(Arrays.asList(1, "a", "2018-08-20 00:00:00.1"));
            List<Object> row2 = new ArrayList<Object>(Arrays.asList(2, "b", "2019-08-26 00:00:00.1"));
            String tableSchema;
            if (HiveVersionTestUtil.HIVE_120_OR_LATER || !format.equals("parquet")) {
                tableSchema = "(i int,s string,ts timestamp,dt date)";
                row1.add("2018-08-20");
                row2.add("2019-08-26");
            } else {
                tableSchema = "(i int,s string,ts timestamp)";
            }

            tableEnv.executeSql(String.format("create table db1.src %s partitioned by (p1 string, p2 timestamp) %s", tableSchema, suffix));
            tableEnv.executeSql(String.format("create table db1.dest %s partitioned by (p1 string, p2 timestamp) %s", tableSchema, suffix));

            // Seed the source table through Hive, one row per partition.
            hiveShell.execute(String.format("insert into table db1.src partition(p1='first',p2='2018-08-20 00:00:00.1') values (%s)", toRowValue(row1)));
            hiveShell.execute(String.format("insert into table db1.src partition(p1='second',p2='2018-08-26 00:00:00.1') values (%s)", toRowValue(row2)));

            // Expected lines are the row values followed by the two partition values, tab-separated.
            String[] row1Cells = row1.stream().map(Object::toString).toArray(String[]::new);
            String[] row2Cells = row2.stream().map(Object::toString).toArray(String[]::new);
            List<String> expected = Arrays.asList(
                    String.join("\t", ArrayUtils.concat(row1Cells, new String[] {"first", "2018-08-20 00:00:00.1"})),
                    String.join("\t", ArrayUtils.concat(row2Cells, new String[] {"second", "2018-08-26 00:00:00.1"})));

            verifyFlinkQueryResult(tableEnv.sqlQuery("select * from db1.src"), expected);

            if (!format.equals("orc") || !HiveShimLoader.getHiveVersion().startsWith("2.0")) {
                tableEnv.executeSql("insert into db1.dest select * from db1.src").await();
                verifyHiveQueryResult("select * from db1.dest", expected);
            }
        } finally {
            tableEnv.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Asserts that the lines returned by Hive match the expected rows, ignoring order.
     *
     * <p>Each expected row is rendered with {@code toString()}; its first three characters and
     * its last character are dropped (the {@code +I[} ... {@code ]} wrapper seen in the row
     * strings asserted elsewhere in this class) and every {@code ", "} separator becomes a tab.
     *
     * @param expected rows that were written
     * @param results tab-separated lines read back from Hive
     */
    private static void verifyWrittenData(List<Row> expected, List<String> results) throws Exception {
        Assert.assertEquals(expected.size(), results.size());
        HashSet<String> expectedLines = new HashSet<String>();
        for (Row row : expected) {
            String text = row.toString();
            String body = text.substring(3, text.length() - 1);
            expectedLines.add(body.replaceAll(", ", "\t"));
        }
        Assert.assertEquals(expectedLines, new HashSet<String>(results));
    }

    /**
     * Generates {@code numRecords} four-field rows. For index {@code i} the fields are: {@code i},
     * {@code i} again, the double parsed from {@code "i.i"}, and the single-character string
     * starting at {@code "a"} for index 0.
     *
     * @param numRecords number of rows to generate
     * @return the generated rows, in index order
     */
    private static List<Row> generateRecords(int numRecords) {
        final int fieldCount = 4;
        List<Row> records = new ArrayList<>(numRecords);
        for (int idx = 0; idx < numRecords; idx++) {
            Row record = new Row(fieldCount);
            record.setField(0, idx);
            record.setField(1, idx);
            record.setField(2, Double.valueOf(String.format("%d.%d", idx, idx)));
            record.setField(3, String.valueOf((char) ('a' + idx)));
            records.add(record);
        }
        return records;
    }

    /**
     * Runs {@code query} through the Hive shell and asserts that it yields the expected lines,
     * ignoring order.
     *
     * @param query Hive query to execute
     * @param expected expected result lines
     */
    private static void verifyHiveQueryResult(String query, List<String> expected) {
        List<String> actual = hiveShell.executeQuery(query);
        Assert.assertEquals(expected.size(), actual.size());
        Assert.assertEquals(new HashSet<String>(expected), new HashSet<String>(actual));
    }

    /**
     * Executes {@code table} with Flink and asserts that it yields the expected lines, ignoring
     * order. Each row is rendered as its field values joined by tabs; {@link LocalDateTime}
     * values are converted to {@link Timestamp} first so that they print in Timestamp's format.
     *
     * @param table Flink table to execute
     * @param expected expected tab-separated lines
     */
    private static void verifyFlinkQueryResult(Table table, List<String> expected) throws Exception {
        List<Row> rows = CollectionUtil.iteratorToList(table.execute().collect());
        List<String> actual = new ArrayList<>(rows.size());
        for (Row row : rows) {
            List<String> cells = new ArrayList<>(row.getArity());
            for (int pos = 0; pos < row.getArity(); pos++) {
                Object value = row.getField(pos);
                if (value instanceof LocalDateTime) {
                    value = Timestamp.valueOf((LocalDateTime) value);
                }
                cells.add(value.toString());
            }
            actual.add(String.join("\t", cells));
        }
        Assert.assertEquals(expected.size(), actual.size());
        Assert.assertEquals(new HashSet<String>(expected), new HashSet<String>(actual));
    }

    /**
     * Renders a row as a comma-separated value list: {@link String} elements are wrapped in
     * single quotes, every other element is rendered with {@code toString()}.
     *
     * @param row values of one row
     * @return the comma-separated rendering, empty for an empty row
     */
    private static String toRowValue(List<Object> row) {
        List<String> literals = new ArrayList<>(row.size());
        for (Object value : row) {
            String text = value.toString();
            literals.add(value instanceof String ? "'" + text + "'" : text);
        }
        return String.join(",", literals);
    }

    static {
        CONFIG = new HiveRunnerConfig(){
            {
                if (HiveShimLoader.getHiveVersion().startsWith("3.")) {
                    this.getHiveConfSystemOverride().put(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname, DbTxnManager.class.getName());
                    this.getHiveConfSystemOverride().put(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "true");
                    this.getHiveConfSystemOverride().put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
                }
            }
        };
    }
}

