package org.apache.hive.streaming;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnInfo;
import org.apache.hadoop.hive.metastore.api.TxnState;
import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.compactor.Worker;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.orc.impl.OrcAcidUtils;
import org.apache.orc.tools.FileDump;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hive/streaming/TestStreaming.class */
public class TestStreaming {
    // Query driver; created in setup() for every test and closed in cleanup().
    private IDriver driver;
    // Metastore client; opened in the constructor and closed in cleanup().
    private final IMetaStoreClient msClient;
    // Primary database/table that setup() creates via createDbAndTable(...).
    private static final String dbName = "testing";
    private static final String tblName = "alerts";
    // Partition values for the primary table; (re)populated by the constructor with "Asia", "India".
    static List<String> partitionVals;
    // Values returned by createDbAndTable(...) in setup() for dbName and dbName2 respectively.
    private static Path partLoc;
    private static Path partLoc2;
    // Second database that setup() creates; the call passes null partition values and names.
    private static final String dbName2 = "testing2";
    private static final String tblName2 = "alerts";
    // Additional database/table names.
    private static final String dbName3 = "testing3";
    private static final String tblName3 = "dimensionTable";
    private static final String dbName4 = "testing4";
    private static final String tblName4 = "factTable";
    // Single-element partition value list; populated by the constructor with "India".
    List<String> partitionVals2;
    // Same literals the constructor adds to partitionVals.
    private final String PART1_CONTINENT = "Asia";
    private final String PART1_COUNTRY = "India";

    // Temporary directory under which setup() creates the "<db>.db" folders.
    @Rule
    public TemporaryFolder dbFolder = new TemporaryFolder();
    private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
    // Shared Hive configuration; replaced with a freshly built instance by the constructor.
    private static HiveConf conf = null;
    // Column names used by setup() when creating the test tables.
    private static final String COL1 = "id";
    private static final String COL2 = "msg";
    // Both arrays currently hold the same two column names.
    private static final String[] fieldNames = {COL1, COL2};
    private static final String[] fieldNames2 = {COL1, COL2};

    /* loaded from: input_file:org/apache/hive/streaming/TestStreaming$FaultyWriter.class */
    /**
     * {@link RecordWriter} decorator that can be switched into a failing mode.
     *
     * <p>Every call is forwarded to the wrapped writer. While errors are enabled, both
     * {@code write} overloads and {@code flush} throw a {@link StreamingIOFailure} <em>after</em>
     * the delegate call has returned; {@code init}, {@code close} and {@code getPartitions} never
     * inject a fault.
     */
    private static final class FaultyWriter implements RecordWriter {
        private final RecordWriter delegate;
        /** When {@code true}, write/flush calls fail after delegating. */
        private boolean shouldThrow = false;

        /**
         * @param recordWriter the writer to wrap; must not be {@code null} (checked with an
         *                     assertion, so only enforced when assertions are enabled)
         */
        private FaultyWriter(RecordWriter recordWriter) {
            assert recordWriter != null;
            this.delegate = recordWriter;
        }

        public void init(StreamingConnection streamingConnection, long j, long j2) throws StreamingException {
            this.delegate.init(streamingConnection, j, j2);
        }

        public void write(long j, byte[] bArr) throws StreamingException {
            this.delegate.write(j, bArr);
            produceFault();
        }

        public void write(long j, InputStream inputStream) throws StreamingException {
            this.delegate.write(j, inputStream);
            produceFault();
        }

        public void flush() throws StreamingException {
            this.delegate.flush();
            produceFault();
        }

        public void close() throws StreamingException {
            this.delegate.close();
        }

        public Set<String> getPartitions() {
            return this.delegate.getPartitions();
        }

        /**
         * Throws the simulated failure if errors are currently enabled.
         *
         * @throws StreamingIOFailure when {@link #enableErrors()} was called more recently than
         *                            {@link #disableErrors()}
         */
        private void produceFault() throws StreamingIOFailure {
            if (this.shouldThrow) {
                throw new StreamingIOFailure("Simulated fault occurred");
            }
        }

        /** Makes subsequent write/flush calls fail. */
        void enableErrors() {
            this.shouldThrow = true;
        }

        /** Restores normal pass-through behaviour. */
        void disableErrors() {
            this.shouldThrow = false;
        }
    }

    /* loaded from: input_file:org/apache/hive/streaming/TestStreaming$RawFileSystem.class */
    /**
     * Local file system registered under the {@code raw} scheme.
     *
     * <p>{@link #getFileStatus(Path)} derives the permission bits from what the current process
     * can do with the file ({@code canRead}/{@code canWrite}/{@code canExecute}) and reports a
     * fixed owner and group.
     */
    public static class RawFileSystem extends RawLocalFileSystem {
        private static final URI NAME = URI.create("raw:///");

        /** r--r--r-- : granted when the file is readable. */
        private static final short READ_BITS = 0444;
        /** -w------- : granted when the file is writable. */
        private static final short WRITE_BITS = 0200;
        /** --x--x--x : granted when the file is executable. */
        private static final short EXECUTE_BITS = 0111;

        public URI getUri() {
            return NAME;
        }

        public String getScheme() {
            return "raw";
        }

        /**
         * Builds a {@link FileStatus} for a local file.
         *
         * @param path the path to inspect
         * @return status carrying the file's length, directory flag, last-modified time (used for
         *         both timestamps) and the permission bits computed from the process's access
         * @throws FileNotFoundException if the file does not exist
         * @throws IOException declared for compatibility with the overridden method
         */
        public FileStatus getFileStatus(Path path) throws IOException {
            File file = pathToFile(path);
            if (!file.exists()) {
                throw new FileNotFoundException("Can't find " + path);
            }
            short mode = 0;
            if (file.canRead()) {
                mode |= READ_BITS;
            }
            if (file.canWrite()) {
                mode |= WRITE_BITS;
            }
            if (file.canExecute()) {
                mode |= EXECUTE_BITS;
            }
            long modified = file.lastModified();
            return new FileStatus(file.length(), file.isDirectory(), 1, 1024L, modified, modified, FsPermission.createImmutable(mode), "owen", "users", path);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hive/streaming/TestStreaming$SampleRec.class */
    /**
     * Simple three-field value holder with value-based equality.
     *
     * <p>Two instances are equal when they are of the same class and all three fields match;
     * {@code null} string fields are only equal to {@code null}.
     */
    public static class SampleRec {
        public String field1;
        public int field2;
        public String field3;

        public SampleRec(String str, int i, String str2) {
            this.field1 = str;
            this.field2 = i;
            this.field3 = str2;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            SampleRec other = (SampleRec) obj;
            return this.field2 == other.field2
                && sameText(this.field1, other.field1)
                && sameText(this.field3, other.field3);
        }

        /** Null-safe string comparison: two nulls are equal, null never equals non-null. */
        private static boolean sameText(String left, String right) {
            if (left == null) {
                return right == null;
            }
            return left.equals(right);
        }

        @Override
        public int hashCode() {
            int h1 = this.field1 == null ? 0 : this.field1.hashCode();
            int h3 = this.field3 == null ? 0 : this.field3.hashCode();
            int result = 31 * h1 + this.field2;
            return 31 * result + h3;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append(" { '").append(this.field1).append("',");
            text.append(this.field2);
            text.append(",'").append(this.field3).append("' }");
            return text.toString();
        }
    }

    /* loaded from: input_file:org/apache/hive/streaming/TestStreaming$WriterThd.class */
    private static class WriterThd extends Thread {
        private final StreamingConnection conn;
        private final String data;
        private Throwable error;

        WriterThd(String str) throws Exception {
            super("Writer_" + str);
            HiveStreamingConnection connect = HiveStreamingConnection.newBuilder().withDatabase(TestStreaming.dbName).withTable("alerts").withStaticPartitionValues(TestStreaming.partitionVals).withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build()).withHiveConf(TestStreaming.conf).connect();
            this.conn = connect;
            this.data = str;
            setUncaughtExceptionHandler((thread, th) -> {
                this.error = th;
                TestStreaming.LOG.error(connect.toTransactionString());
                TestStreaming.LOG.error("Thread " + thread.getName() + " died: " + th.getMessage(), th);
            });
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    try {
                        this.conn.beginTransaction();
                        this.conn.write(this.data.getBytes());
                        this.conn.write(this.data.getBytes());
                        this.conn.commitTransaction();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                if (this.conn != null) {
                    try {
                        this.conn.close();
                    } catch (Exception e2) {
                        TestStreaming.LOG.error("txnBatch.close() failed: " + e2.getMessage(), e2);
                    }
                }
            } catch (Throwable th) {
                if (this.conn != null) {
                    try {
                        this.conn.close();
                    } catch (Exception e3) {
                        TestStreaming.LOG.error("txnBatch.close() failed: " + e3.getMessage(), e3);
                    }
                }
                throw th;
            }
        }
    }

    /**
     * Prepares the shared state for a test instance: partition value lists, a fresh
     * {@link HiveConf} (with the {@code raw} file system registered, the SQL-standard authorizer,
     * transaction settings from {@link TxnDbUtil#setConfValues}, set-UGI and concurrency support
     * enabled), a cleaned and re-prepared transaction database, and a metastore client.
     *
     * @throws Exception if the temporary folder, the transaction database or the metastore
     *                   client cannot be set up
     */
    public TestStreaming() throws Exception {
        partitionVals = new ArrayList<>(2);
        partitionVals.add("Asia");
        partitionVals.add("India");
        this.partitionVals2 = new ArrayList<>(1);
        this.partitionVals2.add("India");
        conf = new HiveConf(getClass());
        conf.set("fs.raw.impl", RawFileSystem.class.getName());
        conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
        TxnDbUtil.setConfValues(conf);
        conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
        conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
        // NOTE(review): dbFolder is also a @Rule, which normally creates the folder itself before
        // each test; this explicit create() may be redundant - confirm before removing.
        this.dbFolder.create();
        // Start every test instance from an empty transaction database.
        TxnDbUtil.cleanDb(conf);
        TxnDbUtil.prepDb(conf);
        this.msClient = new HiveMetaStoreClient(conf);
    }

    @Before
    public void setup() throws Exception {
        SessionState.start(new CliSessionState(conf));
        this.driver = DriverFactory.newDriver(conf);
        this.driver.setMaxRows(200002);
        dropDB(this.msClient, dbName);
        String[] strArr = {COL1, COL2};
        String[] strArr2 = {"int", "string"};
        String[] strArr3 = {COL1};
        partLoc = createDbAndTable(this.driver, dbName, "alerts", partitionVals, strArr, strArr2, strArr3, new String[]{"Continent", "Country"}, this.dbFolder.newFolder("testing.db").toString(), 1);
        dropDB(this.msClient, dbName2);
        partLoc2 = createDbAndTable(this.driver, dbName2, "alerts", null, strArr, strArr2, strArr3, null, this.dbFolder.newFolder("testing2.db").toString(), 2);
        createStoreSales("testing5", this.dbFolder.newFolder("testing5.db").toString());
        runDDL(this.driver, "drop table testBucketing3.streamedtable");
        runDDL(this.driver, "drop table testBucketing3.finaltable");
        runDDL(this.driver, "drop table testBucketing3.nobucket");
    }

    @After
    public void cleanup() {
        this.msClient.close();
        this.driver.close();
    }

    /**
     * Creates (if necessary) the given database on the {@code raw} file system, recreates the
     * transactional, partitioned and bucketed {@code store_sales} ORC table inside it and adds
     * the partition {@code dt='2015'}. Every DDL statement is asserted to succeed.
     *
     * @param str  name of the database to create and use
     * @param str2 local directory that backs the database
     * @throws Exception if running a DDL statement fails
     */
    private void createStoreSales(String str, String str2) throws Exception {
        final String dbUri = "raw://" + new Path(str2).toUri().toString();
        final String tableUri = dbUri + "/store_sales";
        final String createTable =
            "create table store_sales\n"
            + "(\n"
            + "    ss_sold_date_sk           int,\n"
            + "    ss_sold_time_sk           int,\n"
            + "    ss_item_sk                int,\n"
            + "    ss_customer_sk            int,\n"
            + "    ss_cdemo_sk               int,\n"
            + "    ss_hdemo_sk               int,\n"
            + "    ss_addr_sk                int,\n"
            + "    ss_store_sk               int,\n"
            + "    ss_promo_sk               int,\n"
            + "    ss_ticket_number          int,\n"
            + "    ss_quantity               int,\n"
            + "    ss_wholesale_cost         decimal(7,2),\n"
            + "    ss_list_price             decimal(7,2),\n"
            + "    ss_sales_price            decimal(7,2),\n"
            + "    ss_ext_discount_amt       decimal(7,2),\n"
            + "    ss_ext_sales_price        decimal(7,2),\n"
            + "    ss_ext_wholesale_cost     decimal(7,2),\n"
            + "    ss_ext_list_price         decimal(7,2),\n"
            + "    ss_ext_tax                decimal(7,2),\n"
            + "    ss_coupon_amt             decimal(7,2),\n"
            + "    ss_net_paid               decimal(7,2),\n"
            + "    ss_net_paid_inc_tax       decimal(7,2),\n"
            + "    ss_net_profit             decimal(7,2)\n"
            + ")\n"
            + " partitioned by (dt string)\n"
            + "clustered by (ss_store_sk, ss_promo_sk)\n"
            + "INTO 4 BUCKETS stored as orc  location '" + tableUri + "'"
            + "  TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')";

        boolean dbCreated = runDDL(this.driver, "create database IF NOT EXISTS " + str + " location '" + dbUri + "'");
        Assert.assertTrue(dbCreated);
        boolean dbSelected = runDDL(this.driver, "use " + str);
        Assert.assertTrue(dbSelected);
        boolean oldTableDropped = runDDL(this.driver, "drop table if exists store_sales");
        Assert.assertTrue(oldTableDropped);
        boolean tableCreated = runDDL(this.driver, createTable);
        Assert.assertTrue(tableCreated);
        boolean partitionAdded = runDDL(this.driver, "alter table store_sales add partition(dt='2015')");
        Assert.assertTrue(partitionAdded);
    }

