/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.streaming;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.InvalidTable;
import org.apache.hive.streaming.RecordWriter;
import org.apache.hive.streaming.StrictDelimitedInputWriter;
import org.apache.hive.streaming.StrictJsonWriter;
import org.apache.hive.streaming.StrictRegexWriter;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestStreamingDynamicPartitioning {
    private static final Logger LOG = LoggerFactory.getLogger(TestStreamingDynamicPartitioning.class);
    private final HiveConf conf;
    private IDriver driver;
    private final IMetaStoreClient msClient;
    private static final String COL1 = "id";
    private static final String COL2 = "msg";
    @Rule
    public TemporaryFolder dbFolder = new TemporaryFolder();
    private static final String dbName = "testing";
    private static final String tblName = "alerts";
    private static final String[] fieldNames = new String[]{"id", "msg"};
    private static final String[] colTypes = new String[]{"int", "string"};
    private static final String[] partNames = new String[]{"Continent", "Country"};
    private static final String[] bucketCols = new String[]{"id"};
    private final String loc1;
    private static final String dbName2 = "testing2";

    public TestStreamingDynamicPartitioning() throws Exception {
        this.conf = new HiveConf(this.getClass());
        this.conf.set("fs.raw.impl", RawFileSystem.class.getName());
        this.conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
        TxnDbUtil.setConfValues((Configuration)this.conf);
        this.conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
        this.conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
        this.conf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
        this.dbFolder.create();
        this.loc1 = this.dbFolder.newFolder("testing.db").toString();
        TxnDbUtil.cleanDb((Configuration)this.conf);
        TxnDbUtil.prepDb((Configuration)this.conf);
        this.msClient = new HiveMetaStoreClient((Configuration)this.conf);
    }

    @Before
    public void setup() throws Exception {
        SessionState.start((SessionState)new CliSessionState(this.conf));
        this.driver = DriverFactory.newDriver((HiveConf)this.conf);
        this.driver.setMaxRows(200002);
        TestStreamingDynamicPartitioning.dropDB(this.msClient, dbName);
        TestStreamingDynamicPartitioning.createDbAndTable(this.driver, dbName, tblName, null, fieldNames, colTypes, bucketCols, partNames, this.loc1, 1);
        TestStreamingDynamicPartitioning.dropDB(this.msClient, dbName2);
        String loc2 = this.dbFolder.newFolder("testing2.db").toString();
        String loc3 = this.dbFolder.newFolder("testing5.db").toString();
        this.createStoreSales("testing5", loc3);
        TestStreamingDynamicPartitioning.runDDL(this.driver, "drop table testBucketing3.streamedtable");
        TestStreamingDynamicPartitioning.runDDL(this.driver, "drop table testBucketing3.finaltable");
        TestStreamingDynamicPartitioning.runDDL(this.driver, "drop table testBucketing3.nobucket");
    }

    @After
    public void cleanup() throws Exception {
        this.msClient.close();
        this.driver.close();
    }

    private void createStoreSales(String dbName, String loc) throws Exception {
        String dbUri = "raw://" + new Path(loc).toUri().toString();
        String tableLoc = dbUri + "/" + "store_sales";
        boolean success = TestStreamingDynamicPartitioning.runDDL(this.driver, "create database IF NOT EXISTS " + dbName + " location '" + dbUri + "'");
        Assert.assertTrue((boolean)success);
        success = TestStreamingDynamicPartitioning.runDDL(this.driver, "use " + dbName);
        Assert.assertTrue((boolean)success);
        success = TestStreamingDynamicPartitioning.runDDL(this.driver, "drop table if exists store_sales");
        Assert.assertTrue((boolean)success);
        success = TestStreamingDynamicPartitioning.runDDL(this.driver, "create table store_sales\n(\n    ss_sold_date_sk           int,\n    ss_sold_time_sk           int,\n    ss_item_sk                int,\n    ss_customer_sk            int,\n    ss_cdemo_sk               int,\n    ss_hdemo_sk               int,\n    ss_addr_sk                int,\n    ss_store_sk               int,\n    ss_promo_sk               int,\n    ss_ticket_number          int,\n    ss_quantity               int,\n    ss_wholesale_cost         decimal(7,2),\n    ss_list_price             decimal(7,2),\n    ss_sales_price            decimal(7,2),\n    ss_ext_discount_amt       decimal(7,2),\n    ss_ext_sales_price        decimal(7,2),\n    ss_ext_wholesale_cost     decimal(7,2),\n    ss_ext_list_price         decimal(7,2),\n    ss_ext_tax                decimal(7,2),\n    ss_coupon_amt             decimal(7,2),\n    ss_net_paid               decimal(7,2),\n    ss_net_paid_inc_tax       decimal(7,2),\n    ss_net_profit             decimal(7,2)\n)\n partitioned by (dt string)\nclustered by (ss_store_sk, ss_promo_sk)\nINTO 4 BUCKETS stored as orc  location '" + tableLoc + "'  TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')");
        Assert.assertTrue((boolean)success);
        success = TestStreamingDynamicPartitioning.runDDL(this.driver, "alter table store_sales add partition(dt='2015')");
        Assert.assertTrue((boolean)success);
    }

    @Test
    public void testDynamicPartitioning() throws Exception {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase("testing5").withTable("store_sales").withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer).withHiveConf(this.conf).connect();
        connection.beginTransaction();
        for (int i = 0; i < 10; ++i) {
            StringBuilder row = new StringBuilder();
            for (int ints = 0; ints < 11; ++ints) {
                row.append(ints).append(',');
            }
            for (int decs = 0; decs < 12; ++decs) {
                row.append((double)i + 0.1).append(',');
            }
            row.append("2018-04-").append(i);
            connection.write(row.toString().getBytes());
        }
        connection.commitTransaction();
        connection.close();
        ArrayList<String> partitions = TestStreamingDynamicPartitioning.queryTable(this.driver, "show partitions testing5.store_sales");
        Assert.assertEquals((long)11L, (long)partitions.size());
        for (int i = 1; i < partitions.size(); ++i) {
            Assert.assertEquals((Object)("dt=2018-04-" + (i - 1)), partitions.get(i));
        }
        ArrayList<String> res = TestStreamingDynamicPartitioning.queryTable(this.driver, "select * from testing5.store_sales");
        for (String re : res) {
            System.out.println(re);
            Assert.assertEquals((Object)true, (Object)re.contains("2018-04-"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDPStreamBucketingMatchesRegularBucketing() throws Exception {
        int bucketCount = 100;
        String dbUri = "raw://" + new Path(this.dbFolder.newFolder().toString()).toUri().toString();
        String tableLoc = "'" + dbUri + "/" + "streamedtable'";
        String tableLoc2 = "'" + dbUri + "/" + "finaltable'";
        String tableLoc3 = "'" + dbUri + "/" + "nobucket'";
        this.conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
        try (IDriver driver = DriverFactory.newDriver((HiveConf)this.conf);){
            TestStreamingDynamicPartitioning.runDDL(driver, "create database testBucketing3");
            TestStreamingDynamicPartitioning.runDDL(driver, "use testBucketing3");
            TestStreamingDynamicPartitioning.runDDL(driver, "create table streamedtable ( key1 string,key2 int,data string ) partitioned by (year int) clustered by ( key1,key2 ) into " + bucketCount + " buckets  stored as orc  location " + tableLoc + " TBLPROPERTIES ('transactional'='true')");
            TestStreamingDynamicPartitioning.runDDL(driver, "create table nobucket ( bucketid int, key1 string,key2 int,data string ) partitioned by (year int) location " + tableLoc3);
            TestStreamingDynamicPartitioning.runDDL(driver, "create table finaltable ( bucketid int, key1 string,key2 int,data string ) partitioned by (year int) clustered by ( key1,key2 ) into " + bucketCount + " buckets  stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='true')");
            String[] records = new String[]{"PSFAHYLZVC,29,EPNMA,2017", "PPPRKWAYAU,96,VUTEE,2017", "MIAOFERCHI,3,WBDSI,2017", "CEGQAZOWVN,0,WCUZL,2017", "XWAKMNSVQF,28,YJVHU,2017", "XBWTSAJWME,2,KDQFO,2017", "FUVLQTAXAY,5,LDSDG,2017", "QTQMDJMGJH,6,QBOMA,2018", "EFLOTLWJWN,71,GHWPS,2018", "PEQNAOJHCM,82,CAAFI,2018", "MOEKQLGZCP,41,RUACR,2018", "QZXMCOPTID,37,LFLWE,2018", "EYALVWICRD,13,JEZLC,2018", "VYWLZAYTXX,16,DMVZX,2018", "OSALYSQIXR,47,HNZVE,2018", "JGKVHKCEGQ,25,KSCJB,2018", "WQFMMYDHET,12,DTRWA,2018", "AJOVAYZKZQ,15,YBKFO,2018", "YAQONWCUAU,31,QJNHZ,2018", "DJBXUEUOEB,35,IYCBL,2018"};
            StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
            HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase("testBucketing3").withTable("streamedtable").withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)wr).withHiveConf(this.conf).connect();
            connection.beginTransaction();
            for (String record : records) {
                connection.write(record.getBytes());
            }
            connection.commitTransaction();
            connection.close();
            ArrayList<String> res1 = TestStreamingDynamicPartitioning.queryTable(driver, "select row__id.bucketid, * from streamedtable order by key2");
            for (String re : res1) {
                System.out.println(re);
                Assert.assertTrue((re.endsWith("2017") || re.endsWith("2018") ? 1 : 0) != 0);
            }
            driver.run("insert into nobucket partition(year) select row__id.bucketid,* from streamedtable");
            ArrayList<String> res = TestStreamingDynamicPartitioning.queryTable(driver, "select * from nobucket");
            Assert.assertEquals((long)records.length, (long)res.size());
            TestStreamingDynamicPartitioning.runDDL(driver, " insert into finaltable partition(year) select * from nobucket");
            res = TestStreamingDynamicPartitioning.queryTable(driver, "select * from finaltable");
            Assert.assertEquals((long)records.length, (long)res.size());
            ArrayList<String> res2 = TestStreamingDynamicPartitioning.queryTable(driver, "select row__id.bucketid,* from finaltable where row__id.bucketid<>bucketid");
            for (String s : res2) {
                LOG.error(s);
            }
            Assert.assertTrue((boolean)res2.isEmpty());
            res2 = TestStreamingDynamicPartitioning.queryTable(driver, "select * from finaltable where year=2018");
            Assert.assertEquals((long)13L, (long)res2.size());
            for (String s : res2) {
                Assert.assertTrue((boolean)s.endsWith("2018"));
            }
            res2 = TestStreamingDynamicPartitioning.queryTable(driver, "show partitions finaltable");
            Assert.assertEquals((long)2L, (long)res2.size());
            Assert.assertEquals((Object)"year=2017", (Object)res2.get(0));
            Assert.assertEquals((Object)"year=2018", (Object)res2.get(1));
        }
        finally {
            this.conf.unset(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname);
        }
    }

    @Test
    public void testDPTwoLevel() throws Exception {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable(tblName).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer).withHiveConf(this.conf).connect();
        connection.beginTransaction();
        connection.write("1,foo,Asia,India".getBytes());
        connection.write("2,bar,Europe,Germany".getBytes());
        connection.commitTransaction();
        connection.beginTransaction();
        connection.write("3,foo,Asia,India".getBytes());
        connection.write("4,bar,Europe,Germany".getBytes());
        connection.commitTransaction();
        connection.beginTransaction();
        connection.write("5,foo,Asia,China".getBytes());
        connection.write("6,bar,Europe,France".getBytes());
        connection.commitTransaction();
        connection.beginTransaction();
        connection.write("7,foo,Asia,China".getBytes());
        connection.write("8,bar,Europe,France".getBytes());
        connection.commitTransaction();
        connection.close();
        ArrayList<String> res = TestStreamingDynamicPartitioning.queryTable(this.driver, "select * from testing.alerts order by id");
        Assert.assertEquals((long)8L, (long)res.size());
        Assert.assertEquals((Object)"1\tfoo\tAsia\tIndia", res.get(0));
        Assert.assertEquals((Object)"2\tbar\tEurope\tGermany", res.get(1));
        Assert.assertEquals((Object)"3\tfoo\tAsia\tIndia", res.get(2));
        Assert.assertEquals((Object)"4\tbar\tEurope\tGermany", res.get(3));
        Assert.assertEquals((Object)"5\tfoo\tAsia\tChina", res.get(4));
        Assert.assertEquals((Object)"6\tbar\tEurope\tFrance", res.get(5));
        Assert.assertEquals((Object)"7\tfoo\tAsia\tChina", res.get(6));
        Assert.assertEquals((Object)"8\tbar\tEurope\tFrance", res.get(7));
        res = TestStreamingDynamicPartitioning.queryTable(this.driver, "show partitions testing.alerts");
        Assert.assertEquals((long)4L, (long)res.size());
        Assert.assertTrue((boolean)res.contains("continent=Asia/country=India"));
        Assert.assertTrue((boolean)res.contains("continent=Asia/country=China"));
        Assert.assertTrue((boolean)res.contains("continent=Europe/country=Germany"));
        Assert.assertTrue((boolean)res.contains("continent=Europe/country=France"));
    }

    @Test
    public void testDPTwoLevelMissingPartitionValues() throws Exception {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable(tblName).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer).withHiveConf(this.conf).connect();
        connection.beginTransaction();
        connection.write("1,foo,Asia,India".getBytes());
        connection.write("2,bar,Europe,Germany".getBytes());
        connection.commitTransaction();
        connection.beginTransaction();
        connection.write("3,foo,Asia,India".getBytes());
        connection.write("4,bar,Europe,Germany".getBytes());
        connection.commitTransaction();
        connection.beginTransaction();
        connection.write("5,foo,Asia,China".getBytes());
        connection.write("6,bar,Europe,France".getBytes());
        connection.commitTransaction();
        connection.beginTransaction();
        connection.write("7,foo,Asia,China".getBytes());
        connection.write("8,bar,Europe,France".getBytes());
        connection.commitTransaction();
        connection.close();
        ArrayList<String> res = TestStreamingDynamicPartitioning.queryTable(this.driver, "select * from testing.alerts order by id");
        Assert.assertEquals((long)8L, (long)res.size());
        Assert.assertEquals((Object)"1\tfoo\tAsia\tIndia", res.get(0));
        Assert.assertEquals((Object)"2\tbar\tEurope\tGermany", res.get(1));
        Assert.assertEquals((Object)"3\tfoo\tAsia\tIndia", res.get(2));
        Assert.assertEquals((Object)"4\tbar\tEurope\tGermany", res.get(3));
        Assert.assertEquals((Object)"5\tfoo\tAsia\tChina", res.get(4));
        Assert.assertEquals((Object)"6\tbar\tEurope\tFrance", res.get(5));
        Assert.assertEquals((Object)"7\tfoo\tAsia\tChina", res.get(6));
        Assert.assertEquals((Object)"8\tbar\tEurope\tFrance", res.get(7));
        res = TestStreamingDynamicPartitioning.queryTable(this.driver, "show partitions testing.alerts");
        Assert.assertEquals((long)4L, (long)res.size());
        Assert.assertTrue((boolean)res.contains("continent=Asia/country=India"));
        Assert.assertTrue((boolean)res.contains("continent=Asia/country=China"));
        Assert.assertTrue((boolean)res.contains("continent=Europe/country=Germany"));
        Assert.assertTrue((boolean)res.contains("continent=Europe/country=France"));
    }

    @Test
    public void testDPTwoLevelNonStringPartitionColumns() throws Exception {
        String tblName = "alerts2";
        String[] partNames = new String[]{"year", "month"};
        TestStreamingDynamicPartitioning.createDbAndTable(this.driver, dbName, tblName, null, fieldNames, colTypes, bucketCols, partNames, this.loc1, 2, "partitioned by (year int, month int)");
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable(tblName).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer).withHiveConf(this.conf).connect();
        connection.beginTransaction();
        connection.write("1,foo,2018,2".getBytes());
        connection.write("2,bar,2019".getBytes());
        connection.commitTransaction();
        connection.beginTransaction();
        connection.write("3,foo,2018".getBytes());
        connection.write("4,bar,2019".getBytes());
        connection.commitTransaction();
        connection.beginTransaction();
        connection.write("5,foo,2018".getBytes());
        connection.write("6,bar,2019".getBytes());
        connection.commitTransaction();
        connection.beginTransaction();
        connection.write("7,foo,,".getBytes());
        connection.write("8,bar,,12".getBytes());
        connection.commitTransaction();
        connection.close();
        String defaultPartitionName = "NULL";
        ArrayList<String> res = TestStreamingDynamicPartitioning.queryTable(this.driver, "select * from " + "testing." + tblName + " order by id");
        Assert.assertEquals((long)8L, (long)res.size());
        Assert.assertEquals((Object)"1\tfoo\t2018\t2", res.get(0));
        Assert.assertEquals((Object)("2\tbar\t2019\t" + defaultPartitionName), res.get(1));
        Assert.assertEquals((Object)("3\tfoo\t2018\t" + defaultPartitionName), res.get(2));
        Assert.assertEquals((Object)("4\tbar\t2019\t" + defaultPartitionName), res.get(3));
        Assert.assertEquals((Object)("5\tfoo\t2018\t" + defaultPartitionName), res.get(4));
        Assert.assertEquals((Object)("6\tbar\t2019\t" + defaultPartitionName), res.get(5));
        Assert.assertEquals((Object)("7\tfoo\t" + defaultPartitionName + "\t" + defaultPartitionName), res.get(6));
        Assert.assertEquals((Object)("8\tbar\t" + defaultPartitionName + "\t12"), res.get(7));
        defaultPartitionName = this.conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
        res = TestStreamingDynamicPartitioning.queryTable(this.driver, "show partitions " + "testing." + tblName);
        Assert.assertEquals((long)5L, (long)res.size());
        Assert.assertTrue((boolean)res.contains("year=2018/month=2"));
        Assert.assertTrue((boolean)res.contains("year=2018/month=" + defaultPartitionName));
        Assert.assertTrue((boolean)res.contains("year=2019/month=" + defaultPartitionName));
        Assert.assertTrue((boolean)res.contains("year=" + defaultPartitionName + "/month=" + defaultPartitionName));
        Assert.assertTrue((boolean)res.contains("year=" + defaultPartitionName + "/month=12"));
    }

    @Test
    public void testWriteBeforeBegin() throws Exception {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable(tblName).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer).withHiveConf(this.conf).connect();
        connection.beginTransaction();
        connection.write("1,foo,Asia".getBytes());
        connection.write("2,bar,Europe".getBytes());
        connection.commitTransaction();
        Exception exception = null;
        try {
            connection.write("3,SHOULD FAIL!".getBytes());
        }
        catch (Exception e) {
            exception = e;
        }
        Assert.assertNotNull((Object)exception);
        Assert.assertTrue((boolean)exception.getMessage().equals("Transaction state is not OPEN. Missing beginTransaction?"));
        exception = null;
        try {
            connection.commitTransaction();
        }
        catch (Exception e) {
            exception = e;
        }
        Assert.assertNotNull((Object)exception);
        Assert.assertTrue((boolean)exception.getMessage().equals("Transaction state is not OPEN. Missing beginTransaction?"));
        exception = null;
        try {
            connection.abortTransaction();
        }
        catch (Exception e) {
            exception = e;
        }
        Assert.assertNotNull((Object)exception);
        Assert.assertTrue((boolean)exception.getMessage().equals("Transaction state is not OPEN. Missing beginTransaction?"));
        connection.close();
        String defaultPartitionName = this.conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
        ArrayList<String> res = TestStreamingDynamicPartitioning.queryTable(this.driver, "select * from testing.alerts order by id");
        Assert.assertEquals((long)2L, (long)res.size());
        Assert.assertEquals((Object)("1\tfoo\tAsia\t" + defaultPartitionName), res.get(0));
        Assert.assertEquals((Object)("2\tbar\tEurope\t" + defaultPartitionName), res.get(1));
    }

    @Test
    public void testRegexInputStreamDP() throws Exception {
        String regex = "([^,]*),(.*),(.*),(.*)";
        StrictRegexWriter writer = StrictRegexWriter.newBuilder().withRegex(regex).build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable(tblName).withAgentInfo("UT_" + Thread.currentThread().getName()).withHiveConf(this.conf).withRecordWriter((RecordWriter)writer).connect();
        String rows = "1,foo,Asia,India\r2,bar,Europe,Germany\r3,baz,Asia,China\r4,cat,Australia,";
        ByteArrayInputStream bais = new ByteArrayInputStream(rows.getBytes());
        connection.beginTransaction();
        connection.write((InputStream)bais);
        connection.commitTransaction();
        bais.close();
        connection.close();
        ArrayList<String> rs = TestStreamingDynamicPartitioning.queryTable(this.driver, "select * from testing.alerts order by id");
        Assert.assertEquals((long)4L, (long)rs.size());
        Assert.assertEquals((Object)"1\tfoo\tAsia\tIndia", rs.get(0));
        Assert.assertEquals((Object)"2\tbar\tEurope\tGermany", rs.get(1));
        Assert.assertEquals((Object)"3\tbaz\tAsia\tChina", rs.get(2));
        Assert.assertEquals((Object)"4\tcat\tAustralia\t__HIVE_DEFAULT_PARTITION__", rs.get(3));
        rs = TestStreamingDynamicPartitioning.queryTable(this.driver, "show partitions testing.alerts");
        Assert.assertEquals((long)4L, (long)rs.size());
        Assert.assertTrue((boolean)rs.contains("continent=Asia/country=India"));
        Assert.assertTrue((boolean)rs.contains("continent=Asia/country=China"));
        Assert.assertTrue((boolean)rs.contains("continent=Europe/country=Germany"));
        Assert.assertTrue((boolean)rs.contains("continent=Australia/country=__HIVE_DEFAULT_PARTITION__"));
    }

    @Test
    public void testJsonInputStreamDP() throws Exception {
        StrictJsonWriter writer = StrictJsonWriter.newBuilder().withLineDelimiterPattern("\\|").build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable(tblName).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer).withHiveConf(this.conf).connect();
        connection.beginTransaction();
        Assert.assertEquals((Object)HiveStreamingConnection.TxnState.OPEN, (Object)connection.getCurrentTransactionState());
        String records = "{\"id\" : 1, \"msg\": \"Hello streaming\", \"continent\": \"Asia\", \"Country\": \"India\"}|{\"id\" : 2, \"msg\": \"Hello world\", \"continent\": \"Europe\", \"Country\": \"Germany\"}|{\"id\" : 3, \"msg\": \"Hello world!!\", \"continent\": \"Asia\", \"Country\": \"China\"}|{\"id\" : 4, \"msg\": \"Hmm..\", \"continent\": \"Australia\", \"Unknown-field\": \"whatever\"}|";
        ByteArrayInputStream bais = new ByteArrayInputStream(records.getBytes());
        connection.write((InputStream)bais);
        connection.commitTransaction();
        bais.close();
        connection.close();
        ArrayList<String> rs = TestStreamingDynamicPartitioning.queryTable(this.driver, "select * from testing.alerts order by id");
        Assert.assertEquals((long)4L, (long)rs.size());
        Assert.assertEquals((Object)"1\tHello streaming\tAsia\tIndia", rs.get(0));
        Assert.assertEquals((Object)"2\tHello world\tEurope\tGermany", rs.get(1));
        Assert.assertEquals((Object)"3\tHello world!!\tAsia\tChina", rs.get(2));
        Assert.assertEquals((Object)"4\tHmm..\tAustralia\t__HIVE_DEFAULT_PARTITION__", rs.get(3));
        rs = TestStreamingDynamicPartitioning.queryTable(this.driver, "show partitions testing.alerts");
        Assert.assertEquals((long)4L, (long)rs.size());
        Assert.assertTrue((boolean)rs.contains("continent=Asia/country=India"));
        Assert.assertTrue((boolean)rs.contains("continent=Asia/country=China"));
        Assert.assertTrue((boolean)rs.contains("continent=Europe/country=Germany"));
        Assert.assertTrue((boolean)rs.contains("continent=Australia/country=__HIVE_DEFAULT_PARTITION__"));
    }

    @Test
    public void testWriteAfterClose() throws Exception {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable(tblName).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer).withHiveConf(this.conf).connect();
        connection.beginTransaction();
        connection.write("1,foo,Asia".getBytes());
        connection.write("2,bar,Europe".getBytes());
        connection.commitTransaction();
        connection.close();
        Exception exception = null;
        try {
            connection.write("3,SHOULD FAIL!".getBytes());
        }
        catch (Exception e) {
            exception = e;
        }
        Assert.assertNotNull((Object)exception);
        Assert.assertTrue((boolean)exception.getMessage().endsWith("Streaming connection is closed already."));
        exception = null;
        try {
            connection.commitTransaction();
        }
        catch (Exception e) {
            exception = e;
        }
        Assert.assertNotNull((Object)exception);
        Assert.assertTrue((boolean)exception.getMessage().endsWith("Streaming connection is closed already."));
        exception = null;
        try {
            connection.abortTransaction();
        }
        catch (Exception e) {
            exception = e;
        }
        Assert.assertNotNull((Object)exception);
        Assert.assertTrue((boolean)exception.getMessage().endsWith("Streaming connection is closed already."));
        String defaultPartitionName = this.conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
        ArrayList<String> res = TestStreamingDynamicPartitioning.queryTable(this.driver, "select * from testing.alerts order by id");
        Assert.assertEquals((long)2L, (long)res.size());
        Assert.assertEquals((Object)("1\tfoo\tAsia\t" + defaultPartitionName), res.get(0));
        Assert.assertEquals((Object)("2\tbar\tEurope\t" + defaultPartitionName), res.get(1));
    }

    @Test
    public void testWriteAfterAbort() throws Exception {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable(tblName).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer).withHiveConf(this.conf).connect();
        connection.beginTransaction();
        connection.write("1,foo,Asia".getBytes());
        connection.write("2,bar,Europe".getBytes());
        connection.commitTransaction();
        connection.beginTransaction();
        connection.write("3,oops!".getBytes());
        connection.abortTransaction();
        connection.beginTransaction();
        connection.write("4,I did it again!".getBytes());
        connection.abortTransaction();
        connection.beginTransaction();
        connection.write("5,Not now!,Europe".getBytes());
        connection.commitTransaction();
        connection.close();
        Exception exception = null;
        try {
            connection.write("6,SHOULD FAIL!".getBytes());
        }
        catch (Exception e) {
            exception = e;
        }
        Assert.assertNotNull((Object)exception);
        Assert.assertTrue((boolean)exception.getMessage().equals("Streaming connection is closed already."));
        String defaultPartitionName = this.conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
        ArrayList<String> res = TestStreamingDynamicPartitioning.queryTable(this.driver, "select * from testing.alerts order by id");
        Assert.assertEquals((long)3L, (long)res.size());
        Assert.assertEquals((Object)("1\tfoo\tAsia\t" + defaultPartitionName), res.get(0));
        Assert.assertEquals((Object)("2\tbar\tEurope\t" + defaultPartitionName), res.get(1));
        Assert.assertEquals((Object)("5\tNot now!\tEurope\t" + defaultPartitionName), res.get(2));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTableValidation() throws Exception {
        int bucketCount = 100;
        String dbUri = "raw://" + new Path(this.dbFolder.newFolder().toString()).toUri().toString();
        String tbl1 = "validation1";
        String tbl2 = "validation2";
        String tableLoc = "'" + dbUri + "/" + tbl1 + "'";
        String tableLoc2 = "'" + dbUri + "/" + tbl2 + "'";
        TestStreamingDynamicPartitioning.runDDL(this.driver, "create database testBucketing3");
        TestStreamingDynamicPartitioning.runDDL(this.driver, "use testBucketing3");
        TestStreamingDynamicPartitioning.runDDL(this.driver, "create table " + tbl1 + " ( key1 string, data string ) clustered by ( key1 ) into " + bucketCount + " buckets  stored as orc  location " + tableLoc + " TBLPROPERTIES ('transactional'='false')");
        TestStreamingDynamicPartitioning.runDDL(this.driver, "create table " + tbl2 + " ( key1 string, data string ) clustered by ( key1 ) into " + bucketCount + " buckets  stored as orc  location " + tableLoc2 + " TBLPROPERTIES ('transactional'='false')");
        StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        try (HiveStreamingConnection connection = null;){
            connection = HiveStreamingConnection.newBuilder().withDatabase("testBucketing3").withTable("validation2").withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)wr).withHiveConf(this.conf).connect();
            Assert.assertTrue((String)"InvalidTable exception was not thrown", (boolean)false);
        }
        try {
            connection = HiveStreamingConnection.newBuilder().withDatabase("testBucketing3").withTable("validation2").withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)wr).withHiveConf(this.conf).connect();
            Assert.assertTrue((String)"InvalidTable exception was not thrown", (boolean)false);
        }
        catch (InvalidTable invalidTable) {
        }
        finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    private static boolean runDDL(IDriver driver, String sql) {
        LOG.debug(sql);
        System.out.println(sql);
        CommandProcessorResponse cpr = driver.run(sql);
        if (cpr.getResponseCode() == 0) {
            return true;
        }
        LOG.error("Statement: " + sql + " failed: " + cpr);
        return false;
    }

    private static ArrayList<String> queryTable(IDriver driver, String query) throws IOException {
        CommandProcessorResponse cpr = driver.run(query);
        if (cpr.getResponseCode() != 0) {
            throw new RuntimeException(query + " failed: " + cpr);
        }
        ArrayList<String> res = new ArrayList<String>();
        driver.getResults(res);
        return res;
    }

    public static void dropDB(IMetaStoreClient client, String databaseName) {
        try {
            for (String table : client.listTableNamesByFilter(databaseName, "", (short)-1)) {
                client.dropTable(databaseName, table, true, true);
            }
            client.dropDatabase(databaseName);
        }
        catch (TException tException) {
            // empty catch block
        }
    }

    private static Path createDbAndTable(IDriver driver, String databaseName, String tableName, List<String> partVals, String[] colNames, String[] colTypes, String[] bucketCols, String[] partNames, String dbLocation, int bucketCount) throws Exception {
        String dbUri = "raw://" + new Path(dbLocation).toUri().toString();
        String tableLoc = dbUri + "/" + tableName;
        TestStreamingDynamicPartitioning.runDDL(driver, "create database IF NOT EXISTS " + databaseName + " location '" + dbUri + "'");
        TestStreamingDynamicPartitioning.runDDL(driver, "use " + databaseName);
        String crtTbl = "create table " + tableName + " ( " + TestStreamingDynamicPartitioning.getTableColumnsStr(colNames, colTypes) + " )" + TestStreamingDynamicPartitioning.getPartitionStmtStr(partNames) + " clustered by ( " + TestStreamingDynamicPartitioning.join(bucketCols, ",") + " ) into " + bucketCount + " buckets  stored as orc  location '" + tableLoc + "' TBLPROPERTIES ('transactional'='true') ";
        TestStreamingDynamicPartitioning.runDDL(driver, crtTbl);
        if (partNames != null && partNames.length != 0 && partVals != null) {
            return TestStreamingDynamicPartitioning.addPartition(driver, tableName, partVals, partNames);
        }
        return new Path(tableLoc);
    }

    private static Path createDbAndTable(IDriver driver, String databaseName, String tableName, List<String> partVals, String[] colNames, String[] colTypes, String[] bucketCols, String[] partNames, String dbLocation, int bucketCount, String partLine) throws Exception {
        String dbUri = "raw://" + new Path(dbLocation).toUri().toString();
        String tableLoc = dbUri + "/" + tableName;
        TestStreamingDynamicPartitioning.runDDL(driver, "create database IF NOT EXISTS " + databaseName + " location '" + dbUri + "'");
        TestStreamingDynamicPartitioning.runDDL(driver, "use " + databaseName);
        String crtTbl = "create table " + tableName + " ( " + TestStreamingDynamicPartitioning.getTableColumnsStr(colNames, colTypes) + " )" + partLine + " clustered by ( " + TestStreamingDynamicPartitioning.join(bucketCols, ",") + " ) into " + bucketCount + " buckets  stored as orc  location '" + tableLoc + "' TBLPROPERTIES ('transactional'='true') ";
        TestStreamingDynamicPartitioning.runDDL(driver, crtTbl);
        if (partNames != null && partNames.length != 0 && partVals != null) {
            return TestStreamingDynamicPartitioning.addPartition(driver, tableName, partVals, partNames);
        }
        return new Path(tableLoc);
    }

    private static Path addPartition(IDriver driver, String tableName, List<String> partVals, String[] partNames) throws Exception {
        String partSpec = TestStreamingDynamicPartitioning.getPartsSpec(partNames, partVals);
        String addPart = "alter table " + tableName + " add partition ( " + partSpec + " )";
        TestStreamingDynamicPartitioning.runDDL(driver, addPart);
        return TestStreamingDynamicPartitioning.getPartitionPath(driver, tableName, partSpec);
    }

    private static Path getPartitionPath(IDriver driver, String tableName, String partSpec) throws Exception {
        ArrayList<String> res = TestStreamingDynamicPartitioning.queryTable(driver, "describe extended " + tableName + " PARTITION (" + partSpec + ")");
        String partInfo = res.get(res.size() - 1);
        int start = partInfo.indexOf("location:") + "location:".length();
        int end = partInfo.indexOf(",", start);
        return new Path(partInfo.substring(start, end));
    }

    private static String getTableColumnsStr(String[] colNames, String[] colTypes) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < colNames.length; ++i) {
            sb.append(colNames[i]).append(" ").append(colTypes[i]);
            if (i >= colNames.length - 1) continue;
            sb.append(",");
        }
        return sb.toString();
    }

    private static String getTablePartsStr(String[] partNames) {
        if (partNames == null || partNames.length == 0) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < partNames.length; ++i) {
            sb.append(partNames[i]).append(" string");
            if (i >= partNames.length - 1) continue;
            sb.append(",");
        }
        return sb.toString();
    }

    private static String getPartsSpec(String[] partNames, List<String> partVals) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < partVals.size(); ++i) {
            sb.append(partNames[i]).append(" = '").append(partVals.get(i)).append("'");
            if (i >= partVals.size() - 1) continue;
            sb.append(",");
        }
        return sb.toString();
    }

    private static String join(String[] values, String delimiter) {
        if (values == null) {
            return null;
        }
        StringBuilder strbuf = new StringBuilder();
        boolean first = true;
        for (String value : values) {
            if (!first) {
                strbuf.append(delimiter);
            } else {
                first = false;
            }
            strbuf.append(value.toString());
        }
        return strbuf.toString();
    }

    private static String getPartitionStmtStr(String[] partNames) {
        if (partNames == null || partNames.length == 0) {
            return "";
        }
        return " partitioned by (" + TestStreamingDynamicPartitioning.getTablePartsStr(partNames) + " )";
    }

    public static class RawFileSystem
    extends RawLocalFileSystem {
        private static final URI NAME;

        public URI getUri() {
            return NAME;
        }

        public String getScheme() {
            return "raw";
        }

        public FileStatus getFileStatus(Path path) throws IOException {
            File file = this.pathToFile(path);
            if (!file.exists()) {
                throw new FileNotFoundException("Cannot find " + path);
            }
            short mod = 0;
            if (file.canRead()) {
                mod = (short)(mod | 0x124);
            }
            if (file.canWrite()) {
                mod = (short)(mod | 0x80);
            }
            if (file.canExecute()) {
                mod = (short)(mod | 0x49);
            }
            return new FileStatus(file.length(), file.isDirectory(), 1, 1024L, file.lastModified(), file.lastModified(), FsPermission.createImmutable((short)mod), "owen", "users", path);
        }

        static {
            try {
                NAME = new URI("raw:///");
            }
            catch (URISyntaxException se) {
                throw new IllegalArgumentException("bad uri", se);
            }
        }
    }
}