    @Test
    public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add("2015");
        HiveStreamingConnection connect = HiveStreamingConnection.newBuilder().withDatabase("testing5").withTable("store_sales").withStaticPartitionValues(arrayList).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build()).withHiveConf(conf).connect();
        connect.beginTransaction();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            for (int i2 = 0; i2 < 11; i2++) {
                sb.append(i2).append(',');
            }
            for (int i3 = 0; i3 < 12; i3++) {
                sb.append(i + 0.1d).append(',');
            }
            sb.setLength(sb.length() - 1);
            connect.write(sb.toString().getBytes());
        }
        connect.commitTransaction();
        connect.close();
        Iterator<String> it = queryTable(this.driver, "select row__id.bucketid, * from testing5.store_sales").iterator();
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }

    @Test
    public void testNoBuckets() throws Exception {
        queryTable(this.driver, "drop table if exists default.streamingnobuckets");
        queryTable(this.driver, "create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true')");
        queryTable(this.driver, "insert into default.streamingnobuckets values('foo','bar')");
        ArrayList<String> queryTable = queryTable(this.driver, "select * from default.streamingnobuckets");
        Assert.assertEquals(1L, queryTable.size());
        Assert.assertEquals("foo\tbar", queryTable.get(0));
        HiveStreamingConnection connect = HiveStreamingConnection.newBuilder().withDatabase("Default").withTable("streamingNoBuckets").withAgentInfo("UT_" + Thread.currentThread().getName()).withTransactionBatchSize(2).withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build()).withHiveConf(conf).connect();
        connect.beginTransaction();
        connect.write("a1,b2".getBytes());
        connect.write("a3,b4".getBytes());
        ShowLocksResponse showLocks = TxnUtils.getTxnStore(conf).showLocks(new ShowLocksRequest());
        Assert.assertEquals(showLocks.getLocksSize(), 1L);
        Assert.assertEquals("streamingnobuckets", ((ShowLocksResponseElement) showLocks.getLocks().get(0)).getTablename());
        Assert.assertEquals("default", ((ShowLocksResponseElement) showLocks.getLocks().get(0)).getDbname());
        connect.commitTransaction();
        connect.beginTransaction();
        connect.write("a5,b6".getBytes());
        connect.write("a7,b8".getBytes());
        connect.commitTransaction();
        connect.close();
        Assert.assertEquals("", 0L, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
        ArrayList<String> queryTable2 = queryTable(this.driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
        Assert.assertTrue(queryTable2.get(0), queryTable2.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
        Assert.assertTrue(queryTable2.get(0), queryTable2.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
        Assert.assertTrue(queryTable2.get(1), queryTable2.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
        Assert.assertTrue(queryTable2.get(1), queryTable2.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
        Assert.assertTrue(queryTable2.get(2), queryTable2.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
        Assert.assertTrue(queryTable2.get(2), queryTable2.get(2).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
        Assert.assertTrue(queryTable2.get(3), queryTable2.get(3).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
        Assert.assertTrue(queryTable2.get(3), queryTable2.get(3).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
        Assert.assertTrue(queryTable2.get(4), queryTable2.get(4).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
        Assert.assertTrue(queryTable2.get(4), queryTable2.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
        queryTable(this.driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
        queryTable(this.driver, "delete from default.streamingnobuckets where a='a1'");
        ArrayList<String> queryTable3 = queryTable(this.driver, "select a, b from default.streamingnobuckets order by a, b");
        int i = 0 + 1;
        Assert.assertEquals("at row=0", "0\t0", queryTable3.get(0));
        String str = "at row=" + i;
        int i2 = i + 1;
        Assert.assertEquals(str, "a3\tb4", queryTable3.get(i));
        String str2 = "at row=" + i2;
        int i3 = i2 + 1;
        Assert.assertEquals(str2, "a5\tb6", queryTable3.get(i2));
        String str3 = "at row=" + i3;
        int i4 = i3 + 1;
        Assert.assertEquals(str3, "foo\tbar", queryTable3.get(i3));
        queryTable(this.driver, "alter table default.streamingnobuckets compact 'major'");
        runWorker(conf);
        ArrayList<String> queryTable4 = queryTable(this.driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
        Assert.assertTrue(queryTable4.get(0), queryTable4.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
        Assert.assertTrue(queryTable4.get(0), queryTable4.get(0).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
        Assert.assertTrue(queryTable4.get(1), queryTable4.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
        Assert.assertTrue(queryTable4.get(1), queryTable4.get(1).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
        Assert.assertTrue(queryTable4.get(2), queryTable4.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
        Assert.assertTrue(queryTable4.get(2), queryTable4.get(2).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
        Assert.assertTrue(queryTable4.get(3), queryTable4.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
        Assert.assertTrue(queryTable4.get(3), queryTable4.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
    }

    @Test
    public void testAllTypesDelimitedWriter() throws Exception {
        queryTable(this.driver, "drop table if exists default.alltypes");
        queryTable(this.driver, "create table if not exists default.alltypes ( bo boolean, ti tinyint, si smallint, i int, bi bigint, f float, d double, de decimal(10,3), ts timestamp, da date, s string, c char(5), vc varchar(5), m map<string, string>, l array<int>, st struct<c1:int, c2:string> ) stored as orc TBLPROPERTIES('transactional'='true')");
        HiveStreamingConnection connect = HiveStreamingConnection.newBuilder().withDatabase("default").withTable("alltypes").withAgentInfo("UT_" + Thread.currentThread().getName()).withTransactionBatchSize(2).withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter('|').withCollectionDelimiter(',').withMapKeyDelimiter(':').build()).withHiveConf(conf).connect();
        connect.beginTransaction();
        connect.write("true|10|100|1000|10000|4.0|20.0|4.2222|1969-12-31 15:59:58.174|1970-01-01|string|hello|hello|k1:v1|100,200|10,foo".getBytes());
        connect.write("false|20|200|2000|20000|8.0|40.0|2.2222|1970-12-31 15:59:58.174|1971-01-01|abcd|world|world|k4:v4|200,300|20,bar".getBytes());
        connect.commitTransaction();
        connect.close();
        ArrayList<String> queryTable = queryTable(this.driver, "select ROW__ID, bo, ti, si, i, bi, f, d, de, ts, da, s, c, vc, m, l, st, INPUT__FILE__NAME from default.alltypes order by ROW__ID");
        Assert.assertEquals(2L, queryTable.size());
        String str = queryTable.get(0);
        String str2 = queryTable.get(1);
        Assert.assertTrue(str, str.startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\ttrue\t10\t100\t1000\t10000\t4.0\t20.0\t4.222\t1969-12-31 15:59:58.174\t1970-01-01\tstring\thello\thello\t{\"k1\":\"v1\"}\t[100,200]\t{\"c1\":10,\"c2\":\"foo\"}"));
        Assert.assertTrue(str, str.endsWith("alltypes/delta_0000001_0000002/bucket_00000"));
        Assert.assertTrue(str2, str2.startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\tfalse\t20\t200\t2000\t20000\t8.0\t40.0\t2.222\t1970-12-31 15:59:58.174\t1971-01-01\tabcd\tworld\tworld\t{\"k4\":\"v4\"}\t[200,300]\t{\"c1\":20,\"c2\":\"bar\"}"));
        Assert.assertTrue(str2, str2.endsWith("alltypes/delta_0000001_0000002/bucket_00000"));
    }

    @Test
    public void testAllTypesDelimitedWriterInputStream() throws Exception {
        queryTable(this.driver, "drop table if exists default.alltypes");
        queryTable(this.driver, "create table if not exists default.alltypes ( bo boolean, ti tinyint, si smallint, i int, bi bigint, f float, d double, de decimal(10,3), ts timestamp, da date, s string, c char(5), vc varchar(5), m map<string, string>, l array<int>, st struct<c1:int, c2:string> ) stored as orc TBLPROPERTIES('transactional'='true')");
        HiveStreamingConnection connect = HiveStreamingConnection.newBuilder().withDatabase("default").withTable("alltypes").withAgentInfo("UT_" + Thread.currentThread().getName()).withTransactionBatchSize(2).withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter('|').withCollectionDelimiter(',').withMapKeyDelimiter(':').withLineDelimiterPattern("\n").build()).withHiveConf(conf).connect();
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(("true|10|100|1000|10000|4.0|20.0|4.2222|1969-12-31 15:59:58.174|1970-01-01|string|hello|hello|k1:v1|100,200|10,foo\nfalse|20|200|2000|20000|8.0|40.0|2.2222|1970-12-31 15:59:58.174|1971-01-01|abcd|world|world|k4:v4|200,300|20,bar\n").getBytes());
        connect.beginTransaction();
        connect.write(byteArrayInputStream);
        connect.commitTransaction();
        connect.close();
        byteArrayInputStream.close();
        ArrayList<String> queryTable = queryTable(this.driver, "select ROW__ID, bo, ti, si, i, bi, f, d, de, ts, da, s, c, vc, m, l, st, INPUT__FILE__NAME from default.alltypes order by ROW__ID");
        Assert.assertEquals(2L, queryTable.size());
        String str = queryTable.get(0);
        String str2 = queryTable.get(1);
        Assert.assertTrue(str, str.startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\ttrue\t10\t100\t1000\t10000\t4.0\t20.0\t4.222\t1969-12-31 15:59:58.174\t1970-01-01\tstring\thello\thello\t{\"k1\":\"v1\"}\t[100,200]\t{\"c1\":10,\"c2\":\"foo\"}"));
        Assert.assertTrue(str, str.endsWith("alltypes/delta_0000001_0000002/bucket_00000"));
        Assert.assertTrue(str2, str2.startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\tfalse\t20\t200\t2000\t20000\t8.0\t40.0\t2.222\t1970-12-31 15:59:58.174\t1971-01-01\tabcd\tworld\tworld\t{\"k4\":\"v4\"}\t[200,300]\t{\"c1\":20,\"c2\":\"bar\"}"));
        Assert.assertTrue(str2, str2.endsWith("alltypes/delta_0000001_0000002/bucket_00000"));
    }

    @Test
    public void testAutoRollTransactionBatch() throws Exception {
        queryTable(this.driver, "drop table if exists default.streamingnobuckets");
        queryTable(this.driver, "create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true')");
        queryTable(this.driver, "insert into default.streamingnobuckets values('foo','bar')");
        ArrayList<String> queryTable = queryTable(this.driver, "select * from default.streamingnobuckets");
        Assert.assertEquals(1L, queryTable.size());
        Assert.assertEquals("foo\tbar", queryTable.get(0));
        HiveStreamingConnection connect = HiveStreamingConnection.newBuilder().withDatabase("default").withTable("streamingnobuckets").withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build()).withHiveConf(conf).withTransactionBatchSize(2).connect();
        connect.beginTransaction();
        connect.write("a1,b2".getBytes());
        connect.write("a3,b4".getBytes());
        connect.commitTransaction();
        connect.beginTransaction();
        connect.write("a5,b6".getBytes());
        connect.write("a7,b8".getBytes());
        connect.commitTransaction();
        connect.beginTransaction();
        connect.write("a9,b10".getBytes());
        connect.write("a11,b12".getBytes());
        connect.commitTransaction();
        connect.beginTransaction();
        connect.write("a13,b14".getBytes());
        connect.write("a15,b16".getBytes());
        connect.commitTransaction();
        connect.close();
        Assert.assertEquals("", 0L, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
        ArrayList<String> queryTable2 = queryTable(this.driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
        Assert.assertTrue(queryTable2.get(0), queryTable2.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
        Assert.assertTrue(queryTable2.get(0), queryTable2.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
        Assert.assertTrue(queryTable2.get(1), queryTable2.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
        Assert.assertTrue(queryTable2.get(1), queryTable2.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
        Assert.assertTrue(queryTable2.get(2), queryTable2.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
        Assert.assertTrue(queryTable2.get(2), queryTable2.get(2).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
        Assert.assertTrue(queryTable2.get(3), queryTable2.get(3).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
        Assert.assertTrue(queryTable2.get(3), queryTable2.get(3).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
        Assert.assertTrue(queryTable2.get(4), queryTable2.get(4).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
        Assert.assertTrue(queryTable2.get(4), queryTable2.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
        Assert.assertTrue(queryTable2.get(5), queryTable2.get(5).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\ta9\tb10"));
        Assert.assertTrue(queryTable2.get(5), queryTable2.get(5).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000"));
        Assert.assertTrue(queryTable2.get(6), queryTable2.get(6).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\ta11\tb12"));
        Assert.assertTrue(queryTable2.get(6), queryTable2.get(6).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000"));
        Assert.assertTrue(queryTable2.get(7), queryTable2.get(7).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\ta13\tb14"));
        Assert.assertTrue(queryTable2.get(7), queryTable2.get(7).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000"));
        Assert.assertTrue(queryTable2.get(8), queryTable2.get(8).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":1}\ta15\tb16"));
        Assert.assertTrue(queryTable2.get(8), queryTable2.get(8).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000"));
        queryTable(this.driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
        queryTable(this.driver, "delete from default.streamingnobuckets where a='a1'");
        queryTable(this.driver, "update default.streamingnobuckets set a=0, b=0 where a='a15'");
        queryTable(this.driver, "delete from default.streamingnobuckets where a='a9'");
        ArrayList<String> queryTable3 = queryTable(this.driver, "select a, b from default.streamingnobuckets order by a, b");
        int i = 0 + 1;
        Assert.assertEquals("at row=0", "0\t0", queryTable3.get(0));
        String str = "at row=" + i;
        int i2 = i + 1;
        Assert.assertEquals(str, "0\t0", queryTable3.get(i));
        String str2 = "at row=" + i2;
        int i3 = i2 + 1;
        Assert.assertEquals(str2, "a11\tb12", queryTable3.get(i2));
        String str3 = "at row=" + i3;
        int i4 = i3 + 1;
        Assert.assertEquals(str3, "a13\tb14", queryTable3.get(i3));
        String str4 = "at row=" + i4;
        int i5 = i4 + 1;
        Assert.assertEquals(str4, "a3\tb4", queryTable3.get(i4));
        String str5 = "at row=" + i5;
        int i6 = i5 + 1;
        Assert.assertEquals(str5, "a5\tb6", queryTable3.get(i5));
        String str6 = "at row=" + i6;
        int i7 = i6 + 1;
        Assert.assertEquals(str6, "foo\tbar", queryTable3.get(i6));
        queryTable(this.driver, "alter table default.streamingnobuckets compact 'major'");
        runWorker(conf);
        ArrayList<String> queryTable4 = queryTable(this.driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
        Assert.assertTrue(queryTable4.get(0), queryTable4.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
        Assert.assertTrue(queryTable4.get(0), queryTable4.get(0).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
        Assert.assertTrue(queryTable4.get(1), queryTable4.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
        Assert.assertTrue(queryTable4.get(1), queryTable4.get(1).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
        Assert.assertTrue(queryTable4.get(2), queryTable4.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
        Assert.assertTrue(queryTable4.get(2), queryTable4.get(2).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
        Assert.assertTrue(queryTable4.get(3), queryTable4.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\ta11\tb12"));
        Assert.assertTrue(queryTable4.get(3), queryTable4.get(3).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
        Assert.assertTrue(queryTable4.get(4), queryTable4.get(4).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\ta13\tb14"));
        Assert.assertTrue(queryTable4.get(4), queryTable4.get(4).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
        Assert.assertTrue(queryTable4.get(5), queryTable4.get(5).startsWith("{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
        Assert.assertTrue(queryTable4.get(5), queryTable4.get(5).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
    }

    /**
     * Creates a {@link Worker}, configures it with the supplied configuration and
     * invokes its {@code run()} method directly on the calling code path.
     *
     * @param hiveConf configuration handed to the worker via {@code setConf}
     * @throws MetaException declared by the worker set-up calls
     */
    public static void runWorker(HiveConf hiveConf) throws MetaException {
        // First flag starts out true, second starts out false.
        // NOTE(review): presumably these are the worker's "stop" and "looped" flags - confirm against Worker.init.
        AtomicBoolean firstFlag = new AtomicBoolean(true);
        AtomicBoolean secondFlag = new AtomicBoolean();

        Worker compactorWorker = new Worker();
        int threadId = (int) compactorWorker.getId();
        compactorWorker.setThreadId(threadId);
        compactorWorker.setConf(hiveConf);
        compactorWorker.init(firstFlag, secondFlag);
        compactorWorker.run();
    }

    /**
     * Streams rows into a bucketed transactional table, copies them (together with the
     * bucket id recorded in {@code row__id}) through an un-bucketed staging table into a
     * second bucketed table via regular inserts, and asserts that no row ends up in a
     * bucket different from the one streaming assigned.
     *
     * <p>Vectorization is switched off for the duration of the test and the setting is
     * removed again afterwards, whether the test passes or fails. The driver and the
     * streaming connection are always closed.
     */
    @Test
    public void testStreamBucketingMatchesRegularBucketing() throws Exception {
        String dbUri = "raw://" + new Path(this.dbFolder.newFolder().toString()).toUri().toString();
        String streamedLocation = "'" + dbUri + "/streamedtable'";
        String finalLocation = "'" + dbUri + "/finaltable'";
        String noBucketLocation = "'" + dbUri + "/nobucket'";
        String[] records = {"PSFAHYLZVC,29,EPNMA", "PPPRKWAYAU,96,VUTEE", "MIAOFERCHI,3,WBDSI", "CEGQAZOWVN,0,WCUZL", "XWAKMNSVQF,28,YJVHU", "XBWTSAJWME,2,KDQFO", "FUVLQTAXAY,5,LDSDG", "QTQMDJMGJH,6,QBOMA", "EFLOTLWJWN,71,GHWPS", "PEQNAOJHCM,82,CAAFI", "MOEKQLGZCP,41,RUACR", "QZXMCOPTID,37,LFLWE", "EYALVWICRD,13,JEZLC", "VYWLZAYTXX,16,DMVZX", "OSALYSQIXR,47,HNZVE", "JGKVHKCEGQ,25,KSCJB", "WQFMMYDHET,12,DTRWA", "AJOVAYZKZQ,15,YBKFO", "YAQONWCUAU,31,QJNHZ", "DJBXUEUOEB,35,IYCBL"};

        conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
        // try-with-resources closes the driver on every path; the finally block then
        // removes the vectorization override from the shared configuration.
        try (IDriver newDriver = DriverFactory.newDriver(conf)) {
            runDDL(newDriver, "create database testBucketing3");
            runDDL(newDriver, "use testBucketing3");
            runDDL(newDriver, "create table streamedtable ( key1 string,key2 int,data string ) clustered by ( key1,key2 ) into 100 buckets  stored as orc  location " + streamedLocation + " TBLPROPERTIES ('transactional'='true')");
            runDDL(newDriver, "create table nobucket ( bucketid int, key1 string,key2 int,data string ) location " + noBucketLocation);
            runDDL(newDriver, "create table finaltable ( bucketid int, key1 string,key2 int,data string ) clustered by ( key1,key2 ) into 100 buckets  stored as orc location " + finalLocation + " TBLPROPERTIES ('transactional'='true')");

            HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase("testBucketing3").withTable("streamedtable").withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build()).withHiveConf(conf).connect();
            try {
                connection.beginTransaction();
                for (String record : records) {
                    connection.write(record.getBytes());
                }
                connection.commitTransaction();
            } finally {
                // Release the connection even when a write or the commit fails.
                connection.close();
            }

            // Dump the streamed rows with their bucket ids to the log for diagnosis.
            for (String row : queryTable(newDriver, "select row__id.bucketid, * from streamedtable order by key2")) {
                LOG.error(row);
            }

            newDriver.run("insert into nobucket select row__id.bucketid,* from streamedtable");
            runDDL(newDriver, "insert into finaltable select * from nobucket");

            // Any row returned here landed in a bucket other than the one streaming chose.
            ArrayList<String> mismatches = queryTable(newDriver, "select row__id.bucketid,* from finaltable where row__id.bucketid<>bucketid");
            for (String row : mismatches) {
                LOG.error(row);
            }
            Assert.assertTrue(mismatches.isEmpty());
        } finally {
            conf.unset(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname);
        }
    }

    /**
     * Verifies that opening a streaming connection to a table declared with
     * {@code 'transactional'='false'} is rejected with {@link InvalidTable}.
     *
     * <p>Two such tables are created and each one is probed once. The connection is
     * closed in a {@code finally} block in case a connect unexpectedly succeeds.
     */
    @Test
    public void testTableValidation() throws Exception {
        String dbUri = "raw://" + new Path(this.dbFolder.newFolder().toString()).toUri().toString();
        String validation1Location = "'" + dbUri + "/validation1'";
        String validation2Location = "'" + dbUri + "/validation2'";

        runDDL(this.driver, "create database testBucketing3");
        runDDL(this.driver, "use testBucketing3");
        runDDL(this.driver, "create table validation1 ( key1 string, data string ) clustered by ( key1 ) into 100 buckets  stored as orc  location " + validation1Location + " TBLPROPERTIES ('transactional'='false')");
        runDDL(this.driver, "create table validation2 ( key1 string, data string ) clustered by ( key1 ) into 100 buckets  stored as orc  location " + validation2Location + " TBLPROPERTIES ('transactional'='false')");

        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();

        // Probe both tables; previously "validation2" was checked twice and
        // "validation1" was created but never exercised.
        for (String tableName : new String[]{"validation1", "validation2"}) {
            HiveStreamingConnection connection = null;
            try {
                connection = HiveStreamingConnection.newBuilder().withDatabase("testBucketing3").withTable(tableName).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter(writer).withHiveConf(conf).connect();
                Assert.fail("InvalidTable exception was not thrown");
            } catch (InvalidTable expected) {
                // Expected: the table is not transactional.
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    /**
     * Inspects the ACID state of {@code path} for table {@code alerts} and reads the
     * first ORC split back to verify what has been written.
     *
     * <p>Asserts that there are no obsolete directories and no original files, that the
     * number of current delta directories equals {@code i2}, that the smallest and
     * largest write ids across those deltas equal {@code j} and {@code j2}, that the
     * number of splits equals {@code i2}, and that the first split yields exactly the
     * records in {@code strArr}, in order. Only {@code splits[0]} is read.
     *
     * @param path   partition/table directory to inspect
     * @param j      expected minimum write id over all current deltas
     * @param j2     expected maximum write id over all current deltas
     * @param i      value stored under {@code bucket_count} and passed to {@code getSplits}
     * @param i2     expected number of current delta directories and of input splits
     * @param strArr expected {@code toString()} form of each record, in read order
     * @throws Exception if the metastore, file system or reader fails
     */
    @Deprecated
    private void checkDataWritten(Path path, long j, long j2, int i, int i2, String... strArr) throws Exception {
        ValidWriteIdList validWriteIds = this.msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, "alerts"));
        AcidUtils.Directory acidState = AcidUtils.getAcidState(path, conf, validWriteIds);
        Assert.assertEquals(0L, acidState.getObsolete().size());
        Assert.assertEquals(0L, acidState.getOriginalFiles().size());

        List<AcidUtils.ParsedDelta> currentDirectories = acidState.getCurrentDirectories();
        // Print the directories before asserting on their count so a mismatch is diagnosable.
        System.out.println("Files found: ");
        for (AcidUtils.ParsedDelta delta : currentDirectories) {
            System.out.println(delta.getPath().toString());
        }
        Assert.assertEquals(i2, currentDirectories.size());

        long minWriteId = Long.MAX_VALUE;
        long maxWriteId = Long.MIN_VALUE;
        for (AcidUtils.ParsedDelta delta : currentDirectories) {
            maxWriteId = Math.max(maxWriteId, delta.getMaxWriteId());
            minWriteId = Math.min(minWriteId, delta.getMinWriteId());
        }
        Assert.assertEquals(j, minWriteId);
        Assert.assertEquals(j2, maxWriteId);

        OrcInputFormat orcInputFormat = new OrcInputFormat();
        JobConf jobConf = new JobConf();
        jobConf.set("mapred.input.dir", path.toString());
        jobConf.set("bucket_count", Integer.toString(i));
        jobConf.set("schema.evolution.columns", "id,msg");
        jobConf.set("schema.evolution.columns.types", "bigint:string");
        AcidUtils.setAcidOperationalProperties(jobConf, true, (AcidUtils.AcidOperationalProperties) null);
        jobConf.setBoolean("transactional", true);
        jobConf.set("hive.txn.valid.writeids", validWriteIds.toString());

        InputSplit[] splits = orcInputFormat.getSplits(jobConf, i);
        Assert.assertEquals(i2, splits.length);

        RecordReader recordReader = orcInputFormat.getRecordReader(splits[0], jobConf, Reporter.NULL);
        try {
            NullWritable key = (NullWritable) recordReader.createKey();
            OrcStruct value = (OrcStruct) recordReader.createValue();
            for (String expectedRecord : strArr) {
                Assert.assertTrue(recordReader.next(key, value));
                Assert.assertEquals(expectedRecord, value.toString());
            }
            // No further records may follow the expected ones.
            Assert.assertFalse(recordReader.next(key, value));
        } finally {
            // The reader holds an open ORC file; release it even when an assertion fails.
            recordReader.close();
        }
    }

    /**
     * Inspects the ACID state of {@code path} for table {@code alerts} and then runs
     * {@code str} once per ORC split strategy, comparing the returned rows with
     * {@code strArr}.
     *
     * <p>Asserts that there are no obsolete directories and no original files, that the
     * number of current delta directories equals {@code i}, and that the smallest and
     * largest write ids across those deltas equal {@code j} and {@code j2}. The split
     * strategy and vectorization settings changed on the shared configuration are
     * restored in a {@code finally} block, so a failing assertion or query does not
     * leak them.
     *
     * @param path   partition/table directory to inspect
     * @param j      expected minimum write id over all current deltas
     * @param j2     expected maximum write id over all current deltas
     * @param i      expected number of current delta directories
     * @param str    validation query to execute
     * @param z      when {@code true}, vectorization is enabled while the query runs
     * @param strArr expected result rows, in order
     * @throws Exception if the metastore, file system or query fails
     */
    private void checkDataWritten2(Path path, long j, long j2, int i, String str, boolean z, String... strArr) throws Exception {
        AcidUtils.Directory acidState = AcidUtils.getAcidState(path, conf, this.msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, "alerts")));
        Assert.assertEquals(0L, acidState.getObsolete().size());
        Assert.assertEquals(0L, acidState.getOriginalFiles().size());

        List<AcidUtils.ParsedDelta> currentDirectories = acidState.getCurrentDirectories();
        // Print the directories before asserting on their count so a mismatch is diagnosable.
        System.out.println("Files found: ");
        for (AcidUtils.ParsedDelta delta : currentDirectories) {
            System.out.println(delta.getPath().toString());
        }
        Assert.assertEquals(i, currentDirectories.size());

        long minWriteId = Long.MAX_VALUE;
        long maxWriteId = Long.MIN_VALUE;
        for (AcidUtils.ParsedDelta delta : currentDirectories) {
            maxWriteId = Math.max(maxWriteId, delta.getMaxWriteId());
            minWriteId = Math.min(minWriteId, delta.getMinWriteId());
        }
        Assert.assertEquals(j, minWriteId);
        Assert.assertEquals(j2, maxWriteId);

        // Remember the settings we are about to override so they can be put back.
        boolean vectorizationBefore = conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
        String splitStrategyBefore = conf.getVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY);
        try {
            if (z) {
                conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
            }
            Iterator strategies = HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator().getExpected().iterator();
            while (strategies.hasNext()) {
                conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, ((String) strategies.next()).toUpperCase());
                ArrayList<String> actualRows = queryTable(this.driver, str);
                // NOTE(review): only indexes present in the actual result are compared, so a
                // result with fewer rows than strArr is not reported - confirm this is intended.
                for (int row = 0; row < actualRows.size(); row++) {
                    Assert.assertEquals("diff at [" + row + "].  actual=" + actualRows + " expected=" + Arrays.toString(strArr), strArr[row], actualRows.get(row));
                }
            }
        } finally {
            conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, splitStrategyBefore);
            conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, vectorizationBefore);
        }
    }

    /**
     * Asserts that the ACID state of {@code path} for table {@code alerts} is empty:
     * no obsolete directories, no original files and no current delta directories.
     *
     * @param path partition/table directory to inspect
     * @throws Exception if the metastore or file system lookup fails
     */
    private void checkNothingWritten(Path path) throws Exception {
        String fullTableName = AcidUtils.getFullTableName(dbName, "alerts");
        AcidUtils.Directory state = AcidUtils.getAcidState(path, conf, this.msClient.getValidWriteIds(fullTableName));

        int obsoleteCount = state.getObsolete().size();
        Assert.assertEquals(0L, obsoleteCount);

        int originalCount = state.getOriginalFiles().size();
        Assert.assertEquals(0L, originalCount);

        int currentCount = state.getCurrentDirectories().size();
        Assert.assertEquals(0L, currentCount);
    }

    /**
     * Exercises connection set-up for three cases: a partitioned table with static
     * partition values, an un-partitioned table without partition values, and an
     * un-partitioned table with partition values. The first two are opened and closed
     * immediately; the third must fail with a {@link ConnectionError} whose string form
     * ends with {@code "specifies partitions for un-partitioned table"}.
     */
    @Test
    public void testEndpointConnection() throws Exception {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable("alerts").withStaticPartitionValues(partitionVals).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter(writer).withHiveConf(conf).connect().close();
        HiveStreamingConnection.newBuilder().withDatabase(dbName2).withTable("alerts").withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter(writer).withHiveConf(conf).connect().close();

        HiveStreamingConnection connection = null;
        try {
            connection = HiveStreamingConnection.newBuilder().withDatabase(dbName2).withTable("alerts").withStaticPartitionValues(partitionVals).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter(writer).withHiveConf(conf).connect();
            Assert.fail("ConnectionError was not thrown");
        } catch (ConnectionError e) {
            Assert.assertTrue(e.toString().endsWith("specifies partitions for un-partitioned table"));
        } finally {
            // Only non-null when connect() unexpectedly succeeded; do not leak it.
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Opens a streaming connection to {@code alerts} with the static partition values
     * {@code (Asia, Nepal)}, asserts that a connection is returned, and then asserts
     * that the metastore returns a partition for {@code partitionVals}. The connection
     * is closed before the test returns.
     */
    @Test
    public void testAddPartition() throws Exception {
        List<String> newPartitionValues = new ArrayList<>(2);
        newPartitionValues.add("Asia");
        newPartitionValues.add("Nepal");

        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable("alerts").withStaticPartitionValues(newPartitionValues).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build()).withHiveConf(conf).connect();
        try {
            Assert.assertNotNull(connection);
            // NOTE(review): the lookup uses the class-level partitionVals, not the
            // (Asia, Nepal) list built above - confirm which partition is meant.
            Assert.assertNotNull("Did not find added partition", this.msClient.getPartition(dbName, "alerts", partitionVals));
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Begins and immediately commits a transaction without writing anything, first on
     * the partitioned {@code alerts} table in {@code dbName} and then on the
     * {@code alerts} table in {@code dbName2}, asserting that the transaction state is
     * {@code COMMITTED} each time.
     */
    @Test
    public void testTransactionBatchEmptyCommit() throws Exception {
        // Partitioned table, static partition values.
        StrictDelimitedInputWriter partitionedWriter = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection partitioned = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(partitionedWriter)
            .withHiveConf(conf)
            .connect();
        partitioned.beginTransaction();
        partitioned.commitTransaction();
        HiveStreamingConnection.TxnState partitionedState = partitioned.getCurrentTransactionState();
        Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, partitionedState);
        partitioned.close();

        // Second database, no partition values.
        StrictDelimitedInputWriter plainWriter = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection plain = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName2)
            .withTable("alerts")
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(plainWriter)
            .withHiveConf(conf)
            .connect();
        plain.beginTransaction();
        plain.commitTransaction();
        HiveStreamingConnection.TxnState plainState = plain.getCurrentTransactionState();
        Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, plainState);
        plain.close();
    }

    /**
     * Opens a transaction, shrinks the transaction timeout to 2 ms with a reaper start
     * delay of 0 s, runs {@link AcidHouseKeeperService} and then attempts to commit.
     * If the commit raises a {@link TransactionError}, its cause must be a
     * {@link TxnAbortedException}. The same is repeated on a second connection after
     * one successful begin/commit cycle.
     *
     * <p>The two timeout settings are removed from the shared configuration and both
     * connections are closed in {@code finally} blocks, so a failure here does not
     * affect later tests.
     */
    @Test
    public void testTimeOutReaper() throws Exception {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        try {
            AcidHouseKeeperService houseKeeper;

            HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName2).withTable("alerts").withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter(writer).withHiveConf(conf).connect();
            try {
                connection.beginTransaction();
                // The timeouts are lowered only after the transaction has been opened.
                conf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0L, TimeUnit.SECONDS);
                conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 2L, TimeUnit.MILLISECONDS);
                houseKeeper = new AcidHouseKeeperService();
                houseKeeper.setConf(conf);
                houseKeeper.run();
                try {
                    // NOTE(review): a commit that succeeds is not treated as a failure here.
                    connection.commitTransaction();
                } catch (TransactionError e) {
                    Assert.assertTrue("Expected aborted transaction", e.getCause() instanceof TxnAbortedException);
                }
            } finally {
                connection.close();
            }

            HiveStreamingConnection connection2 = HiveStreamingConnection.newBuilder().withDatabase(dbName2).withTable("alerts").withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter(writer).withHiveConf(conf).connect();
            try {
                connection2.beginTransaction();
                connection2.commitTransaction();
                connection2.beginTransaction();
                houseKeeper.run();
                try {
                    connection2.commitTransaction();
                } catch (TransactionError e2) {
                    Assert.assertTrue("Expected aborted transaction", e2.getCause() instanceof TxnAbortedException);
                }
            } finally {
                connection2.close();
            }
        } finally {
            // Undo the overrides on the shared configuration.
            conf.unset(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START.varname);
            conf.unset(HiveConf.ConfVars.HIVE_TXN_TIMEOUT.varname);
        }
    }

    /**
     * With a 200 ms transaction timeout and a transaction batch size of 20, opens a
     * transaction, reads the lock on {@code alerts} twice and asserts that exactly one
     * lock exists each time and that its acquisition time and last heartbeat are
     * unchanged between the two reads. It then runs 60 begin/commit cycles (every tenth
     * one aborted instead), sleeping 10 ms after each.
     *
     * <p>The timeout override is removed and the connection closed in a single
     * {@code finally} block, which also covers a failure inside {@code connect()}.
     */
    @Test
    public void testHeartbeat() throws Exception {
        final int transactionBatchSize = 20;
        conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 200L, TimeUnit.MILLISECONDS);
        HiveStreamingConnection connection = null;
        try {
            connection = HiveStreamingConnection.newBuilder().withDatabase(dbName2).withTable("alerts").withAgentInfo("UT_" + Thread.currentThread().getName()).withTransactionBatchSize(transactionBatchSize).withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build()).withHiveConf(conf).connect();
            connection.beginTransaction();

            ShowLocksRequest request = new ShowLocksRequest();
            request.setDbname(dbName2);
            request.setTablename("alerts");

            ShowLocksResponse firstResponse = this.msClient.showLocks(request);
            Assert.assertEquals("Wrong number of locks: " + firstResponse, 1L, firstResponse.getLocks().size());
            ShowLocksResponseElement firstLock = firstResponse.getLocks().get(0);
            long acquiredAt = firstLock.getAcquiredat();
            long lastHeartbeat = firstLock.getLastheartbeat();

            ShowLocksResponse secondResponse = this.msClient.showLocks(request);
            Assert.assertEquals("Wrong number of locks2: " + secondResponse, 1L, secondResponse.getLocks().size());
            ShowLocksResponseElement secondLock = secondResponse.getLocks().get(0);
            Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, secondLock.getAcquiredat());
            Assert.assertEquals("Expected new heartbeat (" + secondLock.getLastheartbeat() + ") == old heartbeat(" + lastHeartbeat + ")", lastHeartbeat, secondLock.getLastheartbeat());

            // Cycle through three batches' worth of transactions, aborting every tenth.
            for (int i = 0; i < transactionBatchSize * 3; i++) {
                connection.beginTransaction();
                if (i % 10 == 0) {
                    connection.abortTransaction();
                } else {
                    connection.commitTransaction();
                }
                Thread.sleep(10L);
            }
        } finally {
            conf.unset(HiveConf.ConfVars.HIVE_TXN_TIMEOUT.varname);
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Begins and immediately aborts a transaction without writing anything, first on
     * the partitioned {@code alerts} table in {@code dbName} and then on the
     * {@code alerts} table in {@code dbName2}, asserting that the transaction state is
     * {@code ABORTED} each time.
     */
    @Test
    public void testTransactionBatchEmptyAbort() throws Exception {
        // Partitioned table, static partition values.
        StrictDelimitedInputWriter partitionedWriter = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection partitioned = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(partitionedWriter)
            .withHiveConf(conf)
            .connect();
        partitioned.beginTransaction();
        partitioned.abortTransaction();
        HiveStreamingConnection.TxnState partitionedState = partitioned.getCurrentTransactionState();
        Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED, partitionedState);
        partitioned.close();

        // Second database, no partition values.
        StrictDelimitedInputWriter plainWriter = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection plain = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName2)
            .withTable("alerts")
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(plainWriter)
            .withHiveConf(conf)
            .connect();
        plain.beginTransaction();
        plain.abortTransaction();
        HiveStreamingConnection.TxnState plainState = plain.getCurrentTransactionState();
        Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED, plainState);
        plain.close();
    }

    /**
     * Streams delimited records into the partitioned {@code alerts} table with a
     * transaction batch size of 10 and checks the written data after each step: after
     * the first commit, after a second write that is not yet committed (still only the
     * first record expected), and after the second commit. Finally performs a single
     * write/commit on the {@code alerts} table in {@code dbName2}.
     */
    @Test
    public void testTransactionBatchCommitDelimited() throws Exception {
        StrictDelimitedInputWriter partitionedWriter = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection partitioned = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withHiveConf(conf)
            .withRecordWriter(partitionedWriter)
            .withTransactionBatchSize(10)
            .connect();

        // First transaction: one record, committed.
        partitioned.beginTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, partitioned.getCurrentTransactionState());
        byte[] firstRecord = "1,Hello streaming".getBytes();
        partitioned.write(firstRecord);
        partitioned.commitTransaction();
        checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}");
        Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, partitioned.getCurrentTransactionState());

        // Second transaction: checked once before and once after the commit.
        partitioned.beginTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, partitioned.getCurrentTransactionState());
        byte[] secondRecord = "2,Welcome to streaming".getBytes();
        partitioned.write(secondRecord);
        checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}");
        partitioned.commitTransaction();
        checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}");

        partitioned.close();
        Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE, partitioned.getCurrentTransactionState());

        // Same single write/commit against the table in the second database.
        StrictDelimitedInputWriter plainWriter = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection plain = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName2)
            .withTable("alerts")
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withHiveConf(conf)
            .withRecordWriter(plainWriter)
            .connect();
        plain.beginTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, plain.getCurrentTransactionState());
        plain.write("1,Hello streaming".getBytes());
        plain.commitTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, plain.getCurrentTransactionState());
        plain.close();
    }

    /**
     * Same scenario as the delimited commit test but using {@link StrictRegexWriter}:
     * the partitioned table is fed with the pattern {@code ([^,]*),(.*)} and a batch
     * size of 10, and the table in {@code dbName2} with the pattern
     * {@code ([^:]*):(.*)}. Written data and transaction states are checked after each
     * step.
     */
    @Test
    public void testTransactionBatchCommitRegex() throws Exception {
        StrictRegexWriter commaWriter = StrictRegexWriter.newBuilder().withRegex("([^,]*),(.*)").build();
        HiveStreamingConnection partitioned = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withHiveConf(conf)
            .withRecordWriter(commaWriter)
            .withTransactionBatchSize(10)
            .connect();

        // First transaction: one record, committed.
        partitioned.beginTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, partitioned.getCurrentTransactionState());
        byte[] firstRecord = "1,Hello streaming".getBytes();
        partitioned.write(firstRecord);
        partitioned.commitTransaction();
        checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}");
        Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, partitioned.getCurrentTransactionState());

        // Second transaction: checked once before and once after the commit.
        partitioned.beginTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, partitioned.getCurrentTransactionState());
        byte[] secondRecord = "2,Welcome to streaming".getBytes();
        partitioned.write(secondRecord);
        checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}");
        partitioned.commitTransaction();
        checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}");

        partitioned.close();
        Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE, partitioned.getCurrentTransactionState());

        // Colon-separated input against the table in the second database.
        StrictRegexWriter colonWriter = StrictRegexWriter.newBuilder().withRegex("([^:]*):(.*)").build();
        HiveStreamingConnection plain = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName2)
            .withTable("alerts")
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withHiveConf(conf)
            .withRecordWriter(colonWriter)
            .connect();
        plain.beginTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, plain.getCurrentTransactionState());
        plain.write("1:Hello streaming".getBytes());
        plain.commitTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, plain.getCurrentTransactionState());
        plain.close();
    }

    /**
     * Writes three carriage-return separated records from an input stream through a
     * {@link StrictRegexWriter} in one transaction and asserts that
     * {@code select * from testing.alerts} returns exactly those three rows, each
     * followed by the partition columns {@code Asia} and {@code India}.
     */
    @Test
    public void testRegexInputStream() throws Exception {
        StrictRegexWriter regexWriter = StrictRegexWriter.newBuilder().withRegex("([^,]*),(.*)").build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withHiveConf(conf)
            .withRecordWriter(regexWriter)
            .connect();

        byte[] payload = "1,foo\r2,bar\r3,baz".getBytes();
        ByteArrayInputStream input = new ByteArrayInputStream(payload);
        connection.beginTransaction();
        connection.write(input);
        connection.commitTransaction();
        input.close();
        connection.close();

        ArrayList<String> rows = queryTable(this.driver, "select * from testing.alerts");
        String[] expectedRows = {"1\tfoo\tAsia\tIndia", "2\tbar\tAsia\tIndia", "3\tbaz\tAsia\tIndia"};
        Assert.assertEquals(3L, rows.size());
        for (int row = 0; row < expectedRows.length; row++) {
            Assert.assertEquals(expectedRows[row], rows.get(row));
        }
    }

    /**
     * Writes a single JSON record through a {@link StrictJsonWriter} with a transaction
     * batch size of 10, commits it, verifies the written data and the transaction
     * states, and finally asserts that {@code select * from testing.alerts} returns
     * one row.
     */
    @Test
    public void testTransactionBatchCommitJson() throws Exception {
        StrictJsonWriter jsonWriter = StrictJsonWriter.newBuilder().build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(jsonWriter)
            .withHiveConf(conf)
            .withTransactionBatchSize(10)
            .connect();

        connection.beginTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, connection.getCurrentTransactionState());
        String record = "{\"id\" : 1, \"msg\": \"Hello streaming\"}";
        connection.write(record.getBytes());
        connection.commitTransaction();
        checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}");
        Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, connection.getCurrentTransactionState());

        connection.close();
        Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE, connection.getCurrentTransactionState());

        int rowCount = queryTable(this.driver, "select * from testing.alerts").size();
        Assert.assertEquals(1L, rowCount);
    }

    /**
     * Writes three pipe-separated JSON records from an input stream through a
     * {@link StrictJsonWriter} configured with the line delimiter pattern {@code \|},
     * commits them in one transaction and asserts that
     * {@code select * from testing.alerts} returns exactly those three rows, each
     * followed by the partition columns {@code Asia} and {@code India}.
     */
    @Test
    public void testJsonInputStream() throws Exception {
        StrictJsonWriter jsonWriter = StrictJsonWriter.newBuilder().withLineDelimiterPattern("\\|").build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(jsonWriter)
            .withHiveConf(conf)
            .connect();

        connection.beginTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, connection.getCurrentTransactionState());

        String payload = "{\"id\" : 1, \"msg\": \"Hello streaming\"}"
            + "|{\"id\" : 2, \"msg\": \"Hello world\"}"
            + "|{\"id\" : 3, \"msg\": \"Hello world!!\"}";
        ByteArrayInputStream input = new ByteArrayInputStream(payload.getBytes());
        connection.write(input);
        connection.commitTransaction();
        input.close();
        connection.close();

        ArrayList<String> rows = queryTable(this.driver, "select * from testing.alerts");
        String[] expectedRows = {"1\tHello streaming\tAsia\tIndia", "2\tHello world\tAsia\tIndia", "3\tHello world!!\tAsia\tIndia"};
        Assert.assertEquals(3L, rows.size());
        for (int row = 0; row < expectedRows.length; row++) {
            Assert.assertEquals(expectedRows[row], rows.get(row));
        }
    }

    /**
     * Consumes every remaining transaction of a connection, asserting after each
     * {@code beginTransaction()} that {@code remainingTransactions()} has dropped by
     * one, writing two records, and finishing the transaction. The first connection
     * commits every transaction; a second connection repeats the run but aborts every
     * transaction. Both must report zero remaining transactions at the end and state
     * {@code INACTIVE} after being closed.
     */
    @Test
    public void testRemainingTransactions() throws Exception {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();

        // Pass 1: commit every transaction.
        HiveStreamingConnection committing = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(writer)
            .withHiveConf(conf)
            .connect();
        committing.beginTransaction();
        int expectedRemaining = committing.remainingTransactions();
        for (int batch = 0; committing.remainingTransactions() > 0; batch++) {
            committing.beginTransaction();
            expectedRemaining -= 1;
            Assert.assertEquals(expectedRemaining, committing.remainingTransactions());
            for (int record = 0; record < 2; record++) {
                Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, committing.getCurrentTransactionState());
                String line = (batch * record) + ",Hello streaming";
                committing.write(line.getBytes());
            }
            committing.commitTransaction();
            Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, committing.getCurrentTransactionState());
        }
        Assert.assertEquals(0L, committing.remainingTransactions());
        committing.close();
        Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE, committing.getCurrentTransactionState());

        // Pass 2: abort every transaction.
        HiveStreamingConnection aborting = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(writer)
            .withHiveConf(conf)
            .connect();
        aborting.beginTransaction();
        int expectedRemaining2 = aborting.remainingTransactions();
        for (int batch = 0; aborting.remainingTransactions() > 0; batch++) {
            aborting.beginTransaction();
            expectedRemaining2 -= 1;
            Assert.assertEquals(expectedRemaining2, aborting.remainingTransactions());
            for (int record = 0; record < 2; record++) {
                Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, aborting.getCurrentTransactionState());
                String line = (batch * record) + ",Hello streaming";
                aborting.write(line.getBytes());
            }
            aborting.abortTransaction();
            Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED, aborting.getCurrentTransactionState());
        }
        Assert.assertEquals(0L, aborting.remainingTransactions());
        aborting.close();
        Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE, aborting.getCurrentTransactionState());
    }

    /**
     * Writes two records inside a transaction, aborts it, and checks that nothing is visible in
     * the partition location either right after the abort or after the connection is closed.
     */
    @Test
    public void testTransactionBatchAbort() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo(agentInfo)
            .withRecordWriter(writer)
            .withHiveConf(conf)
            .connect();

        connection.beginTransaction();
        connection.write("1,Hello streaming".getBytes());
        connection.write("2,Welcome to streaming".getBytes());
        connection.abortTransaction();

        checkNothingWritten(partLoc);
        Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED, connection.getCurrentTransactionState());

        connection.close();
        checkNothingWritten(partLoc);
    }

    /**
     * Aborts one transaction and commits the next one on the same connection. While the first
     * transaction is open, exactly one acquired SHARED_READ lock carrying this test's agent info
     * must be reported by the metastore; only the committed transaction's rows may become visible.
     */
    @Test
    public void testTransactionBatchAbortAndCommit() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo(agentInfo)
            .withRecordWriter(writer)
            .withHiveConf(conf)
            .withTransactionBatchSize(10)
            .connect();

        // First transaction: write, inspect the lock, then abort.
        connection.beginTransaction();
        connection.write("1,Hello streaming".getBytes());
        connection.write("2,Welcome to streaming".getBytes());

        ShowLocksResponse locks = this.msClient.showLocks(new ShowLocksRequest());
        Assert.assertEquals("LockCount", 1L, locks.getLocksSize());
        ShowLocksResponseElement onlyLock = (ShowLocksResponseElement) locks.getLocks().get(0);
        Assert.assertEquals("LockType", LockType.SHARED_READ, onlyLock.getType());
        Assert.assertEquals("LockState", LockState.ACQUIRED, onlyLock.getState());
        Assert.assertEquals("AgentInfo", agentInfo, onlyLock.getAgentInfo());

        connection.abortTransaction();
        checkNothingWritten(partLoc);
        Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED, connection.getCurrentTransactionState());

        // Second transaction: same rows, this time committed.
        connection.beginTransaction();
        connection.write("1,Hello streaming".getBytes());
        connection.write("2,Welcome to streaming".getBytes());
        connection.commitTransaction();
        checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}");

        connection.close();
    }

    /**
     * Commits two transactions on one connection, then two more on a second connection to the
     * same partition, verifying after every commit that the table contents grow by exactly the
     * committed row.
     */
    @Test
    public void testMultipleTransactionBatchCommits() throws Exception {
        final String query = "select id, msg from testing.alerts order by id, msg";
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();

        // First connection / first batch of write ids.
        HiveStreamingConnection firstConn = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(writer)
            .withTransactionBatchSize(10)
            .withHiveConf(conf)
            .connect();

        firstConn.beginTransaction();
        firstConn.write("1,Hello streaming".getBytes());
        firstConn.commitTransaction();
        checkDataWritten2(partLoc, 1L, 10L, 1, query, false,
            "1\tHello streaming");

        firstConn.beginTransaction();
        firstConn.write("2,Welcome to streaming".getBytes());
        firstConn.commitTransaction();
        checkDataWritten2(partLoc, 1L, 10L, 1, query, true,
            "1\tHello streaming", "2\tWelcome to streaming");

        firstConn.close();

        // Second connection / second batch of write ids.
        HiveStreamingConnection secondConn = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(writer)
            .withTransactionBatchSize(10)
            .withHiveConf(conf)
            .connect();

        secondConn.beginTransaction();
        secondConn.write("3,Hello streaming - once again".getBytes());
        secondConn.commitTransaction();
        checkDataWritten2(partLoc, 1L, 20L, 2, query, false,
            "1\tHello streaming", "2\tWelcome to streaming", "3\tHello streaming - once again");

        secondConn.beginTransaction();
        secondConn.write("4,Welcome to streaming - once again".getBytes());
        secondConn.commitTransaction();
        checkDataWritten2(partLoc, 1L, 20L, 2, query, true,
            "1\tHello streaming", "2\tWelcome to streaming", "3\tHello streaming - once again",
            "4\tWelcome to streaming - once again");

        Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, secondConn.getCurrentTransactionState());
        secondConn.close();
    }

    /**
     * Interleaves transactions of two connections writing to the same partition and checks,
     * after every commit, which rows are visible. In between it also inspects the ORC side
     * files of every current delta directory: each must exist and be non-empty, and the logical
     * length of the bucket file must equal (after both commits) or not exceed (while
     * transactions are open) its physical length.
     */
    @Test
    public void testInterleavedTransactionBatchCommits() throws Exception {
        final String query = "select id, msg from testing.alerts order by id, msg";

        HiveStreamingConnection first = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build())
            .withHiveConf(conf)
            .withTransactionBatchSize(10)
            .connect();
        first.beginTransaction();

        HiveStreamingConnection second = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build())
            .withHiveConf(conf)
            .withTransactionBatchSize(10)
            .connect();
        second.beginTransaction();

        // Both transactions are open: nothing may be visible yet.
        first.write("1,Hello streaming".getBytes());
        second.write("3,Hello streaming - once again".getBytes());
        checkNothingWritten(partLoc);

        second.commitTransaction();
        checkDataWritten2(partLoc, 11L, 20L, 1, query, true,
            "3\tHello streaming - once again");

        first.commitTransaction();

        // Everything is committed: logical and physical bucket lengths must match exactly.
        FileSystem fs = partLoc.getFileSystem(conf);
        for (Object delta : AcidUtils.getAcidState(partLoc, conf, this.msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, "alerts"))).getCurrentDirectories()) {
            Path deltaDir = ((AcidUtils.ParsedDelta) delta).getPath();
            for (FileStatus bucket : fs.listStatus(deltaDir, AcidUtils.bucketFileFilter)) {
                Path lengthFile = OrcAcidUtils.getSideFile(bucket.getPath());
                Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
                long sideLen = fs.getFileStatus(lengthFile).getLen();
                Assert.assertTrue("Expected " + lengthFile + " to be non empty. lengh=" + sideLen, sideLen > 0);
                long logicalLen = AcidUtils.getLogicalLength(fs, bucket);
                long physicalLen = bucket.getLen();
                Assert.assertTrue("", logicalLen == physicalLen);
            }
        }
        checkDataWritten2(partLoc, 1L, 20L, 2, query, false,
            "1\tHello streaming", "3\tHello streaming - once again");

        first.beginTransaction();
        first.write("2,Welcome to streaming".getBytes());
        second.beginTransaction();
        second.write("4,Welcome to streaming - once again".getBytes());

        // Transactions are open again: the logical length may lag behind the physical length.
        for (Object delta : AcidUtils.getAcidState(partLoc, conf, this.msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, "alerts"))).getCurrentDirectories()) {
            Path deltaDir = ((AcidUtils.ParsedDelta) delta).getPath();
            for (FileStatus bucket : fs.listStatus(deltaDir, AcidUtils.bucketFileFilter)) {
                Path lengthFile = OrcAcidUtils.getSideFile(bucket.getPath());
                Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
                long sideLen = fs.getFileStatus(lengthFile).getLen();
                Assert.assertTrue("Expected " + lengthFile + " to be non empty. lengh=" + sideLen, sideLen > 0);
                long logicalLen = AcidUtils.getLogicalLength(fs, bucket);
                long physicalLen = bucket.getLen();
                Assert.assertTrue("", logicalLen <= physicalLen);
            }
        }
        checkDataWritten2(partLoc, 1L, 20L, 2, query, true,
            "1\tHello streaming", "3\tHello streaming - once again");

        first.commitTransaction();
        checkDataWritten2(partLoc, 1L, 20L, 2, query, false,
            "1\tHello streaming", "2\tWelcome to streaming", "3\tHello streaming - once again");

        second.commitTransaction();
        checkDataWritten2(partLoc, 1L, 20L, 2, query, true,
            "1\tHello streaming", "2\tWelcome to streaming", "3\tHello streaming - once again",
            "4\tWelcome to streaming - once again");

        Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, first.getCurrentTransactionState());
        Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, second.getCurrentTransactionState());
        first.close();
        second.close();
    }

    /**
     * Starts three {@code WriterThd} threads, each given one record, waits for all of them to
     * finish, and fails if any of them recorded an error.
     */
    @Test
    public void testConcurrentTransactionBatchCommits() throws Exception {
        List<WriterThd> writers = new ArrayList<>(3);
        writers.add(new WriterThd("1,Matrix"));
        writers.add(new WriterThd("2,Gandhi"));
        writers.add(new WriterThd("3,Silence"));

        // Start every writer before joining any, so that they actually run concurrently.
        for (WriterThd writer : writers) {
            writer.start();
        }
        for (WriterThd writer : writers) {
            writer.join();
        }

        for (WriterThd writer : writers) {
            if (writer.error != null) {
                Assert.fail("Writer thread" + writer.getName() + " died: " + writer.error.getMessage() + " See log file for stack trace");
            }
        }
    }

    /**
     * Reads every row of the given ORC bucket file from the local file system and returns the
     * user payload of each row (element 5 of the array produced by
     * {@link #deserializeDeltaFileRow}) as a {@code SampleRec}.
     *
     * @param path location of the bucket file to read
     * @return the records found in the file, in file order
     * @throws IOException if the file cannot be opened or read, or the reader cannot be closed
     */
    private ArrayList<SampleRec> dumpBucket(Path path) throws IOException {
        Reader createReader = OrcFile.createReader(path, OrcFile.readerOptions(conf).filesystem(FileSystem.getLocal(new Configuration())));
        org.apache.hadoop.hive.ql.io.orc.RecordReader rows = createReader.rows();
        try {
            StructObjectInspector objectInspector = createReader.getObjectInspector();
            System.out.format("Found Bucket File : %s \n", path.getName());
            ArrayList<SampleRec> records = new ArrayList<>();
            while (rows.hasNext()) {
                records.add((SampleRec) deserializeDeltaFileRow(rows.next((Object) null), objectInspector)[5]);
            }
            return records;
        } finally {
            // Release the underlying file handle even when deserialization fails part-way.
            rows.close();
        }
    }

    /**
     * Unpacks one row of a delta file into a six-element array: five leading scalar columns
     * (int, long, int, long, long, boxed) followed by the nested struct column converted to a
     * {@code SampleRec} through {@link #deserializeInner}.
     * NOTE(review): the leading five columns are presumably the ACID metadata columns; confirm
     * against the ORC ACID schema.
     *
     * @param obj the raw row object handed out by the ORC record reader
     * @param structObjectInspector inspector describing the row's six top-level fields
     * @return the decoded column values, in field order
     */
    private static Object[] deserializeDeltaFileRow(Object obj, StructObjectInspector structObjectInspector) {
        List fieldRefs = structObjectInspector.getAllStructFieldRefs();

        // Per-column inspectors, resolved up front in field order.
        WritableIntObjectInspector col0Inspector = ((StructField) fieldRefs.get(0)).getFieldObjectInspector();
        WritableLongObjectInspector col1Inspector = ((StructField) fieldRefs.get(1)).getFieldObjectInspector();
        WritableIntObjectInspector col2Inspector = ((StructField) fieldRefs.get(2)).getFieldObjectInspector();
        WritableLongObjectInspector col3Inspector = ((StructField) fieldRefs.get(3)).getFieldObjectInspector();
        WritableLongObjectInspector col4Inspector = ((StructField) fieldRefs.get(4)).getFieldObjectInspector();
        StructObjectInspector payloadInspector = ((StructField) fieldRefs.get(5)).getFieldObjectInspector();

        Object[] columns = new Object[6];
        columns[0] = Integer.valueOf(col0Inspector.get(structObjectInspector.getStructFieldData(obj, (StructField) fieldRefs.get(0))));
        columns[1] = Long.valueOf(col1Inspector.get(structObjectInspector.getStructFieldData(obj, (StructField) fieldRefs.get(1))));
        columns[2] = Integer.valueOf(col2Inspector.get(structObjectInspector.getStructFieldData(obj, (StructField) fieldRefs.get(2))));
        columns[3] = Long.valueOf(col3Inspector.get(structObjectInspector.getStructFieldData(obj, (StructField) fieldRefs.get(3))));
        columns[4] = Long.valueOf(col4Inspector.get(structObjectInspector.getStructFieldData(obj, (StructField) fieldRefs.get(4))));
        columns[5] = deserializeInner(structObjectInspector.getStructFieldData(obj, (StructField) fieldRefs.get(5)), payloadInspector);
        return columns;
    }

    /**
     * Builds a {@code SampleRec} from the first three fields of a struct row.
     * Fields 0 and 2 are read through their inspector's {@code getPrimitiveJavaObject}, field 1
     * through its inspector's {@code get}.
     * NOTE(review): presumably the fields are (string, int, string) to match the tables created
     * by the bucketing tests; confirm against {@code SampleRec}'s constructor.
     *
     * @param obj the nested struct value of a delta-file row
     * @param structObjectInspector inspector describing {@code obj}'s fields
     * @return a record holding the three decoded field values
     */
    private static SampleRec deserializeInner(Object obj, StructObjectInspector structObjectInspector) {
        List allStructFieldRefs = structObjectInspector.getAllStructFieldRefs();
        // One constructor argument per field, each decoded with that field's own inspector.
        return new SampleRec(((StructField) allStructFieldRefs.get(0)).getFieldObjectInspector().getPrimitiveJavaObject(structObjectInspector.getStructFieldData(obj, (StructField) allStructFieldRefs.get(0))), ((StructField) allStructFieldRefs.get(1)).getFieldObjectInspector().get(structObjectInspector.getStructFieldData(obj, (StructField) allStructFieldRefs.get(1))), ((StructField) allStructFieldRefs.get(2)).getFieldObjectInspector().getPrimitiveJavaObject(structObjectInspector.getStructFieldData(obj, (StructField) allStructFieldRefs.get(2))));
    }

    /**
     * Streams rows into two freshly created 4-bucket tables (bucketed on their first two
     * columns) and then checks how the rows of the first table were distributed over its bucket
     * files: three buckets in total, none for bucket 0, and 1/2/1 rows in buckets 1/2/3.
     * The buckets of the second table are only printed, not asserted on.
     */
    @Test
    public void testBucketing() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();
        dropDB(this.msClient, dbName3);
        dropDB(this.msClient, dbName4);

        // Backslashes are normalised so that the location also works as a path on Windows.
        String dbLocation = (this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        createDbAndTable(this.driver, dbName3, tblName3, null, "key1,key2,data".split(","), "string,int,string".split(","), "key1,key2".split(","), null, dbLocation, 4);
        String dbLocation2 = (this.dbFolder.newFolder(dbName4).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        createDbAndTable(this.driver, dbName4, tblName4, null, "key3,key4,data2".split(","), "string,int,string".split(","), "key3,key4".split(","), null, dbLocation2, 4);

        HiveStreamingConnection connect = HiveStreamingConnection.newBuilder().withDatabase(dbName3).withTable(tblName3).withAgentInfo(agentInfo).withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build()).withHiveConf(conf).connect();
        connect.beginTransaction();
        connect.write("name0,1,Hello streaming".getBytes());
        connect.write("name2,2,Welcome to streaming".getBytes());
        connect.write("name4,2,more Streaming unlimited".getBytes());
        connect.write("name5,2,even more Streaming unlimited".getBytes());
        connect.commitTransaction();
        connect.close();

        HiveStreamingConnection connect2 = HiveStreamingConnection.newBuilder().withDatabase(dbName4).withTable(tblName4).withAgentInfo(agentInfo).withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build()).withHiveConf(conf).connect();
        connect2.beginTransaction();
        connect2.write("name5,2,fact3".getBytes());
        connect2.write("name8,2,fact3".getBytes());
        connect2.write("name0,1,fact1".getBytes());
        connect2.commitTransaction();
        connect2.close();

        HashMap<Integer, ArrayList<SampleRec>> table1Buckets = dumpAllBuckets(dbLocation, tblName3);
        HashMap<Integer, ArrayList<SampleRec>> table2Buckets = dumpAllBuckets(dbLocation2, tblName4);
        System.err.println("\n  Table 1");
        System.err.println(table1Buckets);
        System.err.println("\n  Table 2");
        System.err.println(table2Buckets);

        // JUnit's contract is (message, expected, actual); keeping that order makes a failure
        // report "expected:<3> but was:<n>" instead of the reverse.
        Assert.assertEquals("number of buckets does not match expectation", 3L, table1Buckets.values().size());
        Assert.assertNull("bucket 0 shouldn't have been created", table1Buckets.get(0));
        Assert.assertEquals("records in bucket does not match expectation", 1L, table1Buckets.get(1).size());
        Assert.assertEquals("records in bucket does not match expectation", 2L, table1Buckets.get(2).size());
        Assert.assertEquals("records in bucket does not match expectation", 1L, table1Buckets.get(3).size());
    }

    /**
     * Runs the given statement through {@code runDDL} on this test's driver and fails the test,
     * naming the statement, when {@code runDDL} reports {@code false}.
     *
     * @param str the statement to execute
     */
    private void runCmdOnDriver(String str) {
        String failureMessage = str + " failed";
        boolean succeeded = runDDL(this.driver, str);
        Assert.assertTrue(failureMessage, succeeded);
    }

    /**
     * Streams rows into two tables and, after each committed transaction, runs {@code FileDump}
     * on the first table's database location with stderr captured. The captured output must not
     * report corrupted files or files still open for writes (and, for the second run, must not
     * mention any exception).
     * NOTE(review): the second run dumps the first database location again rather than the
     * second one; looks intentional but worth confirming.
     */
    @Test
    public void testFileDump() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();
        dropDB(this.msClient, dbName3);
        dropDB(this.msClient, dbName4);
        String dbLocation = (this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        createDbAndTable(this.driver, dbName3, tblName3, null, "key1,key2,data".split(","), "string,int,string".split(","), "key1,key2".split(","), null, dbLocation, 4);
        createDbAndTable(this.driver, dbName4, tblName4, null, "key3,key4,data2".split(","), "string,int,string".split(","), "key3,key4".split(","), null, (this.dbFolder.newFolder(dbName4).getCanonicalPath() + ".db").replaceAll("\\\\", "/"), 4);

        HiveStreamingConnection connect = HiveStreamingConnection.newBuilder().withDatabase(dbName3).withTable(tblName3).withAgentInfo(agentInfo).withHiveConf(conf).withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build()).connect();
        connect.beginTransaction();
        connect.write("name0,1,Hello streaming".getBytes());
        connect.write("name2,2,Welcome to streaming".getBytes());
        connect.write("name4,2,more Streaming unlimited".getBytes());
        connect.write("name5,2,even more Streaming unlimited".getBytes());
        connect.commitTransaction();
        connect.close();

        // First dump. stderr is restored in a finally block so that a failure inside FileDump
        // cannot leave the JVM-wide stream redirected for the tests that run afterwards.
        PrintStream originalErr = System.err;
        ByteArrayOutputStream firstCapture = new ByteArrayOutputStream();
        System.setErr(new PrintStream(firstCapture));
        try {
            FileDump.main(new String[]{dbLocation});
            System.err.flush();
        } finally {
            System.setErr(originalErr);
        }
        String firstDump = new String(firstCapture.toByteArray());
        Assert.assertFalse("FileDump reported corrupted files", firstDump.contains("file(s) are corrupted"));
        Assert.assertFalse("FileDump reported a file still open for writes", firstDump.contains("is still open for writes."));

        HiveStreamingConnection connect2 = HiveStreamingConnection.newBuilder().withDatabase(dbName4).withTable(tblName4).withAgentInfo(agentInfo).withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build()).withHiveConf(conf).connect();
        connect2.beginTransaction();
        connect2.write("name5,2,fact3".getBytes());
        connect2.write("name8,2,fact3".getBytes());
        connect2.write("name0,1,fact1".getBytes());
        connect2.commitTransaction();
        connect2.close();

        // Second dump, same capture discipline.
        PrintStream originalErr2 = System.err;
        ByteArrayOutputStream secondCapture = new ByteArrayOutputStream();
        System.setErr(new PrintStream(secondCapture));
        try {
            FileDump.main(new String[]{dbLocation});
            System.out.flush();
            System.err.flush();
        } finally {
            System.setErr(originalErr2);
        }
        String secondDump = new String(secondCapture.toByteArray());
        Assert.assertFalse("FileDump reported an exception", secondDump.contains("Exception"));
        Assert.assertFalse("FileDump reported corrupted files", secondDump.contains("file(s) are corrupted"));
        Assert.assertFalse("FileDump reported a file still open for writes", secondDump.contains("is still open for writes."));
    }

    /**
     * Streams 6004 rows with streaming optimizations enabled, then runs {@code FileDump} on the
     * database location with stdout captured. The dump must show uncompressed files, zero-count
     * column statistics for columns 0-9, and DIRECT_V2 encoding for columns 7 and 9.
     */
    @Test
    public void testFileDumpDeltaFilesWithStreamingOptimizations() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();
        dropDB(this.msClient, dbName3);
        dropDB(this.msClient, dbName4);
        String dbLocation = (this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        createDbAndTable(this.driver, dbName3, tblName3, null, "key1,key2,data".split(","), "string,int,string".split(","), "key1,key2".split(","), null, dbLocation, 4);

        HiveStreamingConnection connect = HiveStreamingConnection.newBuilder().withDatabase(dbName3).withTable(tblName3).withAgentInfo(agentInfo).withHiveConf(conf).withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build()).withStreamingOptimizations(true).connect();
        connect.beginTransaction();
        connect.write("name0,1,streaming".getBytes());
        connect.write("name2,2,streaming".getBytes());
        connect.write("name4,2,unlimited".getBytes());
        connect.write("name5,2,unlimited".getBytes());
        // Only two distinct values in the last column, alternating by row parity.
        for (int i = 0; i < 6000; i++) {
            String data = (i % 2 == 0) ? ",streaming" : ",unlimited";
            connect.write(("name" + i + "," + i + data).getBytes());
        }
        connect.commitTransaction();
        connect.close();
        // NOTE(review): second close() kept from the original; presumably harmless (or an
        // idempotence check) - confirm and drop if accidental.
        connect.close();

        // stdout is restored in a finally block so that a failure inside FileDump cannot leave
        // the JVM-wide stream redirected for the tests that run afterwards.
        PrintStream originalOut = System.out;
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        System.setOut(new PrintStream(captured));
        try {
            FileDump.main(new String[]{dbLocation});
            System.out.flush();
        } finally {
            System.setOut(originalOut);
        }
        String dump = new String(captured.toByteArray());

        String[] expectedFragments = {
            "Compression: NONE",
            "Column 0: count: 0 hasNull: false",
            "Column 1: count: 0 hasNull: false sum: 0",
            "Column 2: count: 0 hasNull: false sum: 0",
            "Column 3: count: 0 hasNull: false sum: 0",
            "Column 4: count: 0 hasNull: false sum: 0",
            "Column 5: count: 0 hasNull: false sum: 0",
            "Column 6: count: 0 hasNull: false",
            "Column 7: count: 0 hasNull: false",
            "Column 8: count: 0 hasNull: false sum: 0",
            "Column 9: count: 0 hasNull: false",
            "Encoding column 7: DIRECT_V2",
            "Encoding column 9: DIRECT_V2",
        };
        for (String fragment : expectedFragments) {
            Assert.assertTrue("FileDump output is missing: " + fragment, dump.contains(fragment));
        }
    }

    /**
     * Streams 6004 rows with streaming optimizations disabled, then runs {@code FileDump} on the
     * database location with stdout captured. The dump must show ZLIB compression and
     * DICTIONARY encoding for column 9.
     */
    @Test
    public void testFileDumpDeltaFilesWithoutStreamingOptimizations() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();
        dropDB(this.msClient, dbName3);
        dropDB(this.msClient, dbName4);
        String dbLocation = (this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        createDbAndTable(this.driver, dbName3, tblName3, null, "key1,key2,data".split(","), "string,int,string".split(","), "key1,key2".split(","), null, dbLocation, 4);

        HiveStreamingConnection connect = HiveStreamingConnection.newBuilder().withDatabase(dbName3).withTable(tblName3).withAgentInfo(agentInfo).withHiveConf(conf).withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build()).withStreamingOptimizations(false).connect();
        connect.beginTransaction();
        connect.write("name0,1,streaming".getBytes());
        connect.write("name2,2,streaming".getBytes());
        connect.write("name4,2,unlimited".getBytes());
        connect.write("name5,2,unlimited".getBytes());
        // Only two distinct values in the last column, alternating by row parity.
        for (int i = 0; i < 6000; i++) {
            String data = (i % 2 == 0) ? ",streaming" : ",unlimited";
            connect.write(("name" + i + "," + i + data).getBytes());
        }
        connect.commitTransaction();
        connect.close();

        // stdout is restored in a finally block so that a failure inside FileDump cannot leave
        // the JVM-wide stream redirected for the tests that run afterwards.
        PrintStream originalOut = System.out;
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        System.setOut(new PrintStream(captured));
        try {
            FileDump.main(new String[]{dbLocation});
            System.out.flush();
        } finally {
            System.setOut(originalOut);
        }
        String dump = new String(captured.toByteArray());

        Assert.assertTrue("FileDump output is missing: Compression: ZLIB", dump.contains("Compression: ZLIB"));
        Assert.assertTrue("FileDump output is missing: Encoding column 9: DICTIONARY", dump.contains("Encoding column 9: DICTIONARY"));
    }

    /**
     * Commits one transaction, damages the four bucket files through {@link #corruptDataFile}
     * (bucket 0 emptied, bucket 1 shortened by one byte, buckets 2 and 3 padded with 100 bytes)
     * and then drives {@code FileDump} three times with stderr captured:
     * <ol>
     *   <li>a plain dump, which must report "3 file(s) are corrupted";</li>
     *   <li>a {@code --recover --skip-dump} run, which must report recovery of buckets 1-3 and
     *       the creation of an empty ORC file;</li>
     *   <li>a second plain dump, which must no longer report corruption.</li>
     * </ol>
     * Finally no {@code _flush_length} side file may remain under the database location.
     */
    @Test
    public void testFileDumpCorruptDataFiles() throws Exception {
        dropDB(this.msClient, dbName3);
        String dbLocation = (this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        createDbAndTable(this.driver, dbName3, tblName3, null, "key1,key2,data".split(","), "string,int,string".split(","), "key1,key2".split(","), null, dbLocation, 4);

        HiveStreamingConnection connect = HiveStreamingConnection.newBuilder().withDatabase(dbName3).withTable(tblName3).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build()).withHiveConf(conf).withTransactionBatchSize(10).connect();
        connect.beginTransaction();
        connect.write("name0,1,Hello streaming".getBytes());
        connect.write("name2,2,Welcome to streaming".getBytes());
        connect.write("name4,2,more Streaming unlimited".getBytes());
        connect.write("name5,2,even more Streaming unlimited".getBytes());
        connect.commitTransaction();

        Path dbPath = new Path(dbLocation);
        for (String file : FileDump.getAllFilesInPath(dbPath, conf)) {
            if (file.contains("bucket_00000")) {
                corruptDataFile(file, conf, Integer.MIN_VALUE);
            } else if (file.contains("bucket_00001")) {
                corruptDataFile(file, conf, -1);
            } else if (file.contains("bucket_00002")) {
                corruptDataFile(file, conf, 100);
            } else if (file.contains("bucket_00003")) {
                corruptDataFile(file, conf, 100);
            }
        }

        // Every capture below restores stderr in a finally block so that a failure inside
        // FileDump cannot leave the JVM-wide stream redirected for later tests.

        // 1) Plain dump of the damaged files.
        PrintStream originalErr = System.err;
        ByteArrayOutputStream dumpCapture = new ByteArrayOutputStream();
        System.setErr(new PrintStream(dumpCapture));
        try {
            FileDump.main(new String[]{dbLocation});
            System.err.flush();
        } finally {
            System.setErr(originalErr);
        }
        String corruptDump = new String(dumpCapture.toByteArray());
        Assert.assertFalse("FileDump reported an exception", corruptDump.contains("Exception"));
        Assert.assertTrue("FileDump output is missing: 3 file(s) are corrupted", corruptDump.contains("3 file(s) are corrupted"));
        Assert.assertFalse("FileDump reported a file still open for writes", corruptDump.contains("is still open for writes."));

        // 2) Recovery run.
        PrintStream originalErr2 = System.err;
        ByteArrayOutputStream recoverCapture = new ByteArrayOutputStream();
        System.setErr(new PrintStream(recoverCapture));
        try {
            FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"});
            System.err.flush();
        } finally {
            System.setErr(originalErr2);
        }
        String recoverDump = new String(recoverCapture.toByteArray());
        String[] expectedRecoveryFragments = {
            "bucket_00001 recovered successfully!",
            "No readable footers found. Creating empty orc file.",
            "bucket_00002 recovered successfully!",
            "bucket_00003 recovered successfully!",
        };
        for (String fragment : expectedRecoveryFragments) {
            Assert.assertTrue("FileDump output is missing: " + fragment, recoverDump.contains(fragment));
        }
        Assert.assertFalse("FileDump reported an exception", recoverDump.contains("Exception"));
        Assert.assertFalse("FileDump reported a file still open for writes", recoverDump.contains("is still open for writes."));

        // 3) Plain dump after recovery.
        PrintStream originalErr3 = System.err;
        ByteArrayOutputStream afterCapture = new ByteArrayOutputStream();
        System.setErr(new PrintStream(afterCapture));
        try {
            FileDump.main(new String[]{dbLocation});
            System.err.flush();
        } finally {
            System.setErr(originalErr3);
        }
        String recoveredDump = new String(afterCapture.toByteArray());
        Assert.assertFalse("FileDump reported an exception", recoveredDump.contains("Exception"));
        Assert.assertFalse("FileDump reported corrupted files", recoveredDump.contains("file(s) are corrupted"));
        Assert.assertFalse("FileDump reported a file still open for writes", recoveredDump.contains("is still open for writes."));

        for (String file : FileDump.getAllFilesInPath(dbPath, conf)) {
            Assert.assertFalse("side file left behind: " + file, file.contains("_flush_length"));
        }
        connect.close();
    }

    /**
     * Replaces a file with a copy whose length differs from the original by {@code i} bytes.
     * The copy is written next to the original with a {@code .corrupt} suffix, the original is
     * deleted, and the copy is renamed over it.
     *
     * @param str path of the file to damage
     * @param configuration configuration used to resolve the file system of {@code str}
     * @param i length delta in bytes: a negative value drops that many trailing bytes, a
     *     positive value appends that many zero bytes, and {@code Integer.MIN_VALUE} produces
     *     an empty file
     * @throws Exception if the file cannot be read or written, or if the damaged copy cannot be
     *     moved into place
     */
    private void corruptDataFile(String str, Configuration configuration, int i) throws Exception {
        Path target = new Path(str);
        Path scratch = new Path(target.getParent(), target.getName() + ".corrupt");
        FileSystem fileSystem = target.getFileSystem(configuration);
        FileStatus fileStatus = fileSystem.getFileStatus(target);

        // A fresh byte[] is zero-filled, which is what supplies the padding for a positive delta.
        byte[] contents = new byte[i == Integer.MIN_VALUE ? 0 : ((int) fileStatus.getLen()) + i];
        try (FSDataInputStream in = fileSystem.open(target)) {
            in.readFully(0L, contents, 0, (int) Math.min(fileStatus.getLen(), contents.length));
        }
        try (FSDataOutputStream out = fileSystem.create(scratch, true)) {
            out.write(contents, 0, contents.length);
        }

        fileSystem.delete(target, false);
        // Without this check a failed rename would leave the original deleted and the test
        // silently running against a missing file instead of a damaged one.
        if (!fileSystem.rename(scratch, target)) {
            throw new IOException("Could not move " + scratch + " to " + target);
        }
    }

    /**
     * Checks that ORC {@code FileDump} reports corrupted {@code _flush_length} side files,
     * that {@code --recover} repairs them, and that a subsequent dump is clean.
     *
     * <p>Flow: write two committed transactions through a streaming connection, record the
     * bucket file lengths after each commit, overwrite the side file of each of the four
     * buckets with a different kind of corruption, then run FileDump three times
     * (dump, recover, dump) and inspect what it printed to stderr.
     */
    @Test
    public void testFileDumpCorruptSideFiles() throws Exception {
        dropDB(this.msClient, dbName3);
        String dbLocation = (this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        createDbAndTable(this.driver, dbName3, tblName3, null, "key1,key2,data".split(","), "string,int,string".split(","), "key1,key2".split(","), null, dbLocation, 4);
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName3)
            .withTable(tblName3)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build())
            .withHiveConf(conf)
            .withTransactionBatchSize(10)
            .connect();

        // First transaction.
        connection.beginTransaction();
        connection.write("name0,1,Hello streaming".getBytes());
        connection.write("name2,2,Welcome to streaming".getBytes());
        connection.write("name4,2,more Streaming unlimited".getBytes());
        connection.write("name5,2,even more Streaming unlimited".getBytes());
        connection.write("name6,3,aHello streaming".getBytes());
        connection.commitTransaction();

        // File lengths per bucket, captured after every commit.
        Map<String, List<Long>> offsetsByBucket = new HashMap<>();
        recordOffsets(conf, dbLocation, offsetsByBucket);

        // Second transaction.
        connection.beginTransaction();
        connection.write("name01,11,-Hello streaming".getBytes());
        connection.write("name21,21,-Welcome to streaming".getBytes());
        connection.write("name41,21,-more Streaming unlimited".getBytes());
        connection.write("name51,21,-even more Streaming unlimited".getBytes());
        connection.write("name02,12,--Hello streaming".getBytes());
        connection.write("name22,22,--Welcome to streaming".getBytes());
        connection.write("name42,22,--more Streaming unlimited".getBytes());
        connection.write("name52,22,--even more Streaming unlimited".getBytes());
        connection.write("name7,4,aWelcome to streaming".getBytes());
        connection.write("name8,5,amore Streaming unlimited".getBytes());
        connection.write("name9,6,aeven more Streaming unlimited".getBytes());
        connection.write("name10,7,bHello streaming".getBytes());
        connection.write("name11,8,bWelcome to streaming".getBytes());
        connection.write("name12,9,bmore Streaming unlimited".getBytes());
        connection.write("name13,10,beven more Streaming unlimited".getBytes());
        connection.commitTransaction();
        recordOffsets(conf, dbLocation, offsetsByBucket);

        // Corrupt each bucket's side file differently: truncated last entry (-1),
        // empty file (0), and 3 or 10 entries padded with made-up offsets.
        Path dbPath = new Path(dbLocation);
        for (String file : FileDump.getAllFilesInPath(dbPath, conf)) {
            if (file.contains("bucket_00000")) {
                corruptSideFile(file, conf, offsetsByBucket, "bucket_00000", -1);
            } else if (file.contains("bucket_00001")) {
                corruptSideFile(file, conf, offsetsByBucket, "bucket_00001", 0);
            } else if (file.contains("bucket_00002")) {
                corruptSideFile(file, conf, offsetsByBucket, "bucket_00002", 3);
            } else if (file.contains("bucket_00003")) {
                corruptSideFile(file, conf, offsetsByBucket, "bucket_00003", 10);
            }
        }

        // 1) Plain dump: every side file must be reported as corrupted.
        // NOTE(review): the expected byte lengths (11/0/24/80) presumably rely on exactly two
        // offsets having been recorded per bucket - confirm against recordOffsets.
        String dumpOutput = runFileDumpCapturingStderr(dbLocation);
        Assert.assertTrue(dumpOutput.contains("bucket_00000_flush_length [length: 11"));
        Assert.assertTrue(dumpOutput.contains("bucket_00001_flush_length [length: 0"));
        Assert.assertTrue(dumpOutput.contains("bucket_00002_flush_length [length: 24"));
        Assert.assertTrue(dumpOutput.contains("bucket_00003_flush_length [length: 80"));
        Assert.assertFalse(dumpOutput.contains("Exception"));
        Assert.assertTrue(dumpOutput.contains("4 file(s) are corrupted"));
        Assert.assertFalse(dumpOutput.contains("is still open for writes."));

        // 2) Recovery: every bucket is recovered and the readable footer offsets match
        //    the lengths recorded after each commit.
        String recoverOutput = runFileDumpCapturingStderr(dbLocation, "--recover", "--skip-dump");
        Assert.assertTrue(recoverOutput.contains("bucket_00000 recovered successfully!"));
        Assert.assertTrue(recoverOutput.contains("bucket_00001 recovered successfully!"));
        Assert.assertTrue(recoverOutput.contains("bucket_00002 recovered successfully!"));
        Assert.assertTrue(recoverOutput.contains("bucket_00003 recovered successfully!"));
        Assert.assertTrue(recoverOutput.contains("Readable footerOffsets: " + offsetsByBucket.get("bucket_00000").toString()));
        Assert.assertTrue(recoverOutput.contains("Readable footerOffsets: " + offsetsByBucket.get("bucket_00001").toString()));
        Assert.assertTrue(recoverOutput.contains("Readable footerOffsets: " + offsetsByBucket.get("bucket_00002").toString()));
        Assert.assertTrue(recoverOutput.contains("Readable footerOffsets: " + offsetsByBucket.get("bucket_00003").toString()));
        Assert.assertFalse(recoverOutput.contains("Exception"));
        Assert.assertFalse(recoverOutput.contains("is still open for writes."));

        // 3) Dump again: nothing is reported as corrupted any more.
        String finalOutput = runFileDumpCapturingStderr(dbLocation);
        Assert.assertFalse(finalOutput.contains("Exception"));
        Assert.assertFalse(finalOutput.contains("file(s) are corrupted"));
        Assert.assertFalse(finalOutput.contains("is still open for writes."));

        // After recovery no side file may remain under the database directory.
        for (String file : FileDump.getAllFilesInPath(dbPath, conf)) {
            Assert.assertFalse(file.contains("_flush_length"));
        }
        connection.close();
    }

    /**
     * Runs {@code FileDump.main} with the given arguments while {@code System.err} is redirected
     * into a buffer, and returns everything that was written to it.
     *
     * <p>The original {@code System.err} is restored in a {@code finally} block so that a failing
     * FileDump run cannot leave stderr redirected for the tests that run afterwards.
     *
     * @param args command line arguments handed to FileDump
     * @return the captured stderr content, decoded with the platform default charset
     * @throws Exception whatever {@code FileDump.main} throws
     */
    private static String runFileDumpCapturingStderr(String... args) throws Exception {
        PrintStream originalErr = System.err;
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        System.setErr(new PrintStream(captured));
        try {
            FileDump.main(args);
            System.err.flush();
        } finally {
            System.setErr(originalErr);
        }
        return new String(captured.toByteArray());
    }

    /**
     * Replaces the side file of the given ORC file with a deliberately corrupted one.
     *
     * <p>The corrupted content is written to a sibling {@code .corrupt} file first and then
     * swapped in by deleting the real side file and renaming the copy over it. What is written
     * depends on {@code i}:
     * <ul>
     *   <li>{@code i < 0}: all recorded offsets except the last one as full longs, followed by
     *       only the first 3 bytes of the last offset (a truncated entry);</li>
     *   <li>{@code i == 0}: nothing, i.e. an empty side file;</li>
     *   <li>{@code i > 0}: exactly {@code i} longs - the recorded offsets first and, if there are
     *       fewer than {@code i}, made-up offsets beyond the last recorded one.</li>
     * </ul>
     *
     * @param str      path of the ORC file whose side file is corrupted
     * @param hiveConf configuration used to resolve the file system
     * @param map      recorded offsets, keyed by bucket name
     * @param str2     bucket name to look up in {@code map}
     * @param i        corruption mode, see above
     * @throws IOException              if writing, deleting or renaming fails
     * @throws IllegalArgumentException if no offsets were recorded for {@code str2}
     */
    private void corruptSideFile(String str, HiveConf hiveConf, Map<String, List<Long>> map, String str2, int i) throws IOException {
        Path sideFile = OrcAcidUtils.getSideFile(new Path(str));
        Path corruptCopy = new Path(sideFile.getParent(), sideFile.getName() + ".corrupt");
        FileSystem fs = sideFile.getFileSystem(hiveConf);

        List<Long> offsets = map.get(str2);
        if (offsets == null || offsets.isEmpty()) {
            // Fail with a readable message instead of an NPE / IndexOutOfBoundsException.
            throw new IllegalArgumentException("No recorded offsets for " + str2);
        }
        long lastOffset = offsets.get(offsets.size() - 1).longValue();

        // try-with-resources: the stream is closed even when one of the writes throws.
        try (FSDataOutputStream out = fs.create(corruptCopy, true)) {
            if (i < 0) {
                for (int idx = 0; idx < offsets.size() - 1; idx++) {
                    out.writeLong(offsets.get(idx).longValue());
                }
                // Only 3 of the 8 bytes of the last offset: a partially written entry.
                out.write(longToBytes(lastOffset), 0, 3);
            } else if (i > 0) {
                int realEntries = Math.min(offsets.size(), i);
                for (int idx = 0; idx < realEntries; idx++) {
                    out.writeLong(offsets.get(idx).longValue());
                }
                // Pad up to i entries with offsets that lie past the last recorded one.
                int fakeEntries = i - realEntries;
                for (int extra = 0; extra < fakeEntries; extra++) {
                    out.writeLong(lastOffset + ((extra + 1) * 100));
                }
            }
        }

        fs.delete(sideFile, false);
        fs.rename(corruptCopy, sideFile);
    }

    /**
     * Encodes a {@code long} as 8 bytes using {@link ByteBuffer}'s default byte order.
     *
     * @param j value to encode
     * @return a new 8-byte array holding {@code j}
     */
    private byte[] longToBytes(long j) {
        return ByteBuffer.allocate(8).putLong(j).array();
    }

    /**
     * Appends the current length of every bucket file found under {@code str} to the list of
     * offsets recorded for that bucket.
     *
     * <p>A file is attributed to the first of {@code bucket_00000} .. {@code bucket_00003} whose
     * name occurs anywhere in the file's path; files matching none of them are ignored. The list
     * for a bucket is created on first use.
     *
     * @param hiveConf configuration used to list the files and resolve their file system
     * @param str      root directory to scan
     * @param map      bucket name to recorded lengths; updated in place
     * @throws IOException if a file cannot be listed or its status cannot be read
     */
    private void recordOffsets(HiveConf hiveConf, String str, Map<String, List<Long>> map) throws IOException {
        final String[] bucketNames = {"bucket_00000", "bucket_00001", "bucket_00002", "bucket_00003"};
        for (String file : FileDump.getAllFilesInPath(new Path(str), hiveConf)) {
            Path filePath = new Path(file);
            // The length is read for every listed file, matching bucket or not.
            long len = filePath.getFileSystem(hiveConf).getFileStatus(filePath).getLen();
            for (String bucket : bucketNames) {
                if (!file.contains(bucket)) {
                    continue;
                }
                List<Long> offsets = map.get(bucket);
                if (offsets == null) {
                    offsets = new ArrayList<>();
                    map.put(bucket, offsets);
                }
                offsets.add(Long.valueOf(len));
                break; // first matching bucket wins
            }
        }
    }

    /**
     * Exercises the error paths of {@code HiveStreamingConnection}: using a closed connection,
     * writing/committing without {@code beginTransaction()}, and write/commit failures injected
     * through a {@code FaultyWriter}.
     *
     * <p>The statement order matters: the asserted transaction high-water marks and the indexes
     * into the open-transaction list depend on how many transactions have been opened so far.
     * NOTE(review): the absolute HWM values (17/19/21) presumably also depend on state set up
     * outside this method - confirm before changing anything that opens transactions.
     */
    @Test
    public void testErrorHandling() throws Exception {
        String str = "UT_" + Thread.currentThread().getName();
        runCmdOnDriver("create database testErrors");
        runCmdOnDriver("use testErrors");
        runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
        StrictDelimitedInputWriter build = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connect = HiveStreamingConnection.newBuilder().withDatabase("testErrors").withTable("T").withAgentInfo(str).withTransactionBatchSize(2).withRecordWriter(build).withHiveConf(conf).connect();
        connect.beginTransaction();
        FaultyWriter faultyWriter = new FaultyWriter(build);

        // Closing with an open transaction: both transactions of the batch must show as ABORTED.
        connect.close();
        Exception exc = null;
        GetOpenTxnsInfoResponse showTxns = this.msClient.showTxns();
        Assert.assertEquals("HWM didn't match", 17L, showTxns.getTxn_high_water_mark());
        List open_txns = showTxns.getOpen_txns();
        Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ((TxnInfo) open_txns.get(0)).getState());
        Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ((TxnInfo) open_txns.get(1)).getState());

        // A closed connection must refuse to begin another transaction.
        try {
            connect.beginTransaction();
        } catch (StreamingException e) {
            exc = e;
        }
        Assert.assertTrue("beginTransaction() should have failed", exc != null && exc.getMessage().contains("Streaming connection is closed already."));

        // write() and commitTransaction() without beginTransaction() must fail.
        HiveStreamingConnection connect2 = HiveStreamingConnection.newBuilder().withDatabase("testErrors").withTable("T").withAgentInfo(str).withTransactionBatchSize(2).withRecordWriter(build).withHiveConf(conf).connect();
        Exception exc2 = null;
        try {
            connect2.write("name0,1,Hello streaming".getBytes());
        } catch (StreamingException e2) {
            exc2 = e2;
        }
        Assert.assertTrue("write() should have failed", exc2 != null && exc2.getMessage().equals("Transaction batch is null. Missing beginTransaction?"));
        Exception exc3 = null;
        try {
            connect2.commitTransaction();
        } catch (StreamingException e3) {
            exc3 = e3;
        }
        Assert.assertTrue("commitTransaction() should have failed", exc3 != null && exc3.getMessage().equals("Transaction batch is null. Missing beginTransaction?"));

        // Happy path through the faulty writer while errors are still disabled.
        HiveStreamingConnection connect3 = HiveStreamingConnection.newBuilder().withDatabase("testErrors").withTable("T").withAgentInfo(str).withTransactionBatchSize(2).withRecordWriter(faultyWriter).withHiveConf(conf).connect();
        connect3.beginTransaction();
        connect3.write("name2,2,Welcome to streaming".getBytes());
        connect3.write("name4,2,more Streaming unlimited".getBytes());
        connect3.write("name5,2,even more Streaming unlimited".getBytes());
        connect3.commitTransaction();
        String transactionString = connect3.toTransactionString();
        Assert.assertTrue("Actual: " + transactionString, transactionString.contains("LastUsed " + JavaUtils.txnIdToString(connect3.getCurrentTxnId().longValue())));
        Assert.assertTrue("Actual: " + transactionString, transactionString.contains("TxnStatus[CO]"));

        // Failure during write(): the write fails and a following commit is rejected.
        Exception exc4 = null;
        connect3.beginTransaction();
        faultyWriter.enableErrors();
        try {
            connect3.write("name6,2,Doh!".getBytes());
        } catch (StreamingIOFailure e4) {
            exc4 = e4;
        }
        Assert.assertTrue("Wrong exception: " + (exc4 != null ? exc4.getMessage() : "?"), exc4 != null && exc4.getMessage().contains("Simulated fault occurred"));
        Exception exc5 = null;
        try {
            connect3.commitTransaction();
        } catch (StreamingException e5) {
            exc5 = e5;
        }
        Assert.assertTrue("commitTransaction() should have failed", exc5 != null && exc5.getMessage().equals("Transaction state is not OPEN. Missing beginTransaction?"));
        String transactionString2 = connect3.toTransactionString();
        Assert.assertTrue("Actual: " + transactionString2, transactionString2.contains("LastUsed " + JavaUtils.txnIdToString(connect3.getCurrentTxnId().longValue())));
        Assert.assertTrue("Actual: " + transactionString2, transactionString2.contains("TxnStatus[CA]"));
        GetOpenTxnsInfoResponse showTxns2 = this.msClient.showTxns();
        Assert.assertEquals("HWM didn't match", 19L, showTxns2.getTxn_high_water_mark());
        List open_txns2 = showTxns2.getOpen_txns();
        Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ((TxnInfo) open_txns2.get(0)).getState());
        Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ((TxnInfo) open_txns2.get(1)).getState());
        Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, ((TxnInfo) open_txns2.get(2)).getState());
        connect3.close();

        // Failure during commitTransaction(): the write succeeds, errors are enabled afterwards.
        faultyWriter.disableErrors();
        HiveStreamingConnection connect4 = HiveStreamingConnection.newBuilder().withDatabase("testErrors").withTable("T").withAgentInfo(str).withTransactionBatchSize(2).withRecordWriter(faultyWriter).withHiveConf(conf).connect();
        connect4.beginTransaction();
        connect4.write("name2,2,Welcome to streaming".getBytes());
        faultyWriter.enableErrors();
        Exception exc6 = null;
        try {
            connect4.commitTransaction();
        } catch (StreamingIOFailure e6) {
            exc6 = e6;
        }
        Assert.assertTrue("Wrong exception: " + (exc6 != null ? exc6.getMessage() : "?"), exc6 != null && exc6.getMessage().contains("Simulated fault occurred"));
        GetOpenTxnsInfoResponse showTxns3 = this.msClient.showTxns();
        Assert.assertEquals("HWM didn't match", 21L, showTxns3.getTxn_high_water_mark());
        List open_txns3 = showTxns3.getOpen_txns();
        Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ((TxnInfo) open_txns3.get(3)).getState());
        Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ((TxnInfo) open_txns3.get(4)).getState());
        // NOTE(review): connect2 and connect4 are never closed here; left untouched because
        // closing them could alter the transaction states asserted above - confirm and clean up.
    }

    /**
     * Reads the records of every bucket file located in the {@code delta*} sub-directories of a
     * table directory.
     *
     * <p>Inside each delta directory, entries whose name starts with {@code _} or {@code .} and
     * entries whose path ends with {@code length} are skipped. If the same bucket number occurs
     * in several delta directories, the entry processed last replaces the earlier ones.
     *
     * @param str  parent directory of the table
     * @param str2 table directory name, appended to {@code str} with a {@code /}
     * @return records per bucket number
     * @throws FileNotFoundException if the table directory does not exist or cannot be listed
     * @throws IOException           if a delta directory cannot be listed or a bucket cannot be read
     */
    private HashMap<Integer, ArrayList<SampleRec>> dumpAllBuckets(String str, String str2) throws IOException {
        File tableDir = new File(str + "/" + str2);
        // File.listFiles() returns null (not an empty array) for a missing or unreadable directory.
        File[] children = tableDir.listFiles();
        if (children == null) {
            throw new FileNotFoundException("Table directory does not exist or cannot be listed: " + tableDir);
        }

        FileFilter visibleFiles = new FileFilter() {
            @Override
            public boolean accept(File candidate) {
                String name = candidate.getName();
                return !name.startsWith("_") && !name.startsWith(".");
            }
        };

        HashMap<Integer, ArrayList<SampleRec>> recordsByBucket = new HashMap<>();
        for (File child : children) {
            if (!child.getName().startsWith("delta")) {
                continue;
            }
            File[] bucketFiles = child.listFiles(visibleFiles);
            if (bucketFiles == null) {
                throw new IOException("Unable to list delta directory: " + child);
            }
            for (File bucketFile : bucketFiles) {
                if (!bucketFile.toString().endsWith("length")) {
                    recordsByBucket.put(getBucketNumber(bucketFile), dumpBucket(new Path(bucketFile.toString())));
                }
            }
        }
        return recordsByBucket;
    }

    /**
     * Parses the bucket number out of a file name: everything after the first {@code '_'} is
     * read as a decimal integer (the whole name if it contains no underscore).
     *
     * @param file bucket file
     * @return the parsed bucket number
     * @throws NumberFormatException if the part after the underscore is not an integer
     */
    private Integer getBucketNumber(File file) {
        String fileName = file.getName();
        int underscore = fileName.indexOf('_');
        String numberPart = fileName.substring(underscore + 1);
        return Integer.valueOf(numberPart);
    }

    /**
     * Best-effort removal of a database: drops every table the metastore lists for it and then
     * the database itself.
     *
     * <p>A {@code TException} from any metastore call stops the clean-up but is deliberately not
     * propagated; it is only logged at debug level so the cause stays visible when a later step
     * fails because of left-over state.
     *
     * @param iMetaStoreClient metastore client to use
     * @param str              name of the database to drop
     */
    public static void dropDB(IMetaStoreClient iMetaStoreClient, String str) {
        try {
            for (String tableName : iMetaStoreClient.listTableNamesByFilter(str, "", (short) -1)) {
                iMetaStoreClient.dropTable(str, tableName, true, true);
            }
            iMetaStoreClient.dropDatabase(str);
        } catch (TException e) {
            // Intentionally not rethrown: callers use this as best-effort clean-up.
            LOG.debug("Ignoring failure while dropping database " + str, e);
        }
    }

    /**
     * Creates (if needed) and selects a database located under {@code raw://<str3>}, then creates
     * a transactional, bucketed ORC table inside it.
     *
     * @param iDriver driver used to run the DDL statements
     * @param str     database name
     * @param str2    table name
     * @param list    partition values, used only when partition columns are given
     * @param strArr  column names
     * @param strArr2 column types, parallel to {@code strArr}
     * @param strArr3 bucketing columns
     * @param strArr4 partition column names; {@code null} or empty for an unpartitioned table
     * @param str3    database location
     * @param i       number of buckets
     * @return the table location for an unpartitioned table, otherwise the path returned by
     *         {@code addPartition}
     */
    private static Path createDbAndTable(IDriver iDriver, String str, String str2, List<String> list, String[] strArr, String[] strArr2, String[] strArr3, String[] strArr4, String str3, int i) throws Exception {
        String dbUri = "raw://" + new Path(str3).toUri().toString();
        String tableLocation = dbUri + "/" + str2;

        runDDL(iDriver, "create database IF NOT EXISTS " + str + " location '" + dbUri + "'");
        runDDL(iDriver, "use " + str);

        // Assembled only after the two statements above have run, as before.
        StringBuilder createTable = new StringBuilder("create table ");
        createTable.append(str2);
        createTable.append(" ( ").append(getTableColumnsStr(strArr, strArr2)).append(" )");
        createTable.append(getPartitionStmtStr(strArr4));
        createTable.append(" clustered by ( ").append(join(strArr3, ",")).append(" ) into ").append(i);
        createTable.append(" buckets  stored as orc  location '").append(tableLocation);
        createTable.append("' TBLPROPERTIES ('transactional'='true') ");
        runDDL(iDriver, createTable.toString());

        boolean partitioned = strArr4 != null && strArr4.length != 0;
        if (partitioned) {
            return addPartition(iDriver, str2, list, strArr4);
        }
        return new Path(tableLocation);
    }

    /**
     * Adds one partition to a table and returns the path looked up for it.
     *
     * @param iDriver driver used to run the statement
     * @param str     table name
     * @param list    partition values
     * @param strArr  partition column names, parallel to {@code list}
     * @return the result of {@code getPartitionPath} for the new partition
     */
    private static Path addPartition(IDriver iDriver, String str, List<String> list, String[] strArr) throws Exception {
        final String partitionSpec = getPartsSpec(strArr, list);
        StringBuilder ddl = new StringBuilder("alter table ");
        ddl.append(str).append(" add partition ( ").append(partitionSpec).append(" )");
        runDDL(iDriver, ddl.toString());
        return getPartitionPath(iDriver, str, partitionSpec);
    }

    /**
     * Looks up the location of a partition by running {@code describe extended} and extracting
     * the text between {@code location:} and the next comma from the last result row.
     *
     * @param iDriver driver used to run the query
     * @param str     table name
     * @param str2    partition spec, placed verbatim inside {@code PARTITION (...)}
     * @return the extracted location as a {@code Path}
     * @throws IllegalStateException if the query returns no rows or the last row does not contain
     *                               a comma-terminated {@code location:} entry
     */
    private static Path getPartitionPath(IDriver iDriver, String str, String str2) throws Exception {
        final String marker = "location:";
        ArrayList<String> rows = queryTable(iDriver, "describe extended " + str + " PARTITION (" + str2 + ")");
        if (rows.isEmpty()) {
            throw new IllegalStateException("describe extended returned no rows for " + str + " PARTITION (" + str2 + ")");
        }
        String detail = rows.get(rows.size() - 1);

        // Without this check a missing marker (indexOf == -1) would silently make parsing
        // start at an arbitrary offset and could yield a bogus path.
        int markerPos = detail.indexOf(marker);
        if (markerPos < 0) {
            throw new IllegalStateException("No '" + marker + "' found in: " + detail);
        }
        int start = markerPos + marker.length();
        int end = detail.indexOf(",", start);
        if (end < 0) {
            throw new IllegalStateException("Unterminated '" + marker + "' entry in: " + detail);
        }
        return new Path(detail.substring(start, end));
    }

    /**
     * Builds a comma-separated {@code name type} column list from two parallel arrays.
     *
     * @param strArr  column names
     * @param strArr2 column types; must have at least as many entries as {@code strArr}
     * @return e.g. {@code "a int,b string"}; empty when there are no columns
     */
    private static String getTableColumnsStr(String[] strArr, String[] strArr2) {
        StringBuilder columns = new StringBuilder();
        String separator = "";
        for (int idx = 0; idx < strArr.length; idx++) {
            columns.append(separator).append(strArr[idx]).append(" ").append(strArr2[idx]);
            separator = ",";
        }
        return columns.toString();
    }

    /**
     * Builds the column list of a partition clause, declaring every partition column as
     * {@code string}.
     *
     * @param strArr partition column names; may be {@code null} or empty
     * @return e.g. {@code "p1 string,p2 string"}, or an empty string when there are no columns
     */
    private static String getTablePartsStr(String[] strArr) {
        boolean hasColumns = strArr != null && strArr.length > 0;
        if (!hasColumns) {
            return "";
        }
        StringBuilder parts = new StringBuilder();
        int idx = 0;
        while (idx < strArr.length) {
            if (idx > 0) {
                parts.append(",");
            }
            parts.append(strArr[idx]).append(" string");
            idx++;
        }
        return parts.toString();
    }

    /**
     * Builds a partition spec of the form {@code col = 'value'} pairs joined by commas.
     *
     * <p>The number of pairs is driven by {@code list}; {@code strArr} must supply at least that
     * many column names.
     *
     * @param strArr partition column names
     * @param list   partition values, parallel to {@code strArr}
     * @return e.g. {@code "continent = 'Asia',country = 'India'"}
     */
    private static String getPartsSpec(String[] strArr, List<String> list) {
        StringBuilder spec = new StringBuilder();
        int idx = 0;
        while (idx < list.size()) {
            if (idx > 0) {
                spec.append(",");
            }
            spec.append(strArr[idx]);
            spec.append(" = '");
            spec.append(list.get(idx));
            spec.append("'");
            idx++;
        }
        return spec.toString();
    }

    /**
     * Concatenates the array elements, placing {@code str} between consecutive elements.
     *
     * @param strArr elements to join; may be {@code null}
     * @param str    separator
     * @return the joined text, an empty string for an empty array, or {@code null} when
     *         {@code strArr} is {@code null}
     */
    private static String join(String[] strArr, String str) {
        if (strArr == null) {
            return null;
        }
        StringBuilder joined = new StringBuilder();
        for (int idx = 0; idx < strArr.length; idx++) {
            if (idx > 0) {
                joined.append(str);
            }
            // toString() is kept so that a null element still fails with a NullPointerException.
            joined.append(strArr[idx].toString());
        }
        return joined.toString();
    }

    /**
     * Builds the optional {@code partitioned by (...)} clause of a create-table statement.
     *
     * @param strArr partition column names; may be {@code null} or empty
     * @return the clause with a leading space, or an empty string when there are no partition
     *         columns
     */
    private static String getPartitionStmtStr(String[] strArr) {
        boolean partitioned = strArr != null && strArr.length > 0;
        if (!partitioned) {
            return "";
        }
        return " partitioned by (" + getTablePartsStr(strArr) + " )";
    }

    /**
     * Runs a statement through the driver, echoing it to the debug log and to stdout first.
     *
     * @param iDriver driver to run the statement with
     * @param str     statement text
     * @return {@code true} when the response code is 0; otherwise the failure is logged as an
     *         error and {@code false} is returned
     */
    private static boolean runDDL(IDriver iDriver, String str) {
        LOG.debug(str);
        System.out.println(str);
        CommandProcessorResponse response = iDriver.run(str);
        boolean succeeded = response.getResponseCode() == 0;
        if (!succeeded) {
            LOG.error("Statement: " + str + " failed: " + response);
        }
        return succeeded;
    }

    /**
     * Runs a query through the driver and collects its result rows.
     *
     * @param iDriver driver to run the query with
     * @param str     query text
     * @return the rows the driver put into the result list
     * @throws RuntimeException if the response code is not 0
     * @throws IOException      declared for the result fetch
     */
    private static ArrayList<String> queryTable(IDriver iDriver, String str) throws IOException {
        CommandProcessorResponse response = iDriver.run(str);
        if (response.getResponseCode() == 0) {
            ArrayList<String> rows = new ArrayList<>();
            iDriver.getResults(rows);
            return rows;
        }
        throw new RuntimeException(str + " failed: " + response);
    }
}
