/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.streaming;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.Validator;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnInfo;
import org.apache.hadoop.hive.metastore.api.TxnState;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.compactor.Worker;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hive.streaming.ConnectionError;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.InvalidTable;
import org.apache.hive.streaming.RecordWriter;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StreamingException;
import org.apache.hive.streaming.StreamingIOFailure;
import org.apache.hive.streaming.StrictDelimitedInputWriter;
import org.apache.hive.streaming.StrictJsonWriter;
import org.apache.hive.streaming.StrictRegexWriter;
import org.apache.hive.streaming.TransactionError;
import org.apache.orc.impl.OrcAcidUtils;
import org.apache.orc.tools.FileDump;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Ignore(value="HIVE-24219")
public class TestStreaming {
    private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
    // Column names used for the tables created in setup().
    private static final String COL1 = "id";
    private static final String COL2 = "msg";
    // Shared Hive configuration; reassigned by the constructor each time a test instance is built.
    private static HiveConf conf = null;
    // Query driver; created in setup() and closed in cleanup().
    private IDriver driver;
    // Metastore client; created in the constructor and closed in cleanup().
    private final IMetaStoreClient msClient;
    // First database/table pair; setup() creates it partitioned by Continent/Country.
    private static final String dbName = "testing";
    private static final String tblName = "alerts";
    private static final String[] fieldNames = new String[]{"id", "msg"};
    // Partition values for the first table; the constructor fills this with "Asia", "India".
    static List<String> partitionVals;
    // Locations returned by createDbAndTable() for the first and second table; assigned in setup().
    private static Path partLoc;
    private static Path partLoc2;
    // Second database/table pair; setup() creates it without partition columns or values.
    private static final String dbName2 = "testing2";
    private static final String tblName2 = "alerts";
    // No initializer here: as a static final field it has to be assigned in a static
    // initializer, which is not in the part of the file shown alongside these declarations.
    private static final String[] fieldNames2;
    // NOTE(review): the four names below are not referenced in the nearby test methods;
    // presumably used by tests further down the file - confirm before removing.
    private static final String dbName3 = "testing3";
    private static final String tblName3 = "dimensionTable";
    private static final String dbName4 = "testing4";
    private static final String tblName4 = "factTable";
    // Single-value partition list; the constructor fills this with "India".
    List<String> partitionVals2;
    // Same literals the constructor adds to partitionVals.
    private final String PART1_CONTINENT = "Asia";
    private final String PART1_COUNTRY = "India";
    // Temporary folder holding the warehouse and database directories. The constructor also
    // calls create() on it explicitly because it needs the folder before the rule is applied.
    @Rule
    public TemporaryFolder dbFolder = new TemporaryFolder();

    public TestStreaming() throws Exception {
        partitionVals = new ArrayList<String>(2);
        partitionVals.add("Asia");
        partitionVals.add("India");
        this.partitionVals2 = new ArrayList<String>(1);
        this.partitionVals2.add("India");
        conf = new HiveConf(this.getClass());
        conf.set("fs.raw.impl", RawFileSystem.class.getName());
        conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
        TestTxnDbUtil.setConfValues((Configuration)conf);
        conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
        conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
        this.dbFolder.create();
        MetastoreConf.setVar((Configuration)conf, (MetastoreConf.ConfVars)MetastoreConf.ConfVars.WAREHOUSE, (String)("raw://" + this.dbFolder.newFolder("warehouse")));
        TestTxnDbUtil.cleanDb((Configuration)conf);
        TestTxnDbUtil.prepDb((Configuration)conf);
        this.msClient = new HiveMetaStoreClient((Configuration)conf);
    }

    /**
     * Starts a CLI session, creates the query driver and (re)creates the databases and tables
     * the tests stream into: a partitioned table in {@code testing}, an unpartitioned table in
     * {@code testing2} and the {@code store_sales} table in {@code testing5}.
     *
     * @throws Exception if a folder cannot be created or a database/table cannot be set up
     */
    @Before
    public void setup() throws Exception {
        SessionState.start(new CliSessionState(conf));
        driver = DriverFactory.newDriver(conf);
        driver.setMaxRows(200002);

        String[] columnNames = {COL1, COL2};
        String[] columnTypes = {"int", "string"};
        String[] bucketColumns = {COL1};

        // Partitioned table, one bucket.
        dropDB(msClient, dbName);
        String partitionedDbDir = dbFolder.newFolder("testing.db").toString();
        String[] partitionColumns = {"Continent", "Country"};
        partLoc = createDbAndTable(driver, dbName, tblName, partitionVals, columnNames, columnTypes,
            bucketColumns, partitionColumns, partitionedDbDir, 1);

        // Unpartitioned table, two buckets.
        dropDB(msClient, dbName2);
        String unpartitionedDbDir = dbFolder.newFolder("testing2.db").toString();
        partLoc2 = createDbAndTable(driver, dbName2, tblName2, null, columnNames, columnTypes,
            bucketColumns, null, unpartitionedDbDir, 2);

        String storeSalesDbDir = dbFolder.newFolder("testing5.db").toString();
        createStoreSales("testing5", storeSalesDbDir);

        // Best-effort removal of tables left behind by the bucketing tests; results are ignored.
        String[] staleTableDrops = {
            "drop table testBucketing3.streamedtable",
            "drop table testBucketing3.finaltable",
            "drop table testBucketing3.nobucket",
        };
        for (String dropStatement : staleTableDrops) {
            runDDL(driver, dropStatement);
        }
    }

    /**
     * Releases the metastore client and the query driver after each test.
     *
     * <p>The driver is closed in a {@code finally} block so that a failure while closing the
     * metastore client does not leave it open. The driver may still be {@code null} when
     * {@link #setup()} failed before creating it; JUnit runs this hook in that case too, so
     * the null check keeps the original setup failure from being joined by an NPE.
     */
    @After
    public void cleanup() {
        try {
            this.msClient.close();
        } finally {
            if (this.driver != null) {
                this.driver.close();
            }
        }
    }

    /**
     * Creates the given database at {@code loc} (via the {@code raw://} scheme), recreates the
     * transactional, bucketed {@code store_sales} table inside it and adds the
     * {@code dt='2015'} partition. Every statement must succeed or the calling test fails.
     *
     * @param dbName name of the database to create and switch to
     * @param loc    local directory that becomes the database location
     * @throws Exception if running a statement throws
     */
    private void createStoreSales(String dbName, String loc) throws Exception {
        String dbUri = "raw://" + new Path(loc).toUri().toString();
        String tableLoc = dbUri + "/store_sales";

        String createTable =
            "create table store_sales\n"
                + "(\n"
                + "    ss_sold_date_sk           int,\n"
                + "    ss_sold_time_sk           int,\n"
                + "    ss_item_sk                int,\n"
                + "    ss_customer_sk            int,\n"
                + "    ss_cdemo_sk               int,\n"
                + "    ss_hdemo_sk               int,\n"
                + "    ss_addr_sk                int,\n"
                + "    ss_store_sk               int,\n"
                + "    ss_promo_sk               int,\n"
                + "    ss_ticket_number          int,\n"
                + "    ss_quantity               int,\n"
                + "    ss_wholesale_cost         decimal(7,2),\n"
                + "    ss_list_price             decimal(7,2),\n"
                + "    ss_sales_price            decimal(7,2),\n"
                + "    ss_ext_discount_amt       decimal(7,2),\n"
                + "    ss_ext_sales_price        decimal(7,2),\n"
                + "    ss_ext_wholesale_cost     decimal(7,2),\n"
                + "    ss_ext_list_price         decimal(7,2),\n"
                + "    ss_ext_tax                decimal(7,2),\n"
                + "    ss_coupon_amt             decimal(7,2),\n"
                + "    ss_net_paid               decimal(7,2),\n"
                + "    ss_net_paid_inc_tax       decimal(7,2),\n"
                + "    ss_net_profit             decimal(7,2)\n"
                + ")\n"
                + " partitioned by (dt string)\n"
                + "clustered by (ss_store_sk, ss_promo_sk)\n"
                + "INTO 4 BUCKETS stored as orc  location '" + tableLoc + "'"
                + "  TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')";

        // Executed strictly in this order; each one is asserted before the next runs.
        String[] statements = {
            "create database IF NOT EXISTS " + dbName + " location '" + dbUri + "'",
            "use " + dbName,
            "drop table if exists store_sales",
            createTable,
            "alter table store_sales add partition(dt='2015')",
        };
        for (String statement : statements) {
            boolean succeeded = runDDL(driver, statement);
            Assert.assertTrue(succeeded);
        }
    }

    /**
     * Streams ten rows into {@code testing5.store_sales}, whose bucketing columns
     * ({@code ss_store_sk}, {@code ss_promo_sk}) are not the leading columns, then prints the
     * bucket id of every stored row.
     *
     * <p>Each record holds 11 integer fields followed by 12 decimal fields, matching the 23
     * table columns. The row buffer is created afresh for every record: reusing one buffer
     * across iterations would make each write carry all previous records glued in front of it.
     *
     * @throws Exception if connecting, writing or querying fails
     */
    @Test
    public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
        List<String> staticPartition = new ArrayList<>();
        staticPartition.add("2015");
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
            .withDatabase("testing5")
            .withTable("store_sales")
            .withStaticPartitionValues(staticPartition)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(writer)
            .withHiveConf(conf)
            .connect();
        try {
            connection.beginTransaction();
            for (int i = 0; i < 10; ++i) {
                // One buffer per record so earlier records never leak into later writes.
                StringBuilder row = new StringBuilder();
                for (int ints = 0; ints < 11; ++ints) {
                    row.append(ints).append(',');
                }
                for (int decs = 0; decs < 12; ++decs) {
                    row.append((double) i + 0.1).append(',');
                }
                // Drop the trailing field delimiter.
                row.setLength(row.length() - 1);
                connection.write(row.toString().getBytes());
            }
            connection.commitTransaction();
        } finally {
            // Release the connection even when a write or the commit fails.
            connection.close();
        }

        List<String> res = queryTable(this.driver, "select row__id.bucketid, * from testing5.store_sales");
        for (String re : res) {
            System.out.println(re);
        }
    }

    /**
     * Streams two transactions into an unbucketed transactional table and checks, in turn:
     * the lock taken while the first transaction is open, the ROW__ID and delta file of every
     * row, the visible data after an UPDATE and a DELETE, and the ROW__ID and base file of
     * every row after a major compaction.
     *
     * @throws Exception if a statement, the streaming connection or the compaction fails
     */
    @Test
    public void testNoBuckets() throws Exception {
        queryTable(this.driver, "drop table if exists default.streamingnobuckets");
        queryTable(this.driver, "create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true')");
        queryTable(this.driver, "insert into default.streamingnobuckets values('foo','bar')");
        List<String> rs = queryTable(this.driver, "select * from default.streamingnobuckets");
        Assert.assertEquals(1L, (long) rs.size());
        Assert.assertEquals("foo\tbar", rs.get(0));

        StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .build();
        // Database and table names are deliberately given in mixed case here.
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
            .withDatabase("Default")
            .withTable("streamingNoBuckets")
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withTransactionBatchSize(2)
            .withRecordWriter(wr)
            .withHiveConf(conf)
            .connect();
        try {
            connection.beginTransaction();
            connection.write("a1,b2".getBytes());
            connection.write("a3,b4".getBytes());

            // While the transaction is open exactly one lock is expected, reported with
            // lower-case database and table names.
            TxnStore txnHandler = TxnUtils.getTxnStore(conf);
            ShowLocksResponse resp = txnHandler.showLocks(new ShowLocksRequest());
            Assert.assertEquals(1L, (long) resp.getLocksSize());
            ShowLocksResponseElement lock = resp.getLocks().get(0);
            Assert.assertEquals("streamingnobuckets", lock.getTablename());
            Assert.assertEquals("default", lock.getDbname());
            connection.commitTransaction();

            connection.beginTransaction();
            connection.write("a5,b6".getBytes());
            connection.write("a7,b8".getBytes());
            connection.commitTransaction();
        } finally {
            // Also runs when an assertion above fails, so no open transaction is left behind.
            connection.close();
        }

        // 0x20000000 == 536870912, the bucketid asserted below; it must decode to writer id 0.
        Assert.assertEquals("", 0L, (long) BucketCodec.determineVersion(0x20000000).decodeWriterId(0x20000000));

        rs = queryTable(this.driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
        Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
        Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000_0"));
        Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
        Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
        Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
        Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
        Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
        Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
        Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
        Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));

        queryTable(this.driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
        queryTable(this.driver, "delete from default.streamingnobuckets where a='a1'");
        rs = queryTable(this.driver, "select a, b from default.streamingnobuckets order by a, b");
        int row = 0;
        // The message is built before row++ runs, so it names the row being checked.
        Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
        Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++));
        Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++));
        Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++));

        queryTable(this.driver, "alter table default.streamingnobuckets compact 'major'");
        runWorker(conf);
        rs = queryTable(this.driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
        Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
        Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005_v0000024/bucket_00000"));
        Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
        Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005_v0000024/bucket_00000"));
        Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
        Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005_v0000024/bucket_00000"));
        Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870913,\"rowid\":0}\t0\t0"));
        Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005_v0000024/bucket_00000"));
    }

    /**
     * Checks the delta file location the connection computes for the Asia/India partition
     * with bucket 0, write ids 5..5 and statement id 9.
     *
     * <p>The connection is closed in a {@code finally} block so it is released whether or
     * not the assertion passes, and the computed path is used as the assertion message so a
     * failure shows what was actually produced.
     *
     * @throws Exception if the connection cannot be established or the path cannot be computed
     */
    @Test
    public void testGetDeltaPath() throws Exception {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withRecordWriter(writer)
            .withHiveConf(conf)
            .connect();
        try {
            Path path = connection.getDeltaFileLocation(partitionVals, Integer.valueOf(0), Long.valueOf(5L), Long.valueOf(5L), Integer.valueOf(9));
            String actual = path.toString();
            Assert.assertTrue(actual, actual.endsWith("testing.db/alerts/continent=Asia/country=India/delta_0000005_0000005_0009/bucket_00000"));
        } finally {
            connection.close();
        }
    }

    /**
     * Streams two rows into {@code default.keyvalue} and commits them together with the table
     * property {@code _metamykey=myvalue}, then checks the rows' ROW__ID and delta file and
     * that the property is reported by SHOW TBLPROPERTIES.
     *
     * @throws Exception if a statement or the streaming connection fails
     */
    @Test
    public void testCommitWithKeyValue() throws Exception {
        final String expectedDelta = "keyvalue/delta_0000002_0000003/bucket_00000";

        queryTable(driver, "drop table if exists default.keyvalue");
        queryTable(driver, "create table default.keyvalue (a string, b string) stored as orc TBLPROPERTIES('transactional'='true')");
        queryTable(driver, "insert into default.keyvalue values('foo','bar')");
        queryTable(driver, "ALTER TABLE default.keyvalue SET TBLPROPERTIES('_metamykey' = 'myvalue')");

        List<String> rows = queryTable(driver, "select * from default.keyvalue");
        Assert.assertEquals(1L, (long) rows.size());
        Assert.assertEquals("foo\tbar", rows.get(0));

        StrictDelimitedInputWriter delimitedWriter = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .build();
        String agent = "UT_" + Thread.currentThread().getName();
        HiveStreamingConnection streaming = HiveStreamingConnection.newBuilder()
            .withDatabase("Default")
            .withTable("keyvalue")
            .withAgentInfo(agent)
            .withTransactionBatchSize(2)
            .withRecordWriter(delimitedWriter)
            .withHiveConf(conf)
            .connect();
        streaming.beginTransaction();
        streaming.write("a1,b2".getBytes());
        streaming.write("a3,b4".getBytes());
        // Commit carrying a key/value pair; no partition set is supplied.
        streaming.commitTransaction(null, "_metamykey", "myvalue");
        streaming.close();

        rows = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.keyvalue order by ROW__ID");
        // Index 0 is the row inserted through SQL; the streamed rows follow.
        String firstStreamed = rows.get(1);
        Assert.assertTrue(firstStreamed, firstStreamed.startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
        Assert.assertTrue(firstStreamed, firstStreamed.endsWith(expectedDelta));
        String secondStreamed = rows.get(2);
        Assert.assertTrue(secondStreamed, secondStreamed.startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
        Assert.assertTrue(secondStreamed, secondStreamed.endsWith(expectedDelta));

        rows = queryTable(driver, "SHOW TBLPROPERTIES default.keyvalue('_metamykey')");
        String property = rows.get(0);
        Assert.assertEquals(property, "_metamykey\tmyvalue", property);
    }

    /**
     * Exercises connections that write under a write id owned by another connection.
     *
     * <p>A "transaction" connection opens the transaction and exposes its table object and
     * write id. Two further connections are built with that write id (statement ids 1 and 2)
     * and write rows. After their own commit they are expected to be in
     * {@code PREPARED_FOR_COMMIT} and to reject a second {@code beginTransaction}. Their rows
     * are expected to become visible only once the transaction connection commits.
     *
     * <p>The transaction connection is closed in a {@code finally} block; previously it was
     * never closed at all.
     *
     * @throws Exception if a statement or one of the streaming connections fails
     */
    @Test
    public void testConnectionWithWriteId() throws Exception {
        queryTable(this.driver, "drop table if exists default.writeidconnection");
        queryTable(this.driver, "create table default.writeidconnection (a string, b string) stored as orc TBLPROPERTIES('transactional'='true')");
        queryTable(this.driver, "insert into default.writeidconnection values('a0','bar')");
        List<String> rs = queryTable(this.driver, "select * from default.writeidconnection");
        Assert.assertEquals(1L, (long) rs.size());
        Assert.assertEquals("a0\tbar", rs.get(0));

        StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder()
            .withDatabase("Default")
            .withTable("writeidconnection")
            .withRecordWriter(writerT)
            .withHiveConf(conf)
            .connect();
        try {
            transactionConnection.beginTransaction();
            Table tObject = transactionConnection.getTable();
            Long writeId = transactionConnection.getCurrentWriteId();
            Assert.assertNotNull(tObject);
            Assert.assertNotNull(writeId);

            StrictDelimitedInputWriter writerOne = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
            HiveStreamingConnection connectionOne = HiveStreamingConnection.newBuilder()
                .withDatabase("Default")
                .withTable("writeidconnection")
                .withRecordWriter(writerOne)
                .withHiveConf(conf)
                .withWriteId(writeId.longValue())
                .withStatementId(1)
                .withTableObject(tObject)
                .connect();
            StrictDelimitedInputWriter writerTwo = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
            // No withTable() here: only the table object is handed to the builder.
            HiveStreamingConnection connectionTwo = HiveStreamingConnection.newBuilder()
                .withDatabase("Default")
                .withRecordWriter(writerTwo)
                .withHiveConf(conf)
                .withWriteId(writeId.longValue())
                .withStatementId(2)
                .withTableObject(tObject)
                .connect();
            Assert.assertNotNull(connectionOne);
            Assert.assertNotNull(connectionTwo);

            connectionOne.beginTransaction();
            connectionTwo.beginTransaction();
            connectionOne.write("a1,b2".getBytes());
            connectionTwo.write("a5,b6".getBytes());
            connectionOne.write("a3,b4".getBytes());
            connectionOne.commitTransaction();
            connectionTwo.commitTransaction();
            Assert.assertEquals(HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT, connectionOne.getCurrentTransactionState());
            Assert.assertEquals(HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT, connectionTwo.getCurrentTransactionState());
            try {
                connectionOne.beginTransaction();
                Assert.fail("second beginTransaction should have thrown a StreamingException");
            }
            catch (StreamingException expected) {
                // Expected: a second beginTransaction on this connection must be rejected.
            }
            connectionOne.close();
            connectionTwo.close();

            // Only the row inserted through SQL is expected before the owning transaction commits.
            rs = queryTable(this.driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.writeidconnection order by ROW__ID");
            Assert.assertEquals(1L, (long) rs.size());

            transactionConnection.commitTransaction();

            rs = queryTable(this.driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.writeidconnection order by a");
            Assert.assertEquals(4L, (long) rs.size());
            Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\ta0\tbar"));
            Assert.assertTrue(rs.get(0), rs.get(0).endsWith("bucket_00000_0"));
            Assert.assertTrue(rs.get(1), rs.get(1).contains("\"rowid\":0}\ta1\tb2"));
            Assert.assertTrue(rs.get(1), rs.get(1).endsWith("bucket_00000"));
            Assert.assertTrue(rs.get(2), rs.get(2).contains("\"rowid\":1}\ta3\tb4"));
            Assert.assertTrue(rs.get(2), rs.get(2).endsWith("bucket_00000"));
            Assert.assertTrue(rs.get(3), rs.get(3).contains("\ta5\tb6"));
            Assert.assertTrue(rs.get(3), rs.get(3).endsWith("bucket_00000"));
        } finally {
            // Release the owning connection on success and on failure alike.
            transactionConnection.close();
        }
    }

    /**
     * Writes two delimited records covering primitive, map, array and struct columns through
     * the byte-array API and checks how each row is stored and rendered (note that the
     * decimal(10,3) column is expected back with three fractional digits).
     *
     * @throws Exception if a statement or the streaming connection fails
     */
    @Test
    public void testAllTypesDelimitedWriter() throws Exception {
        queryTable(driver, "drop table if exists default.alltypes");
        queryTable(driver, "create table if not exists default.alltypes ( bo boolean, ti tinyint, si smallint, i int, bi bigint, f float, d double, de decimal(10,3), ts timestamp, da date, s string, c char(5), vc varchar(5), m map<string, string>, l array<int>, st struct<c1:int, c2:string> ) stored as orc TBLPROPERTIES('transactional'='true')");

        // '|' separates fields, ',' separates collection items, ':' separates map key and value.
        StrictDelimitedInputWriter delimitedWriter = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter('|')
            .withCollectionDelimiter(',')
            .withMapKeyDelimiter(':')
            .build();
        HiveStreamingConnection streaming = HiveStreamingConnection.newBuilder()
            .withDatabase("default")
            .withTable("alltypes")
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withTransactionBatchSize(2)
            .withRecordWriter(delimitedWriter)
            .withHiveConf(conf)
            .connect();

        String[] inputRows = {
            "true|10|100|1000|10000|4.0|20.0|4.2222|1969-12-31 15:59:58.174|1970-01-01|string|hello|hello|k1:v1|100,200|10,foo",
            "false|20|200|2000|20000|8.0|40.0|2.2222|1970-12-31 15:59:58.174|1971-01-01|abcd|world|world|k4:v4|200,300|20,bar",
        };
        streaming.beginTransaction();
        for (String inputRow : inputRows) {
            streaming.write(inputRow.getBytes());
        }
        streaming.commitTransaction();
        streaming.close();

        List<String> stored = queryTable(driver, "select ROW__ID, bo, ti, si, i, bi, f, d, de, ts, da, s, c, vc, m, l, st, INPUT__FILE__NAME from default.alltypes order by ROW__ID");
        Assert.assertEquals(2L, (long) stored.size());

        String[] expectedPrefixes = {
            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\ttrue\t10\t100\t1000\t10000\t4.0\t20.0\t4.222\t1969-12-31 15:59:58.174\t1970-01-01\tstring\thello\thello\t{\"k1\":\"v1\"}\t[100,200]\t{\"c1\":10,\"c2\":\"foo\"}",
            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\tfalse\t20\t200\t2000\t20000\t8.0\t40.0\t2.222\t1970-12-31 15:59:58.174\t1971-01-01\tabcd\tworld\tworld\t{\"k4\":\"v4\"}\t[200,300]\t{\"c1\":20,\"c2\":\"bar\"}",
        };
        // Both rows are expected in the same delta file.
        String expectedSuffix = "alltypes/delta_0000001_0000002/bucket_00000";
        for (int idx = 0; idx < expectedPrefixes.length; idx++) {
            String got = stored.get(idx);
            Assert.assertTrue(got, got.startsWith(expectedPrefixes[idx]));
            Assert.assertTrue(got, got.endsWith(expectedSuffix));
        }
    }

    /**
     * Same scenario as the byte-array variant, but both records are handed to the connection
     * as one newline-separated {@link InputStream}, with the writer configured to split
     * records on {@code "\n"}.
     *
     * @throws Exception if a statement or the streaming connection fails
     */
    @Test
    public void testAllTypesDelimitedWriterInputStream() throws Exception {
        queryTable(driver, "drop table if exists default.alltypes");
        queryTable(driver, "create table if not exists default.alltypes ( bo boolean, ti tinyint, si smallint, i int, bi bigint, f float, d double, de decimal(10,3), ts timestamp, da date, s string, c char(5), vc varchar(5), m map<string, string>, l array<int>, st struct<c1:int, c2:string> ) stored as orc TBLPROPERTIES('transactional'='true')");

        StrictDelimitedInputWriter delimitedWriter = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter('|')
            .withCollectionDelimiter(',')
            .withMapKeyDelimiter(':')
            .withLineDelimiterPattern("\n")
            .build();
        HiveStreamingConnection streaming = HiveStreamingConnection.newBuilder()
            .withDatabase("default")
            .withTable("alltypes")
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withTransactionBatchSize(2)
            .withRecordWriter(delimitedWriter)
            .withHiveConf(conf)
            .connect();

        String firstRecord = "true|10|100|1000|10000|4.0|20.0|4.2222|1969-12-31 15:59:58.174|1970-01-01|string|hello|hello|k1:v1|100,200|10,foo";
        String secondRecord = "false|20|200|2000|20000|8.0|40.0|2.2222|1970-12-31 15:59:58.174|1971-01-01|abcd|world|world|k4:v4|200,300|20,bar";
        // Every record, including the last one, is terminated by the line delimiter.
        String payload = firstRecord + "\n" + secondRecord + "\n";
        InputStream recordStream = new ByteArrayInputStream(payload.getBytes());

        streaming.beginTransaction();
        streaming.write(recordStream);
        streaming.commitTransaction();
        streaming.close();
        recordStream.close();

        List<String> stored = queryTable(driver, "select ROW__ID, bo, ti, si, i, bi, f, d, de, ts, da, s, c, vc, m, l, st, INPUT__FILE__NAME from default.alltypes order by ROW__ID");
        Assert.assertEquals(2L, (long) stored.size());

        String[] expectedPrefixes = {
            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\ttrue\t10\t100\t1000\t10000\t4.0\t20.0\t4.222\t1969-12-31 15:59:58.174\t1970-01-01\tstring\thello\thello\t{\"k1\":\"v1\"}\t[100,200]\t{\"c1\":10,\"c2\":\"foo\"}",
            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\tfalse\t20\t200\t2000\t20000\t8.0\t40.0\t2.222\t1970-12-31 15:59:58.174\t1971-01-01\tabcd\tworld\tworld\t{\"k4\":\"v4\"}\t[200,300]\t{\"c1\":20,\"c2\":\"bar\"}",
        };
        // Both rows are expected in the same delta file.
        String expectedSuffix = "alltypes/delta_0000001_0000002/bucket_00000";
        for (int idx = 0; idx < expectedPrefixes.length; idx++) {
            String got = stored.get(idx);
            Assert.assertTrue(got, got.startsWith(expectedPrefixes[idx]));
            Assert.assertTrue(got, got.endsWith(expectedSuffix));
        }
    }

    /**
     * Writes four two-record transactions through a streaming connection configured with a
     * transaction batch size of 2, then checks the ROW__ID and file-name suffix of every row,
     * runs update/delete statements, and repeats the row checks after a major compaction.
     */
    @Test
    public void testAutoRollTransactionBatch() throws Exception {
        TestStreaming.queryTable(this.driver, "drop table if exists default.streamingnobuckets");
        TestStreaming.queryTable(this.driver, "create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true')");
        TestStreaming.queryTable(this.driver, "insert into default.streamingnobuckets values('foo','bar')");
        List<String> rows = TestStreaming.queryTable(this.driver, "select * from default.streamingnobuckets");
        Assert.assertEquals(1L, (long) rows.size());
        Assert.assertEquals((Object) "foo\tbar", rows.get(0));

        StrictDelimitedInputWriter delimitedWriter = StrictDelimitedInputWriter.newBuilder()
                .withFieldDelimiter(',')
                .build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
                .withDatabase("default")
                .withTable("streamingnobuckets")
                .withAgentInfo("UT_" + Thread.currentThread().getName())
                .withRecordWriter((RecordWriter) delimitedWriter)
                .withHiveConf(conf)
                .withTransactionBatchSize(2)
                .connect();

        // One inner array per transaction; four transactions against a batch size of 2.
        String[][] transactions = {
            {"a1,b2", "a3,b4"},
            {"a5,b6", "a7,b8"},
            {"a9,b10", "a11,b12"},
            {"a13,b14", "a15,b16"},
        };
        for (String[] txnRecords : transactions) {
            connection.beginTransaction();
            for (String record : txnRecords) {
                connection.write(record.getBytes());
            }
            connection.commitTransaction();
        }
        connection.close();

        int bucketProperty = 0x20000000;
        Assert.assertEquals("", 0L, (long) BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty));

        // Each entry is {expected row prefix, expected file-name suffix}, in ROW__ID order.
        String insertDelta = "streamingnobuckets/delta_0000001_0000001_0000/bucket_00000_0";
        String firstBatchDelta = "streamingnobuckets/delta_0000002_0000003/bucket_00000";
        String secondBatchDelta = "streamingnobuckets/delta_0000004_0000005/bucket_00000";
        String[][] expectedBeforeCompaction = {
            {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar", insertDelta},
            {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2", firstBatchDelta},
            {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4", firstBatchDelta},
            {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6", firstBatchDelta},
            {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8", firstBatchDelta},
            {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\ta9\tb10", secondBatchDelta},
            {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\ta11\tb12", secondBatchDelta},
            {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\ta13\tb14", secondBatchDelta},
            {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":1}\ta15\tb16", secondBatchDelta},
        };
        rows = TestStreaming.queryTable(this.driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
        for (int i = 0; i < expectedBeforeCompaction.length; i++) {
            String actual = rows.get(i);
            Assert.assertTrue(actual, actual.startsWith(expectedBeforeCompaction[i][0]));
            Assert.assertTrue(actual, actual.endsWith(expectedBeforeCompaction[i][1]));
        }

        TestStreaming.queryTable(this.driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
        TestStreaming.queryTable(this.driver, "delete from default.streamingnobuckets where a='a1'");
        TestStreaming.queryTable(this.driver, "update default.streamingnobuckets set a=0, b=0 where a='a15'");
        TestStreaming.queryTable(this.driver, "delete from default.streamingnobuckets where a='a9'");

        String[] expectedAfterDml = {"0\t0", "0\t0", "a11\tb12", "a13\tb14", "a3\tb4", "a5\tb6", "foo\tbar"};
        rows = TestStreaming.queryTable(this.driver, "select a, b from default.streamingnobuckets order by a, b");
        for (int row = 0; row < expectedAfterDml.length; row++) {
            Assert.assertEquals("at row=" + row, (Object) expectedAfterDml[row], rows.get(row));
        }

        TestStreaming.queryTable(this.driver, "alter table default.streamingnobuckets compact 'major'");
        TestStreaming.runWorker(conf);

        String baseBucket0 = "streamingnobuckets/base_0000009_v0000028/bucket_00000";
        String baseBucket1 = "streamingnobuckets/base_0000009_v0000028/bucket_00001";
        String[][] expectedAfterCompaction = {
            {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar", baseBucket0},
            {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4", baseBucket0},
            {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6", baseBucket0},
            {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\ta11\tb12", baseBucket0},
            {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\ta13\tb14", baseBucket0},
            {"{\"writeid\":6,\"bucketid\":536936449,\"rowid\":0}\t0\t0", baseBucket1},
        };
        rows = TestStreaming.queryTable(this.driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
        for (int i = 0; i < expectedAfterCompaction.length; i++) {
            String actual = rows.get(i);
            Assert.assertTrue(actual, actual.startsWith(expectedAfterCompaction[i][0]));
            Assert.assertTrue(actual, actual.endsWith(expectedAfterCompaction[i][1]));
        }
    }

    /**
     * Creates a {@code Worker}, configures and initializes it, and invokes its {@code run()}
     * method directly on the calling thread.
     *
     * @param hiveConf configuration handed to the worker
     * @throws Exception if worker setup or execution fails
     */
    public static void runWorker(HiveConf hiveConf) throws Exception {
        Worker worker = new Worker();
        worker.setConf((Configuration)hiveConf);
        // The flag passed to init() is created already set to true.
        worker.init(new AtomicBoolean(true));
        worker.run();
    }

    /**
     * Streams a fixed set of records into a 100-bucket transactional table, copies them
     * (together with the bucket id read from {@code row__id}) through an intermediate table into
     * a second bucketed table, and asserts that no row of the final table has a
     * {@code row__id.bucketid} different from the bucket id recorded for it at streaming time.
     * Vectorization is switched off for the duration of the test and unset afterwards.
     */
    @Test
    public void testStreamBucketingMatchesRegularBucketing() throws Exception {
        final int bucketCount = 100;
        String dbUri = "raw://" + new Path(this.dbFolder.newFolder().toString()).toUri().toString();
        String streamedLocation = "'" + dbUri + "/streamedtable'";
        String finalLocation = "'" + dbUri + "/finaltable'";
        String noBucketLocation = "'" + dbUri + "/nobucket'";
        conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
        try (IDriver localDriver = DriverFactory.newDriver((HiveConf)conf)) {
            TestStreaming.runDDL(localDriver, "create database testBucketing3");
            TestStreaming.runDDL(localDriver, "use testBucketing3");
            TestStreaming.runDDL(localDriver, "create table streamedtable ( key1 string,key2 int,data string ) clustered by ( key1,key2 ) into " + bucketCount + " buckets  stored as orc  location " + streamedLocation + " TBLPROPERTIES ('transactional'='true')");
            TestStreaming.runDDL(localDriver, "create table nobucket ( bucketid int, key1 string,key2 int,data string ) location " + noBucketLocation);
            TestStreaming.runDDL(localDriver, "create table finaltable ( bucketid int, key1 string,key2 int,data string ) clustered by ( key1,key2 ) into " + bucketCount + " buckets  stored as orc location " + finalLocation + " TBLPROPERTIES ('transactional'='true')");

            String[] records = new String[]{
                "PSFAHYLZVC,29,EPNMA", "PPPRKWAYAU,96,VUTEE", "MIAOFERCHI,3,WBDSI", "CEGQAZOWVN,0,WCUZL",
                "XWAKMNSVQF,28,YJVHU", "XBWTSAJWME,2,KDQFO", "FUVLQTAXAY,5,LDSDG", "QTQMDJMGJH,6,QBOMA",
                "EFLOTLWJWN,71,GHWPS", "PEQNAOJHCM,82,CAAFI", "MOEKQLGZCP,41,RUACR", "QZXMCOPTID,37,LFLWE",
                "EYALVWICRD,13,JEZLC", "VYWLZAYTXX,16,DMVZX", "OSALYSQIXR,47,HNZVE", "JGKVHKCEGQ,25,KSCJB",
                "WQFMMYDHET,12,DTRWA", "AJOVAYZKZQ,15,YBKFO", "YAQONWCUAU,31,QJNHZ", "DJBXUEUOEB,35,IYCBL"};

            StrictDelimitedInputWriter delimitedWriter = StrictDelimitedInputWriter.newBuilder()
                    .withFieldDelimiter(',')
                    .build();
            HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
                    .withDatabase("testBucketing3")
                    .withTable("streamedtable")
                    .withAgentInfo("UT_" + Thread.currentThread().getName())
                    .withRecordWriter((RecordWriter)delimitedWriter)
                    .withHiveConf(conf)
                    .connect();
            connection.beginTransaction();
            for (int i = 0; i < records.length; i++) {
                connection.write(records[i].getBytes());
            }
            connection.commitTransaction();
            connection.close();

            // Dump the streamed rows with their bucket ids to the log.
            List<String> streamedRows = TestStreaming.queryTable(localDriver, "select row__id.bucketid, * from streamedtable order by key2");
            for (String streamedRow : streamedRows) {
                LOG.error(streamedRow);
            }

            localDriver.run("insert into nobucket select row__id.bucketid,* from streamedtable");
            TestStreaming.runDDL(localDriver, "insert into finaltable select * from nobucket");

            // Any row returned here was placed in a different bucket than streaming chose.
            List<String> mismatches = TestStreaming.queryTable(localDriver, "select row__id.bucketid,* from finaltable where row__id.bucketid<>bucketid");
            for (String mismatch : mismatches) {
                LOG.error(mismatch);
            }
            Assert.assertTrue(mismatches.isEmpty());
        }
        finally {
            conf.unset(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname);
        }
    }

    /**
     * Verifies that connecting to a table created with {@code 'transactional'='false'} is
     * rejected with {@link InvalidTable}.
     *
     * <p>The decompiled form of this method declared the connection as a try-with-resources
     * variable initialised to {@code null}, reassigned it, and dropped the first
     * {@code catch (InvalidTable)}; that does not compile and would not tolerate the expected
     * exception. The two connection attempts are restored here as ordinary
     * try/catch/finally blocks. Both attempts target {@code validation2}, as in the original.
     */
    @Test
    public void testTableValidation() throws Exception {
        int bucketCount = 100;
        String dbUri = "raw://" + new Path(this.dbFolder.newFolder().toString()).toUri().toString();
        String tbl1 = "validation1";
        String tbl2 = "validation2";
        String tableLoc = "'" + dbUri + "/" + tbl1 + "'";
        String tableLoc2 = "'" + dbUri + "/" + tbl2 + "'";
        TestStreaming.runDDL(this.driver, "create database testBucketing3");
        TestStreaming.runDDL(this.driver, "use testBucketing3");
        TestStreaming.runDDL(this.driver, "create table " + tbl1 + " ( key1 string, data string ) clustered by ( key1 ) into " + bucketCount + " buckets  stored as orc  location " + tableLoc + " TBLPROPERTIES ('transactional'='false')");
        TestStreaming.runDDL(this.driver, "create table " + tbl2 + " ( key1 string, data string ) clustered by ( key1 ) into " + bucketCount + " buckets  stored as orc  location " + tableLoc2 + " TBLPROPERTIES ('transactional'='false')");
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();

        HiveStreamingConnection connection = null;
        try {
            connection = HiveStreamingConnection.newBuilder().withDatabase("testBucketing3").withTable("validation2").withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer).withHiveConf(conf).connect();
            Assert.assertTrue((String)"InvalidTable exception was not thrown", (boolean)false);
        }
        catch (InvalidTable expected) {
            // Expected: the target table is not transactional.
        }
        finally {
            if (connection != null) {
                connection.close();
            }
        }

        try {
            connection = HiveStreamingConnection.newBuilder().withDatabase("testBucketing3").withTable("validation2").withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer).withHiveConf(conf).connect();
            Assert.assertTrue((String)"InvalidTable exception was not thrown", (boolean)false);
        }
        catch (InvalidTable expected) {
            // Expected: the target table is not transactional.
        }
        finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Checks the ACID directory state under {@code partitionPath} and reads the first ORC split
     * back to compare its rows with {@code records}.
     *
     * <p>Asserts that there are no obsolete directories and no original files, that the number of
     * current delta directories equals {@code numExpectedFiles}, and that the smallest/largest
     * write ids across those deltas equal {@code minTxn + 1} / {@code maxTxn + 1}. It then asks
     * {@code OrcInputFormat} for splits, expects {@code numExpectedFiles} of them, and reads
     * {@code splits[0]} row by row.
     *
     * @param partitionPath    directory to inspect
     * @param minTxn           expected minimum write id minus one
     * @param maxTxn           expected maximum write id minus one
     * @param buckets          value stored under {@code bucket_count} and passed to {@code getSplits}
     * @param numExpectedFiles expected number of current deltas and of input splits
     * @param records          expected {@code toString()} of each row in the first split, in order
     * @throws Exception on any metastore, file-system or reader failure
     * @deprecated marked deprecated in this file; presumably superseded by
     *             {@code checkDataWritten2} - confirm before removing.
     */
    @Deprecated
    private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles, String ... records) throws Exception {
        ValidWriteIdList writeIds = this.getTransactionContext((Configuration)conf);
        AcidDirectory dir = AcidUtils.getAcidState(null, (Path)partitionPath, (Configuration)conf, (ValidWriteIdList)writeIds, null, (boolean)false);
        Assert.assertEquals((long)0L, (long)dir.getObsolete().size());
        Assert.assertEquals((long)0L, (long)dir.getOriginalFiles().size());
        // Typed list: iterating a raw List as ParsedDelta does not compile.
        List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
        System.out.println("Files found: ");
        for (AcidUtils.ParsedDelta pd : current) {
            System.out.println(pd.getPath().toString());
        }
        Assert.assertEquals((long)numExpectedFiles, (long)current.size());
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (AcidUtils.ParsedDelta pd : current) {
            max = Math.max(max, pd.getMaxWriteId());
            min = Math.min(min, pd.getMinWriteId());
        }
        Assert.assertEquals((long)(minTxn + 1L), (long)min);
        Assert.assertEquals((long)(maxTxn + 1L), (long)max);
        OrcInputFormat inf = new OrcInputFormat();
        JobConf job = new JobConf();
        job.set("mapred.input.dir", partitionPath.toString());
        job.set("bucket_count", Integer.toString(buckets));
        job.set("schema.evolution.columns", "id,msg");
        job.set("schema.evolution.columns.types", "bigint:string");
        AcidUtils.setAcidOperationalProperties((Configuration)job, (boolean)true, null);
        job.setBoolean("transactional", true);
        job.set("hive.txn.valid.writeids", writeIds.writeToString());
        job.set("hive.txn.valid.txns", conf.get("hive.txn.valid.txns"));
        InputSplit[] splits = inf.getSplits(job, buckets);
        Assert.assertEquals((long)numExpectedFiles, (long)splits.length);
        org.apache.hadoop.mapred.RecordReader rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
        try {
            NullWritable key = (NullWritable)rr.createKey();
            OrcStruct value = (OrcStruct)rr.createValue();
            for (String record : records) {
                Assert.assertEquals((Object)true, (Object)rr.next((Object)key, (Object)value));
                Assert.assertEquals((Object)record, (Object)value.toString());
            }
            // The split must contain nothing beyond the expected records.
            Assert.assertEquals((Object)false, (Object)rr.next((Object)key, (Object)value));
        }
        finally {
            // Release the reader even when an assertion above fails.
            rr.close();
        }
    }

    /**
     * Checks the ACID directory state under {@code partitionPath} and validates table contents by
     * running {@code validationQuery} once per valid ORC split strategy.
     *
     * <p>Asserts that there are no obsolete directories and no original files, that the number of
     * current delta directories equals {@code numExpectedFiles}, and that the smallest/largest
     * write ids across those deltas equal {@code minTxn + 1} / {@code maxTxn + 1}. For each
     * strategy, every row returned by the query is compared with the entry of {@code records} at
     * the same index. Only indices present in the query result are compared, so a result shorter
     * than {@code records} is not flagged.
     *
     * <p>The split strategy and vectorization settings on the shared {@code conf} are restored in
     * a {@code finally} block so that a failing assertion does not leave them modified.
     *
     * @param partitionPath    directory to inspect
     * @param minTxn           expected minimum write id minus one
     * @param maxTxn           expected maximum write id minus one
     * @param numExpectedFiles expected number of current delta directories
     * @param validationQuery  query whose output is compared with {@code records}
     * @param vectorize        when true, vectorization is enabled while the query runs
     * @param records          expected query output rows, in order
     * @throws Exception on any metastore, file-system or query failure
     */
    private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles, String validationQuery, boolean vectorize, String ... records) throws Exception {
        AcidDirectory dir = AcidUtils.getAcidState(null, (Path)partitionPath, (Configuration)conf, (ValidWriteIdList)this.getTransactionContext((Configuration)conf), null, (boolean)false);
        Assert.assertEquals((long)0L, (long)dir.getObsolete().size());
        Assert.assertEquals((long)0L, (long)dir.getOriginalFiles().size());
        // Typed list: iterating a raw List as ParsedDelta does not compile.
        List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
        System.out.println("Files found: ");
        for (AcidUtils.ParsedDelta pd : current) {
            System.out.println(pd.getPath().toString());
        }
        Assert.assertEquals((long)numExpectedFiles, (long)current.size());
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (AcidUtils.ParsedDelta pd : current) {
            max = Math.max(max, pd.getMaxWriteId());
            min = Math.min(min, pd.getMinWriteId());
        }
        Assert.assertEquals((long)(minTxn + 1L), (long)min);
        Assert.assertEquals((long)(maxTxn + 1L), (long)max);

        // Snapshot the settings this method changes before touching either of them.
        boolean isVectorizationEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
        String currStrategy = conf.getVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY);
        try {
            if (vectorize) {
                conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
            }
            for (String strategy : ((Validator.StringSet)HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator()).getExpected()) {
                conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, strategy.toUpperCase());
                List<String> actualResult = TestStreaming.queryTable(this.driver, validationQuery);
                for (int i = 0; i < actualResult.size(); ++i) {
                    Assert.assertEquals((String)("diff at [" + i + "].  actual=" + actualResult + " expected=" + Arrays.toString(records)), (Object)records[i], actualResult.get(i));
                }
            }
        }
        finally {
            conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, currStrategy);
            conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorizationEnabled);
        }
    }

    /**
     * Fetches the current valid-transaction list from the metastore client, stores its string
     * form in {@code conf} under {@code hive.txn.valid.txns}, and returns the reader write-id
     * list for the {@code alerts} table of {@code dbName}.
     *
     * @param conf configuration that receives the serialized transaction list (this parameter
     *             shadows the class-level {@code conf})
     * @return valid write ids built from the first entry of the metastore response
     * @throws Exception if a metastore call fails
     */
    private ValidWriteIdList getTransactionContext(Configuration conf) throws Exception {
        ValidTxnList validTxnList = this.msClient.getValidTxns();
        String serializedTxns = validTxnList.writeToString();
        conf.set("hive.txn.valid.txns", serializedTxns);
        String qualifiedTable = TableName.getDbTable((String)dbName, (String)"alerts");
        List writeIdsPerTable = this.msClient.getValidWriteIds(Collections.singletonList(qualifiedTable), serializedTxns);
        TableValidWriteIds alertsWriteIds = (TableValidWriteIds)writeIdsPerTable.get(0);
        return TxnCommonUtils.createValidReaderWriteIdList(alertsWriteIds);
    }

    /**
     * Asserts that the ACID state of {@code partitionPath} contains no obsolete directories, no
     * original files and no current directories.
     *
     * @param partitionPath directory to inspect
     * @throws Exception if the ACID state cannot be read
     */
    private void checkNothingWritten(Path partitionPath) throws Exception {
        ValidWriteIdList writeIds = this.getTransactionContext((Configuration)conf);
        AcidDirectory acidState = AcidUtils.getAcidState(null, (Path)partitionPath, (Configuration)conf, (ValidWriteIdList)writeIds, null, (boolean)false);
        Assert.assertEquals((long)0L, (long)acidState.getObsolete().size());
        Assert.assertEquals((long)0L, (long)acidState.getOriginalFiles().size());
        Assert.assertEquals((long)0L, (long)acidState.getCurrentDirectories().size());
    }

    /**
     * Opens and closes a connection to {@code dbName.alerts} with static partition values and to
     * {@code dbName2.alerts} without them, then checks that supplying static partition values
     * for {@code dbName2.alerts} fails with a {@link ConnectionError} whose text ends with
     * "specifies partitions for un-partitioned table".
     *
     * <p>If the third connect unexpectedly succeeds, the connection is closed before the test is
     * failed; previously the close call sat after the failing assertion and could never run.
     */
    @Test
    public void testEndpointConnection() throws Exception {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable("alerts").withStaticPartitionValues(partitionVals).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer).withHiveConf(conf).connect();
        connection.close();
        connection = HiveStreamingConnection.newBuilder().withDatabase(dbName2).withTable("alerts").withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer).withHiveConf(conf).connect();
        connection.close();
        try {
            connection = HiveStreamingConnection.newBuilder().withDatabase(dbName2).withTable("alerts").withStaticPartitionValues(partitionVals).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer).withHiveConf(conf).connect();
            // Release the unexpected connection first; the failure below aborts the test.
            connection.close();
            Assert.fail("ConnectionError was not thrown");
        }
        catch (ConnectionError e) {
            String errMsg = "specifies partitions for un-partitioned table";
            Assert.assertTrue((boolean)e.toString().endsWith(errMsg));
        }
    }

    /**
     * Streams one record into {@code dbName.alerts} using the static partition values
     * (Asia, Nepal) and asserts that the metastore client then returns that partition.
     *
     * <p>The connection is closed in a {@code finally} block; previously it was never closed.
     */
    @Test
    public void testAddPartition() throws Exception {
        List<String> newPartVals = new ArrayList<String>(2);
        newPartVals.add("Asia");
        newPartVals.add("Nepal");
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable("alerts").withStaticPartitionValues(newPartVals).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer).withHiveConf(conf).connect();
        try {
            Assert.assertNotNull((Object)connection);
            connection.beginTransaction();
            connection.write("3,Hello streaming - once again".getBytes());
            connection.commitTransaction();
            Partition p = this.msClient.getPartition(dbName, "alerts", newPartVals);
            Assert.assertNotNull((String)"Did not find added partition", (Object)p);
        }
        finally {
            connection.close();
        }
    }

    /**
     * Writes into a new static partition through a secondary connection that reuses the write id
     * and table object of an outer "transaction" connection.
     *
     * <p>After the secondary connection commits and closes, the partition is expected to be
     * absent from the metastore ({@link NoSuchObjectException}); it is expected to exist only
     * after the outer connection calls {@code commitTransaction(partitions)}.
     *
     * <p>Both connections are closed in {@code finally} blocks; previously the outer connection
     * was never closed and the secondary one leaked when an assertion failed.
     */
    @Test
    public void testAddPartitionWithWriteId() throws Exception {
        List<String> newPartVals = new ArrayList<String>(2);
        newPartVals.add("WriteId_continent");
        newPartVals.add("WriteId_country");
        StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable("alerts").withStaticPartitionValues(newPartVals).withRecordWriter((RecordWriter)writerT).withHiveConf(conf).connect();
        try {
            transactionConnection.beginTransaction();
            Table tObject = transactionConnection.getTable();
            Long writeId = transactionConnection.getCurrentWriteId();
            Assert.assertNotNull((Object)tObject);
            Assert.assertNotNull((Object)writeId);

            StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
            HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable("alerts").withStaticPartitionValues(newPartVals).withRecordWriter((RecordWriter)writer).withHiveConf(conf).withWriteId(writeId.longValue()).withStatementId(1).withTableObject(tObject).connect();
            Set<String> partitions;
            try {
                Assert.assertNotNull((Object)connection);
                connection.beginTransaction();
                connection.write("3,Hello streaming - once again".getBytes());
                connection.commitTransaction();
                // Copy the partition names before the connection is closed.
                partitions = new HashSet<String>(connection.getPartitions());
            }
            finally {
                connection.close();
            }

            try {
                this.msClient.getPartition(dbName, "alerts", newPartVals);
                Assert.fail((String)"Partition shouldn't exist so a NoSuchObjectException should have been raised");
            }
            catch (NoSuchObjectException expected) {
                // Expected: the partition is not registered until the outer commit.
            }

            transactionConnection.commitTransaction(partitions);
            Partition p = this.msClient.getPartition(dbName, "alerts", newPartVals);
            Assert.assertNotNull((String)"Did not find added partition", (Object)p);
        }
        finally {
            transactionConnection.close();
        }
    }

    /**
     * Writes dynamically partitioned records through two secondary connections that share the
     * write id and table object of an outer "transaction" connection.
     *
     * <p>Connection one is expected to report 4 partitions and connection two 2 partitions.
     * Before the outer commit, looking up one of those partitions is expected to raise
     * {@link NoSuchObjectException}; after {@code commitTransaction(allPartitions)} on the outer
     * connection every reported partition must be found.
     *
     * <p>The partition sets are typed as {@code Set<String>} (the raw sets of the decompiled
     * code could not be iterated as {@code String}), and the outer connection is closed in a
     * {@code finally} block; previously it was never closed.
     */
    @Test
    public void testAddDynamicPartitionWithWriteId() throws Exception {
        TestStreaming.queryTable(this.driver, "drop table if exists default.writeiddynamic");
        TestStreaming.queryTable(this.driver, "create table default.writeiddynamic (a string, b string) partitioned by (c string, d string) stored as orc TBLPROPERTIES('transactional'='true')");
        StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder().withDatabase("default").withTable("writeiddynamic").withRecordWriter((RecordWriter)writerT).withHiveConf(conf).connect();
        try {
            transactionConnection.beginTransaction();
            Table tObject = transactionConnection.getTable();
            Long writeId = transactionConnection.getCurrentWriteId();
            Assert.assertNotNull((Object)tObject);
            Assert.assertNotNull((Object)writeId);

            StrictDelimitedInputWriter writerOne = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
            HiveStreamingConnection connectionOne = HiveStreamingConnection.newBuilder().withDatabase("default").withTable("writeiddynamic").withRecordWriter((RecordWriter)writerOne).withHiveConf(conf).withWriteId(writeId.longValue()).withStatementId(1).withTableObject(tObject).connect();
            StrictDelimitedInputWriter writerTwo = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
            HiveStreamingConnection connectionTwo = HiveStreamingConnection.newBuilder().withDatabase("default").withTable("writeiddynamic").withRecordWriter((RecordWriter)writerTwo).withHiveConf(conf).withWriteId(writeId.longValue()).withStatementId(1).withTableObject(tObject).connect();
            Assert.assertNotNull((Object)connectionOne);

            // Writes from the two connections are deliberately interleaved.
            connectionTwo.beginTransaction();
            connectionOne.beginTransaction();
            connectionOne.write("1,2,3,4".getBytes());
            connectionOne.write("1,2,5,6".getBytes());
            connectionTwo.write("1,2,30,40".getBytes());
            connectionOne.write("1,2,7,8".getBytes());
            connectionTwo.write("1,2,50,60".getBytes());
            connectionOne.write("1,2,9,10".getBytes());
            connectionOne.commitTransaction();
            connectionTwo.commitTransaction();

            Set<String> partitionsOne = new HashSet<String>(connectionOne.getPartitions());
            Assert.assertEquals((long)4L, (long)partitionsOne.size());
            Set<String> partitionsTwo = new HashSet<String>(connectionTwo.getPartitions());
            Assert.assertEquals((long)2L, (long)partitionsTwo.size());
            connectionOne.close();
            connectionTwo.close();

            try {
                String partitionName = partitionsOne.iterator().next();
                this.msClient.getPartition("default", "writeiddynamic", partitionName);
                Assert.fail((String)"Partition shouldn't exist so a NoSuchObjectException should have been raised");
            }
            catch (NoSuchObjectException expected) {
                // Expected: partitions are not registered until the outer commit.
            }

            Set<String> allPartitions = new HashSet<String>(partitionsOne);
            allPartitions.addAll(partitionsTwo);
            transactionConnection.commitTransaction(allPartitions);
            for (String partition : allPartitions) {
                Partition p = this.msClient.getPartition("default", "writeiddynamic", partition);
                Assert.assertNotNull((String)"Did not find added partition", (Object)p);
            }
        }
        finally {
            transactionConnection.close();
        }
    }

    /**
     * Begins and immediately commits a transaction without writing any record, first against
     * {@code dbName.alerts} with static partition values and then against {@code dbName2.alerts}
     * without them, asserting that the connection reports {@code COMMITTED} each time.
     */
    @Test
    public void testTransactionBatchEmptyCommit() throws Exception {
        // First pass: dbName.alerts with static partition values.
        StrictDelimitedInputWriter partitionedWriter = StrictDelimitedInputWriter.newBuilder()
                .withFieldDelimiter(',')
                .build();
        HiveStreamingConnection partitionedConnection = HiveStreamingConnection.newBuilder()
                .withDatabase(dbName)
                .withTable("alerts")
                .withStaticPartitionValues(partitionVals)
                .withAgentInfo("UT_" + Thread.currentThread().getName())
                .withRecordWriter((RecordWriter)partitionedWriter)
                .withHiveConf(conf)
                .connect();
        partitionedConnection.beginTransaction();
        partitionedConnection.commitTransaction();
        Assert.assertEquals((Object)HiveStreamingConnection.TxnState.COMMITTED, (Object)partitionedConnection.getCurrentTransactionState());
        partitionedConnection.close();

        // Second pass: dbName2.alerts without partition values.
        StrictDelimitedInputWriter plainWriter = StrictDelimitedInputWriter.newBuilder()
                .withFieldDelimiter(',')
                .build();
        HiveStreamingConnection plainConnection = HiveStreamingConnection.newBuilder()
                .withDatabase(dbName2)
                .withTable("alerts")
                .withAgentInfo("UT_" + Thread.currentThread().getName())
                .withRecordWriter((RecordWriter)plainWriter)
                .withHiveConf(conf)
                .connect();
        plainConnection.beginTransaction();
        plainConnection.commitTransaction();
        Assert.assertEquals((Object)HiveStreamingConnection.TxnState.COMMITTED, (Object)plainConnection.getCurrentTransactionState());
        plainConnection.close();
    }

    /**
     * Connects to {@code dbName.alerts} with a transaction batch size of 2 while
     * {@code HIVE_BLOBSTORE_SUPPORTED_SCHEMES} is temporarily set to {@code "raw"}; the previous
     * value of that setting is restored afterwards.
     *
     * <p>The test makes no assertion of its own: any exception from {@code connect()}
     * propagates. A connection that is successfully opened is now closed; previously the value
     * returned by {@code connect()} was discarded and never closed.
     */
    @Test
    public void testTransactionBatchSizeValidation() throws Exception {
        String schemes = conf.get(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname);
        conf.setVar(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES, "raw");
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = null;
        try {
            connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable("alerts").withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer).withTransactionBatchSize(2).withHiveConf(conf).connect();
        }
        finally {
            conf.setVar(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES, schemes);
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Runs an {@code AcidHouseKeeperService} configured with a 100 ms transaction timeout while
     * a streaming transaction has been open for 150 ms, then attempts to commit it. If the
     * commit raises a {@code TransactionError}, its cause must be a {@code TxnAbortedException}.
     * The scenario is exercised twice: on the first transaction of a connection, and on a
     * transaction begun after an earlier one on the same connection was committed.
     *
     * <p>Note that a commit which succeeds is not treated as a failure by this test.
     */
    @Test
    public void testTimeOutReaper() throws Exception {
        StrictDelimitedInputWriter recordWriter = StrictDelimitedInputWriter.newBuilder()
                .withFieldDelimiter(',')
                .build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
                .withDatabase(dbName2)
                .withTable("alerts")
                .withAgentInfo("UT_" + Thread.currentThread().getName())
                .withRecordWriter((RecordWriter)recordWriter)
                .withHiveConf(conf)
                .connect();

        // The housekeeper gets its own copy of the configuration with a 100 ms timeout.
        HiveConf shortTimeoutConf = new HiveConf(conf);
        shortTimeoutConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 100L, TimeUnit.MILLISECONDS);
        AcidHouseKeeperService reaper = new AcidHouseKeeperService();
        reaper.setConf((Configuration)shortTimeoutConf);

        // Scenario 1: the first transaction stays open for longer than the timeout.
        connection.beginTransaction();
        Thread.sleep(150L);
        reaper.run();
        try {
            connection.commitTransaction();
        }
        catch (TransactionError txnError) {
            boolean causedByAbort = txnError.getCause() instanceof TxnAbortedException;
            Assert.assertTrue("Expected aborted transaction", causedByAbort);
        }
        connection.close();

        // Scenario 2: same check on a later transaction, after one successful commit.
        connection = HiveStreamingConnection.newBuilder()
                .withDatabase(dbName2)
                .withTable("alerts")
                .withAgentInfo("UT_" + Thread.currentThread().getName())
                .withRecordWriter((RecordWriter)recordWriter)
                .withHiveConf(conf)
                .connect();
        connection.beginTransaction();
        connection.commitTransaction();
        connection.beginTransaction();
        Thread.sleep(150L);
        reaper.run();
        try {
            connection.commitTransaction();
        }
        catch (TransactionError txnError) {
            boolean causedByAbort = txnError.getCause() instanceof TxnAbortedException;
            Assert.assertTrue("Expected aborted transaction", causedByAbort);
        }
        connection.close();
    }

    /**
     * With {@code HIVE_TXN_TIMEOUT} set to 200 ms on the shared configuration, opens a
     * transaction on {@code dbName2.alerts}, checks via {@code showLocks} that exactly one lock
     * is present and that two consecutive lock listings report the same acquired and
     * last-heartbeat timestamps, then cycles through {@code transactionBatch * 3} transactions,
     * aborting every tenth one and committing the rest, with a 10 ms pause between them.
     *
     * <p>The connection is opened inside the {@code try} block so that the timeout override is
     * removed from the shared configuration even when {@code connect()} itself fails.
     */
    @Test
    public void testHeartbeat() throws Exception {
        int transactionBatch = 20;
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 200L, TimeUnit.MILLISECONDS);
        HiveStreamingConnection connection = null;
        try {
            connection = HiveStreamingConnection.newBuilder().withDatabase(dbName2).withTable("alerts").withAgentInfo("UT_" + Thread.currentThread().getName()).withTransactionBatchSize(transactionBatch).withRecordWriter((RecordWriter)writer).withHiveConf(conf).connect();
            connection.beginTransaction();
            ShowLocksRequest request = new ShowLocksRequest();
            request.setDbname(dbName2);
            request.setTablename("alerts");
            ShowLocksResponse response = this.msClient.showLocks(request);
            Assert.assertEquals((String)("Wrong number of locks: " + response), (long)1L, (long)response.getLocks().size());
            ShowLocksResponseElement lock = (ShowLocksResponseElement)response.getLocks().get(0);
            long acquiredAt = lock.getAcquiredat();
            long heartbeatAt = lock.getLastheartbeat();

            // A second listing taken right away must show the same lock, unchanged.
            response = this.msClient.showLocks(request);
            Assert.assertEquals((String)("Wrong number of locks2: " + response), (long)1L, (long)response.getLocks().size());
            lock = (ShowLocksResponseElement)response.getLocks().get(0);
            Assert.assertEquals((String)"Acquired timestamp didn't match", (long)acquiredAt, (long)lock.getAcquiredat());
            Assert.assertTrue((String)("Expected new heartbeat (" + lock.getLastheartbeat() + ") == old heartbeat(" + heartbeatAt + ")"), lock.getLastheartbeat() == heartbeatAt);

            for (int i = 0; i < transactionBatch * 3; ++i) {
                connection.beginTransaction();
                if (i % 10 == 0) {
                    connection.abortTransaction();
                } else {
                    connection.commitTransaction();
                }
                Thread.sleep(10L);
            }
        }
        finally {
            conf.unset(HiveConf.ConfVars.HIVE_TXN_TIMEOUT.varname);
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Begins and immediately aborts a transaction that wrote nothing, on two connections in turn.
     *
     * <p>The first connection targets {@code dbName.alerts} with static partition values; the
     * second targets {@code dbName2.alerts} without them. Each must report {@code ABORTED} once
     * the abort returns.
     *
     * @throws Exception if connecting or a transaction operation fails
     */
    @Test
    public void testTransactionBatchEmptyAbort() throws Exception {
        String agent = "UT_" + Thread.currentThread().getName();

        // Connection with static partition values.
        StrictDelimitedInputWriter partitionedWriter = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .build();
        HiveStreamingConnection partitionedConn = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo(agent)
            .withRecordWriter(partitionedWriter)
            .withHiveConf(conf)
            .connect();
        partitionedConn.beginTransaction();
        partitionedConn.abortTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED, partitionedConn.getCurrentTransactionState());
        partitionedConn.close();

        // Connection without static partition values.
        StrictDelimitedInputWriter plainWriter = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .build();
        HiveStreamingConnection plainConn = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName2)
            .withTable("alerts")
            .withAgentInfo(agent)
            .withRecordWriter(plainWriter)
            .withHiveConf(conf)
            .connect();
        plainConn.beginTransaction();
        plainConn.abortTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED, plainConn.getCurrentTransactionState());
        plainConn.close();
    }

    /**
     * Commits comma-delimited records through a batch of ten transactions and checks both the
     * written data and the connection's transaction state at each step.
     *
     * <p>On the {@code dbName.alerts} connection (static partition values) two records are
     * committed in separate transactions. The data check made after the second write but before
     * its commit uses the same expectations as after the first commit. After {@code close()} the
     * state must be {@code INACTIVE}. A second connection to {@code dbName2.alerts} then commits a
     * single record.
     *
     * @throws Exception if connecting, writing, or a transaction operation fails
     */
    @Test
    public void testTransactionBatchCommitDelimited() throws Exception {
        String agent = "UT_" + Thread.currentThread().getName();
        String firstRow = "{1, Hello streaming}";
        String secondRow = "{2, Welcome to streaming}";

        StrictDelimitedInputWriter partitionedWriter = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .build();
        HiveStreamingConnection partitionedConn = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo(agent)
            .withHiveConf(conf)
            .withRecordWriter(partitionedWriter)
            .withTransactionBatchSize(10)
            .connect();

        // First transaction: one record, committed.
        partitionedConn.beginTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, partitionedConn.getCurrentTransactionState());
        partitionedConn.write("1,Hello streaming".getBytes());
        partitionedConn.commitTransaction();
        this.checkDataWritten(partLoc, 1L, 10L, 1, 1, firstRow);
        Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, partitionedConn.getCurrentTransactionState());

        // Second transaction: the same check is repeated before the commit, then extended after it.
        partitionedConn.beginTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, partitionedConn.getCurrentTransactionState());
        partitionedConn.write("2,Welcome to streaming".getBytes());
        this.checkDataWritten(partLoc, 1L, 10L, 1, 1, firstRow);
        partitionedConn.commitTransaction();
        this.checkDataWritten(partLoc, 1L, 10L, 1, 1, firstRow, secondRow);

        partitionedConn.close();
        Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE, partitionedConn.getCurrentTransactionState());

        // Same flow against dbName2.alerts, without static partition values or an explicit batch size.
        StrictDelimitedInputWriter plainWriter = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .build();
        HiveStreamingConnection plainConn = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName2)
            .withTable("alerts")
            .withAgentInfo(agent)
            .withHiveConf(conf)
            .withRecordWriter(plainWriter)
            .connect();
        plainConn.beginTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, plainConn.getCurrentTransactionState());
        plainConn.write("1,Hello streaming".getBytes());
        plainConn.commitTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, plainConn.getCurrentTransactionState());
        plainConn.close();
    }

    /**
     * Commits records parsed by a {@code StrictRegexWriter} and checks written data and
     * transaction state at each step.
     *
     * <p>The {@code dbName.alerts} connection uses a comma-splitting regex and commits two records
     * in separate transactions from a batch of ten. The {@code dbName2.alerts} connection uses a
     * colon-splitting regex and commits one colon-separated record.
     *
     * @throws Exception if connecting, writing, or a transaction operation fails
     */
    @Test
    public void testTransactionBatchCommitRegex() throws Exception {
        String agent = "UT_" + Thread.currentThread().getName();
        String firstRow = "{1, Hello streaming}";
        String secondRow = "{2, Welcome to streaming}";

        String commaRegex = "([^,]*),(.*)";
        StrictRegexWriter commaWriter = StrictRegexWriter.newBuilder()
            .withRegex(commaRegex)
            .build();
        HiveStreamingConnection partitionedConn = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo(agent)
            .withHiveConf(conf)
            .withRecordWriter(commaWriter)
            .withTransactionBatchSize(10)
            .connect();

        // First transaction: one record, committed.
        partitionedConn.beginTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, partitionedConn.getCurrentTransactionState());
        partitionedConn.write("1,Hello streaming".getBytes());
        partitionedConn.commitTransaction();
        this.checkDataWritten(partLoc, 1L, 10L, 1, 1, firstRow);
        Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, partitionedConn.getCurrentTransactionState());

        // Second transaction: the same check is repeated before the commit, then extended after it.
        partitionedConn.beginTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, partitionedConn.getCurrentTransactionState());
        partitionedConn.write("2,Welcome to streaming".getBytes());
        this.checkDataWritten(partLoc, 1L, 10L, 1, 1, firstRow);
        partitionedConn.commitTransaction();
        this.checkDataWritten(partLoc, 1L, 10L, 1, 1, firstRow, secondRow);

        partitionedConn.close();
        Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE, partitionedConn.getCurrentTransactionState());

        // Colon-separated input against dbName2.alerts.
        String colonRegex = "([^:]*):(.*)";
        StrictRegexWriter colonWriter = StrictRegexWriter.newBuilder()
            .withRegex(colonRegex)
            .build();
        HiveStreamingConnection plainConn = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName2)
            .withTable("alerts")
            .withAgentInfo(agent)
            .withHiveConf(conf)
            .withRecordWriter(colonWriter)
            .connect();
        plainConn.beginTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, plainConn.getCurrentTransactionState());
        plainConn.write("1:Hello streaming".getBytes());
        plainConn.commitTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, plainConn.getCurrentTransactionState());
        plainConn.close();
    }

    /**
     * Streams three carriage-return-separated rows from an {@link InputStream} through a
     * {@code StrictRegexWriter} and verifies them with a query.
     *
     * <p>The query over {@code testing.alerts} must return exactly three rows, in input order,
     * each ending with the {@code Asia} and {@code India} columns.
     *
     * @throws Exception if connecting, writing, committing, or querying fails
     */
    @Test
    public void testRegexInputStream() throws Exception {
        String commaRegex = "([^,]*),(.*)";
        StrictRegexWriter regexWriter = StrictRegexWriter.newBuilder()
            .withRegex(commaRegex)
            .build();
        HiveStreamingConnection conn = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withHiveConf(conf)
            .withRecordWriter(regexWriter)
            .connect();

        String payload = "1,foo\r2,bar\r3,baz";
        ByteArrayInputStream input = new ByteArrayInputStream(payload.getBytes());
        conn.beginTransaction();
        conn.write((InputStream)input);
        conn.commitTransaction();
        input.close();
        conn.close();

        List<String> queried = TestStreaming.queryTable(this.driver, "select * from testing.alerts");
        Assert.assertEquals(3L, queried.size());
        Assert.assertEquals("1\tfoo\tAsia\tIndia", queried.get(0));
        Assert.assertEquals("2\tbar\tAsia\tIndia", queried.get(1));
        Assert.assertEquals("3\tbaz\tAsia\tIndia", queried.get(2));
    }

    /**
     * Commits a single JSON record through a {@code StrictJsonWriter} and checks the written data,
     * the transaction state transitions, and the row count returned by a query.
     *
     * @throws Exception if connecting, writing, committing, or querying fails
     */
    @Test
    public void testTransactionBatchCommitJson() throws Exception {
        StrictJsonWriter jsonWriter = StrictJsonWriter.newBuilder().build();
        HiveStreamingConnection conn = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(jsonWriter)
            .withHiveConf(conf)
            .withTransactionBatchSize(10)
            .connect();

        conn.beginTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, conn.getCurrentTransactionState());
        String jsonRecord = "{\"id\" : 1, \"msg\": \"Hello streaming\"}";
        conn.write(jsonRecord.getBytes());
        conn.commitTransaction();
        this.checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}");
        Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, conn.getCurrentTransactionState());

        conn.close();
        Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE, conn.getCurrentTransactionState());

        // Exactly one row must be visible to a query once the connection is closed.
        List<String> queried = TestStreaming.queryTable(this.driver, "select * from testing.alerts");
        Assert.assertEquals(1L, queried.size());
    }

    /**
     * Streams three JSON records from an {@link InputStream}, using {@code \|} as the writer's
     * line delimiter pattern, and verifies them with a query.
     *
     * <p>The query over {@code testing.alerts} must return exactly three rows, in input order,
     * each ending with the {@code Asia} and {@code India} columns.
     *
     * @throws Exception if connecting, writing, committing, or querying fails
     */
    @Test
    public void testJsonInputStream() throws Exception {
        StrictJsonWriter jsonWriter = StrictJsonWriter.newBuilder()
            .withLineDelimiterPattern("\\|")
            .build();
        HiveStreamingConnection conn = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(jsonWriter)
            .withHiveConf(conf)
            .connect();

        conn.beginTransaction();
        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, conn.getCurrentTransactionState());
        String payload = "{\"id\" : 1, \"msg\": \"Hello streaming\"}|{\"id\" : 2, \"msg\": \"Hello world\"}|{\"id\" : 3, \"msg\": \"Hello world!!\"}";
        ByteArrayInputStream input = new ByteArrayInputStream(payload.getBytes());
        conn.write((InputStream)input);
        conn.commitTransaction();
        input.close();
        conn.close();

        List<String> queried = TestStreaming.queryTable(this.driver, "select * from testing.alerts");
        Assert.assertEquals(3L, queried.size());
        Assert.assertEquals("1\tHello streaming\tAsia\tIndia", queried.get(0));
        Assert.assertEquals("2\tHello world\tAsia\tIndia", queried.get(1));
        Assert.assertEquals("3\tHello world!!\tAsia\tIndia", queried.get(2));
    }

    /**
     * Checks that {@code remainingTransactions()} drops by one per {@code beginTransaction()} and
     * reaches zero, once when every transaction is committed and once when every one is aborted.
     *
     * <p>Each pass opens a fresh connection that reuses the same writer, begins one transaction,
     * records the remaining count, and then loops until no transactions remain, writing two records
     * per transaction. After {@code close()} the state must be {@code INACTIVE}.
     *
     * @throws Exception if connecting, writing, or a transaction operation fails
     */
    @Test
    public void testRemainingTransactions() throws Exception {
        String agent = "UT_" + Thread.currentThread().getName();
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .build();

        // Pass 1: commit every remaining transaction.
        HiveStreamingConnection committing = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo(agent)
            .withRecordWriter(writer)
            .withHiveConf(conf)
            .connect();
        committing.beginTransaction();
        int batch = 0;
        int expectedRemaining = committing.remainingTransactions();
        while (committing.remainingTransactions() > 0) {
            committing.beginTransaction();
            expectedRemaining--;
            Assert.assertEquals(expectedRemaining, committing.remainingTransactions());
            for (int rec = 0; rec < 2; rec++) {
                Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, committing.getCurrentTransactionState());
                committing.write((batch * rec + ",Hello streaming").getBytes());
            }
            committing.commitTransaction();
            Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, committing.getCurrentTransactionState());
            batch++;
        }
        Assert.assertEquals(0L, committing.remainingTransactions());
        committing.close();
        Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE, committing.getCurrentTransactionState());

        // Pass 2: abort every remaining transaction.
        HiveStreamingConnection aborting = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo(agent)
            .withRecordWriter(writer)
            .withHiveConf(conf)
            .connect();
        aborting.beginTransaction();
        batch = 0;
        expectedRemaining = aborting.remainingTransactions();
        while (aborting.remainingTransactions() > 0) {
            aborting.beginTransaction();
            expectedRemaining--;
            Assert.assertEquals(expectedRemaining, aborting.remainingTransactions());
            for (int rec = 0; rec < 2; rec++) {
                Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, aborting.getCurrentTransactionState());
                aborting.write((batch * rec + ",Hello streaming").getBytes());
            }
            aborting.abortTransaction();
            Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED, aborting.getCurrentTransactionState());
            batch++;
        }
        Assert.assertEquals(0L, aborting.remainingTransactions());
        aborting.close();
        Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE, aborting.getCurrentTransactionState());
    }

    /**
     * Writes two records, aborts the transaction, and checks that nothing was written, both
     * right after the abort and again after the connection is closed.
     *
     * @throws Exception if connecting, writing, or a transaction operation fails
     */
    @Test
    public void testTransactionBatchAbort() throws Exception {
        StrictDelimitedInputWriter delimitedWriter = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .build();
        HiveStreamingConnection conn = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(delimitedWriter)
            .withHiveConf(conf)
            .connect();

        conn.beginTransaction();
        conn.write("1,Hello streaming".getBytes());
        conn.write("2,Welcome to streaming".getBytes());
        conn.abortTransaction();

        this.checkNothingWritten(partLoc);
        Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED, conn.getCurrentTransactionState());

        conn.close();
        this.checkNothingWritten(partLoc);
    }

    /**
     * Expects a {@link ClassCastException} from connecting or beginning a transaction after
     * {@code fs.raw.impl} has been pointed at {@code java.lang.Object}.
     *
     * @throws Exception the expected {@link ClassCastException}, or any other failure
     */
    @Test(expected = ClassCastException.class)
    public void testFileSystemError() throws Exception {
        String bogusFsImpl = Object.class.getName();
        conf.set("fs.raw.impl", bogusFsImpl);

        StrictDelimitedInputWriter delimitedWriter = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .build();
        HiveStreamingConnection conn = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo("UT_" + Thread.currentThread().getName())
            .withRecordWriter(delimitedWriter)
            .withHiveConf(conf)
            .connect();
        conn.beginTransaction();
    }

    /**
     * Aborts one transaction and commits the next on the same connection, checking the lock held
     * while the first transaction is open.
     *
     * <p>With two records written and the transaction still open, {@code showLocks} must report
     * exactly one lock: type {@code SHARED_READ}, state {@code ACQUIRED}, carrying this test's
     * agent info. After the abort nothing may be written; after the following commit both records
     * must be present.
     *
     * @throws Exception if connecting, writing, a transaction operation, or the metastore call fails
     */
    @Test
    public void testTransactionBatchAbortAndCommit() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();
        StrictDelimitedInputWriter delimitedWriter = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .build();
        HiveStreamingConnection conn = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo(agentInfo)
            .withRecordWriter(delimitedWriter)
            .withHiveConf(conf)
            .withTransactionBatchSize(10)
            .connect();

        // First transaction: write, inspect the lock, abort.
        conn.beginTransaction();
        conn.write("1,Hello streaming".getBytes());
        conn.write("2,Welcome to streaming".getBytes());

        ShowLocksResponse locks = this.msClient.showLocks(new ShowLocksRequest());
        Assert.assertEquals("LockCount", 1L, locks.getLocksSize());
        // Only look at the element once the count assertion has passed.
        ShowLocksResponseElement held = (ShowLocksResponseElement)locks.getLocks().get(0);
        Assert.assertEquals("LockType", LockType.SHARED_READ, held.getType());
        Assert.assertEquals("LockState", LockState.ACQUIRED, held.getState());
        Assert.assertEquals("AgentInfo", agentInfo, held.getAgentInfo());

        conn.abortTransaction();
        this.checkNothingWritten(partLoc);
        Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED, conn.getCurrentTransactionState());

        // Second transaction: same records, committed this time.
        conn.beginTransaction();
        conn.write("1,Hello streaming".getBytes());
        conn.write("2,Welcome to streaming".getBytes());
        conn.commitTransaction();
        this.checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}");
        conn.close();
    }

    /**
     * Commits four records across two consecutive connections (two transactions each, batch size
     * ten) and validates the table contents with an ordered query after every commit.
     *
     * <p>Both connections reuse the same writer. The final state of the second connection must be
     * {@code COMMITTED} before it is closed.
     *
     * @throws Exception if connecting, writing, committing, or validation fails
     */
    @Test
    public void testMultipleTransactionBatchCommits() throws Exception {
        String agent = "UT_" + Thread.currentThread().getName();
        String validationQuery = "select id, msg from testing.alerts order by id, msg";
        String row1 = "1\tHello streaming";
        String row2 = "2\tWelcome to streaming";
        String row3 = "3\tHello streaming - once again";
        String row4 = "4\tWelcome to streaming - once again";

        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .build();

        // First connection: rows 1 and 2.
        HiveStreamingConnection firstConn = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo(agent)
            .withRecordWriter(writer)
            .withTransactionBatchSize(10)
            .withHiveConf(conf)
            .connect();
        firstConn.beginTransaction();
        firstConn.write("1,Hello streaming".getBytes());
        firstConn.commitTransaction();
        this.checkDataWritten2(partLoc, 1L, 10L, 1, validationQuery, false, row1);

        firstConn.beginTransaction();
        firstConn.write("2,Welcome to streaming".getBytes());
        firstConn.commitTransaction();
        this.checkDataWritten2(partLoc, 1L, 10L, 1, validationQuery, true, row1, row2);
        firstConn.close();

        // Second connection: rows 3 and 4.
        HiveStreamingConnection secondConn = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName)
            .withTable("alerts")
            .withStaticPartitionValues(partitionVals)
            .withAgentInfo(agent)
            .withRecordWriter(writer)
            .withTransactionBatchSize(10)
            .withHiveConf(conf)
            .connect();
        secondConn.beginTransaction();
        secondConn.write("3,Hello streaming - once again".getBytes());
        secondConn.commitTransaction();
        this.checkDataWritten2(partLoc, 1L, 20L, 2, validationQuery, false, row1, row2, row3);

        secondConn.beginTransaction();
        secondConn.write("4,Welcome to streaming - once again".getBytes());
        secondConn.commitTransaction();
        this.checkDataWritten2(partLoc, 1L, 20L, 2, validationQuery, true, row1, row2, row3, row4);

        Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, secondConn.getCurrentTransactionState());
        secondConn.close();
    }

    /**
     * Interleaves writes and commits from two connections to the same table and partition, checking
     * the visible data and the ORC side ("length") files at each stage.
     *
     * <p>Sequence: both connections open a transaction and write one record each; nothing is
     * expected on disk yet. The second connection commits first, then the first. After both
     * commits every bucket file's side file must exist, be non-empty, and report a logical length
     * equal to the actual file length. With a further transaction open and written on each
     * connection, the logical length must be no greater than the actual length. The remaining
     * commits are then made one at a time, validating the query result after each.
     *
     * @throws Exception if connecting, writing, committing, file-system access, or validation fails
     */
    @Test
    public void testInterleavedTransactionBatchCommits() throws Exception {
        // Hoisted by the decompiler; each is assigned inside the bucket-file loops below.
        long actualLength;
        long logicalLength;
        long lengthFileSize;
        Path lengthFile;
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable("alerts").withStaticPartitionValues(partitionVals).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer).withHiveConf(conf).withTransactionBatchSize(10).connect();
        connection.beginTransaction();
        // Second connection to the same table/partition, with its own writer.
        StrictDelimitedInputWriter writer2 = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection2 = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable("alerts").withStaticPartitionValues(partitionVals).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter((RecordWriter)writer2).withHiveConf(conf).withTransactionBatchSize(10).connect();
        connection2.beginTransaction();
        // Both transactions are open and have written; nothing should be visible yet.
        connection.write("1,Hello streaming".getBytes());
        connection2.write("3,Hello streaming - once again".getBytes());
        this.checkNothingWritten(partLoc);
        // The second connection commits first: only its record is expected.
        connection2.commitTransaction();
        String validationQuery = "select id, msg from testing.alerts order by id, msg";
        this.checkDataWritten2(partLoc, 11L, 20L, 1, validationQuery, true, "3\tHello streaming - once again");
        connection.commitTransaction();
        FileSystem fs = partLoc.getFileSystem((Configuration)conf);
        AcidDirectory dir = AcidUtils.getAcidState((FileSystem)fs, (Path)partLoc, (Configuration)conf, (ValidWriteIdList)this.getTransactionContext((Configuration)conf), null, (boolean)false);
        // No transaction is open here: every side file must exist, be non-empty, and the
        // logical length must equal the actual file length.
        for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
            for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
                lengthFile = OrcAcidUtils.getSideFile((Path)stat.getPath());
                Assert.assertTrue((String)(lengthFile + " missing"), (boolean)fs.exists(lengthFile));
                lengthFileSize = fs.getFileStatus(lengthFile).getLen();
                Assert.assertTrue((String)("Expected " + lengthFile + " to be non empty. lengh=" + lengthFileSize), (lengthFileSize > 0L ? 1 : 0) != 0);
                logicalLength = AcidUtils.getLogicalLength((FileSystem)fs, (FileStatus)stat);
                actualLength = stat.getLen();
                Assert.assertTrue((String)"", (logicalLength == actualLength ? 1 : 0) != 0);
            }
        }
        this.checkDataWritten2(partLoc, 1L, 20L, 2, validationQuery, false, "1\tHello streaming", "3\tHello streaming - once again");
        // Open and write a second transaction on each connection without committing.
        connection.beginTransaction();
        connection.write("2,Welcome to streaming".getBytes());
        connection2.beginTransaction();
        connection2.write("4,Welcome to streaming - once again".getBytes());
        dir = AcidUtils.getAcidState((FileSystem)fs, (Path)partLoc, (Configuration)conf, (ValidWriteIdList)this.getTransactionContext((Configuration)conf), null, (boolean)false);
        // Same side-file checks as above, except the logical length only has to be <= the
        // actual file length while these transactions are still open.
        for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
            for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
                lengthFile = OrcAcidUtils.getSideFile((Path)stat.getPath());
                Assert.assertTrue((String)(lengthFile + " missing"), (boolean)fs.exists(lengthFile));
                lengthFileSize = fs.getFileStatus(lengthFile).getLen();
                Assert.assertTrue((String)("Expected " + lengthFile + " to be non empty. lengh=" + lengthFileSize), (lengthFileSize > 0L ? 1 : 0) != 0);
                logicalLength = AcidUtils.getLogicalLength((FileSystem)fs, (FileStatus)stat);
                actualLength = stat.getLen();
                Assert.assertTrue((String)"", (logicalLength <= actualLength ? 1 : 0) != 0);
            }
        }
        // The uncommitted records 2 and 4 are not expected in the query result yet.
        this.checkDataWritten2(partLoc, 1L, 20L, 2, validationQuery, true, "1\tHello streaming", "3\tHello streaming - once again");
        // Commit one connection at a time; each commit adds exactly its own record.
        connection.commitTransaction();
        this.checkDataWritten2(partLoc, 1L, 20L, 2, validationQuery, false, "1\tHello streaming", "2\tWelcome to streaming", "3\tHello streaming - once again");
        connection2.commitTransaction();
        this.checkDataWritten2(partLoc, 1L, 20L, 2, validationQuery, true, "1\tHello streaming", "2\tWelcome to streaming", "3\tHello streaming - once again", "4\tWelcome to streaming - once again");
        Assert.assertEquals((Object)HiveStreamingConnection.TxnState.COMMITTED, (Object)connection.getCurrentTransactionState());
        Assert.assertEquals((Object)HiveStreamingConnection.TxnState.COMMITTED, (Object)connection2.getCurrentTransactionState());
        connection.close();
        connection2.close();
    }

    /**
     * Starts three {@code WriterThd} threads, waits for all of them to finish, and fails if any of
     * them recorded an error.
     *
     * @throws Exception if a writer thread cannot be created or joining is interrupted
     */
    @Test
    public void testConcurrentTransactionBatchCommits() throws Exception {
        List<WriterThd> workers = Arrays.asList(
            new WriterThd("1,Matrix"),
            new WriterThd("2,Gandhi"),
            new WriterThd("3,Silence"));

        for (WriterThd worker : workers) {
            worker.start();
        }
        // Wait for every worker before inspecting any of them.
        for (WriterThd worker : workers) {
            worker.join();
        }
        for (WriterThd worker : workers) {
            if (worker.error != null) {
                Assert.fail("Writer thread" + worker.getName() + " died: " + worker.error.getMessage() + " See log file for stack trace");
            }
        }
    }

    /**
     * Reads every row of one ORC bucket file from the local file system and returns the
     * {@code SampleRec} payload of each row.
     *
     * <p>Each row is decoded with {@code deserializeDeltaFileRow}; the element at index 5 of the
     * decoded array is the {@code SampleRec}. The file name is printed to standard output. Both
     * the ORC reader and the row reader obtained from it are closed before returning.
     *
     * @param orcFile path of the bucket file to read
     * @return the records found in the file, in read order
     * @throws IOException if the file cannot be opened, read, or closed
     */
    private ArrayList<SampleRec> dumpBucket(Path orcFile) throws IOException {
        LocalFileSystem fs = FileSystem.getLocal((Configuration)new Configuration());
        ArrayList<SampleRec> result = new ArrayList<SampleRec>();
        try (Reader reader = OrcFile.createReader((Path)orcFile, (OrcFile.ReaderOptions)OrcFile.readerOptions((Configuration)conf).filesystem((FileSystem)fs));){
            RecordReader rows = reader.rows();
            try {
                StructObjectInspector inspector = (StructObjectInspector)reader.getObjectInspector();
                System.out.format("Found Bucket File : %s \n", orcFile.getName());
                while (rows.hasNext()) {
                    Object row = rows.next(null);
                    SampleRec rec = (SampleRec)TestStreaming.deserializeDeltaFileRow(row, inspector)[5];
                    result.add(rec);
                }
            }
            finally {
                // Release the row reader explicitly rather than relying on the enclosing reader.
                rows.close();
            }
        }
        return result;
    }

    /**
     * Decodes one six-field delta-file row into plain Java values.
     *
     * <p>Fields 0 and 2 are read as {@code int}, fields 1, 3 and 4 as {@code long}, and field 5 is
     * a nested struct decoded by {@code deserializeInner}. All field inspectors are resolved before
     * any field value is read.
     *
     * @param row the raw row object
     * @param inspector struct inspector describing {@code row}
     * @return a six-element array: boxed int, long, int, long, long, then the {@code SampleRec}
     */
    private static Object[] deserializeDeltaFileRow(Object row, StructObjectInspector inspector) {
        List<?> refs = inspector.getAllStructFieldRefs();

        // Resolve the field references and their inspectors, in field order.
        StructField ref0 = (StructField)refs.get(0);
        WritableIntObjectInspector intIns0 = (WritableIntObjectInspector)ref0.getFieldObjectInspector();
        StructField ref1 = (StructField)refs.get(1);
        WritableLongObjectInspector longIns1 = (WritableLongObjectInspector)ref1.getFieldObjectInspector();
        StructField ref2 = (StructField)refs.get(2);
        WritableIntObjectInspector intIns2 = (WritableIntObjectInspector)ref2.getFieldObjectInspector();
        StructField ref3 = (StructField)refs.get(3);
        WritableLongObjectInspector longIns3 = (WritableLongObjectInspector)ref3.getFieldObjectInspector();
        StructField ref4 = (StructField)refs.get(4);
        WritableLongObjectInspector longIns4 = (WritableLongObjectInspector)ref4.getFieldObjectInspector();
        StructField ref5 = (StructField)refs.get(5);
        StructObjectInspector structIns5 = (StructObjectInspector)ref5.getFieldObjectInspector();

        // Extract the values, again in field order.
        int v0 = intIns0.get(inspector.getStructFieldData(row, ref0));
        long v1 = longIns1.get(inspector.getStructFieldData(row, ref1));
        int v2 = intIns2.get(inspector.getStructFieldData(row, ref2));
        long v3 = longIns3.get(inspector.getStructFieldData(row, ref3));
        long v4 = longIns4.get(inspector.getStructFieldData(row, ref4));
        SampleRec payload = TestStreaming.deserializeInner(inspector.getStructFieldData(row, ref5), structIns5);

        return new Object[]{v0, v1, v2, v3, v4, payload};
    }

    /**
     * Decodes a three-field nested struct (string, int, string) into a {@code SampleRec}.
     *
     * <p>All field inspectors are resolved before any field value is read.
     *
     * @param row the raw nested struct object
     * @param inspector struct inspector describing {@code row}
     * @return a {@code SampleRec} built from the three decoded values, in field order
     */
    private static SampleRec deserializeInner(Object row, StructObjectInspector inspector) {
        List<?> refs = inspector.getAllStructFieldRefs();

        // Resolve the field references and their inspectors, in field order.
        StructField ref0 = (StructField)refs.get(0);
        WritableStringObjectInspector strIns0 = (WritableStringObjectInspector)ref0.getFieldObjectInspector();
        StructField ref1 = (StructField)refs.get(1);
        WritableIntObjectInspector intIns1 = (WritableIntObjectInspector)ref1.getFieldObjectInspector();
        StructField ref2 = (StructField)refs.get(2);
        WritableStringObjectInspector strIns2 = (WritableStringObjectInspector)ref2.getFieldObjectInspector();

        // Extract the values, again in field order.
        String first = strIns0.getPrimitiveJavaObject(inspector.getStructFieldData(row, ref0));
        int second = intIns1.get(inspector.getStructFieldData(row, ref1));
        String third = strIns2.getPrimitiveJavaObject(inspector.getStructFieldData(row, ref2));

        return new SampleRec(first, second, third);
    }

    /**
     * Streams rows into two freshly created four-bucket tables and checks how the rows of the
     * first table were distributed across bucket files.
     *
     * <p>Both databases are dropped and recreated under new temporary folders. Four rows are
     * committed to the first table and three to the second. The bucket contents of both tables are
     * dumped to standard error; only the first table is asserted on: three buckets in total, no
     * bucket 0, one record in bucket 1, two in bucket 2 and one in bucket 3.
     *
     * @throws Exception if table setup, streaming, or reading the bucket files fails
     */
    @Test
    public void testBucketing() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();
        TestStreaming.dropDB(this.msClient, dbName3);
        TestStreaming.dropDB(this.msClient, dbName4);

        // First table: bucketed on key1,key2. Backslashes in the location are turned into '/'.
        String dbLocation = this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
        dbLocation = dbLocation.replaceAll("\\\\", "/");
        String[] colNames = "key1,key2,data".split(",");
        String[] colTypes = "string,int,string".split(",");
        String[] bucketNames = "key1,key2".split(",");
        int bucketCount = 4;
        TestStreaming.createDbAndTable(this.driver, dbName3, tblName3, null, colNames, colTypes, bucketNames, null, dbLocation, bucketCount);

        // Second table: bucketed on key3,key4, same bucket count.
        String dbLocation2 = this.dbFolder.newFolder(dbName4).getCanonicalPath() + ".db";
        dbLocation2 = dbLocation2.replaceAll("\\\\", "/");
        String[] colNames2 = "key3,key4,data2".split(",");
        String[] colTypes2 = "string,int,string".split(",");
        String[] bucketNames2 = "key3,key4".split(",");
        TestStreaming.createDbAndTable(this.driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2, null, dbLocation2, bucketCount);

        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName3)
            .withTable(tblName3)
            .withAgentInfo(agentInfo)
            .withRecordWriter((RecordWriter)writer)
            .withHiveConf(conf)
            .connect();
        connection.beginTransaction();
        connection.write("name0,1,Hello streaming".getBytes());
        connection.write("name2,2,Welcome to streaming".getBytes());
        connection.write("name4,2,more Streaming unlimited".getBytes());
        connection.write("name5,2,even more Streaming unlimited".getBytes());
        connection.commitTransaction();
        connection.close();

        StrictDelimitedInputWriter writer2 = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection2 = HiveStreamingConnection.newBuilder()
            .withDatabase(dbName4)
            .withTable(tblName4)
            .withAgentInfo(agentInfo)
            .withRecordWriter((RecordWriter)writer2)
            .withHiveConf(conf)
            .connect();
        connection2.beginTransaction();
        connection2.write("name5,2,fact3".getBytes());
        connection2.write("name8,2,fact3".getBytes());
        connection2.write("name0,1,fact1".getBytes());
        connection2.commitTransaction();
        connection2.close();

        HashMap<Integer, ArrayList<SampleRec>> actual1 = this.dumpAllBuckets(dbLocation, tblName3);
        HashMap<Integer, ArrayList<SampleRec>> actual2 = this.dumpAllBuckets(dbLocation2, tblName4);
        System.err.println("\n  Table 1");
        System.err.println(actual1);
        System.err.println("\n  Table 2");
        System.err.println(actual2);

        // Expected value goes first so JUnit's failure message reports the values the right way round.
        Assert.assertEquals("number of buckets does not match expectation", 3L, actual1.size());
        Assert.assertNull("bucket 0 shouldn't have been created", actual1.get(0));
        Assert.assertEquals("records in bucket does not match expectation", 1L, actual1.get(1).size());
        Assert.assertEquals("records in bucket does not match expectation", 2L, actual1.get(2).size());
        Assert.assertEquals("records in bucket does not match expectation", 1L, actual1.get(3).size());
    }

    /**
     * Runs {@code cmd} through {@code TestStreaming.runDDL} on this test's
     * driver and fails the calling test when that call returns {@code false}.
     *
     * @param cmd the statement to execute; also used in the failure message
     */
    private void runCmdOnDriver(String cmd) {
        boolean succeeded = TestStreaming.runDDL(this.driver, cmd);
        String failureMessage = cmd + " failed";
        Assert.assertTrue(failureMessage, succeeded);
    }

    /**
     * Streams one committed transaction into each of two bucketed tables and,
     * after each commit, runs {@code FileDump} on the first table's database
     * location. The captured stderr must not report corrupted files or files
     * that are still open for writes.
     */
    @Test
    public void testFileDump() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();
        TestStreaming.dropDB(this.msClient, dbName3);
        TestStreaming.dropDB(this.msClient, dbName4);

        // First table. Backslashes in the location are rewritten to forward slashes.
        String dbLocation = (this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        String[] colNames = "key1,key2,data".split(",");
        String[] colTypes = "string,int,string".split(",");
        String[] bucketNames = "key1,key2".split(",");
        int bucketCount = 4;
        TestStreaming.createDbAndTable(this.driver, dbName3, tblName3, null, colNames, colTypes, bucketNames, null, dbLocation, bucketCount);

        // Second table, same layout with different column names.
        String dbLocation2 = (this.dbFolder.newFolder(dbName4).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        String[] colNames2 = "key3,key4,data2".split(",");
        String[] colTypes2 = "string,int,string".split(",");
        String[] bucketNames2 = "key3,key4".split(",");
        TestStreaming.createDbAndTable(this.driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2, null, dbLocation2, bucketCount);

        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName3).withTable(tblName3).withAgentInfo(agentInfo).withHiveConf(conf).withRecordWriter(writer).connect();
        connection.beginTransaction();
        connection.write("name0,1,Hello streaming".getBytes());
        connection.write("name2,2,Welcome to streaming".getBytes());
        connection.write("name4,2,more Streaming unlimited".getBytes());
        connection.write("name5,2,even more Streaming unlimited".getBytes());
        connection.commitTransaction();
        connection.close();

        // Capture stderr while FileDump runs.
        PrintStream origErr = System.err;
        ByteArrayOutputStream myErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(myErr));
        try {
            FileDump.main(new String[]{dbLocation});
        } finally {
            // Restore the real stderr even when FileDump throws, so later
            // output is not swallowed by the capture buffer.
            System.err.flush();
            System.setErr(origErr);
        }
        String errDump = new String(myErr.toByteArray());
        Assert.assertFalse(errDump.contains("file(s) are corrupted"));
        Assert.assertFalse(errDump.contains("is still open for writes."));

        StrictDelimitedInputWriter writer2 = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection2 = HiveStreamingConnection.newBuilder().withDatabase(dbName4).withTable(tblName4).withAgentInfo(agentInfo).withRecordWriter(writer2).withHiveConf(conf).connect();
        connection2.beginTransaction();
        connection2.write("name5,2,fact3".getBytes());
        connection2.write("name8,2,fact3".getBytes());
        connection2.write("name0,1,fact1".getBytes());
        connection2.commitTransaction();
        connection2.close();

        // NOTE(review): this dumps dbLocation again rather than dbLocation2,
        // even though the data just written went to the second table.
        // Confirm which location was intended.
        origErr = System.err;
        myErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(myErr));
        try {
            FileDump.main(new String[]{dbLocation});
        } finally {
            System.out.flush();
            System.err.flush();
            System.setErr(origErr);
        }
        errDump = new String(myErr.toByteArray());
        Assert.assertFalse(errDump.contains("Exception"));
        Assert.assertFalse(errDump.contains("file(s) are corrupted"));
        Assert.assertFalse(errDump.contains("is still open for writes."));
    }

    /**
     * Writes 6004 rows in a single transaction with streaming optimizations
     * enabled, runs {@code FileDump} on the database location, and checks the
     * captured stdout for the expected compression, per-column statistics and
     * column encodings.
     */
    @Test
    public void testFileDumpDeltaFilesWithStreamingOptimizations() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();
        TestStreaming.dropDB(this.msClient, dbName3);
        TestStreaming.dropDB(this.msClient, dbName4);

        // Backslashes in the location are rewritten to forward slashes.
        String dbLocation = (this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        String[] colNames = "key1,key2,data".split(",");
        String[] colTypes = "string,int,string".split(",");
        String[] bucketNames = "key1,key2".split(",");
        int bucketCount = 4;
        TestStreaming.createDbAndTable(this.driver, dbName3, tblName3, null, colNames, colTypes, bucketNames, null, dbLocation, bucketCount);

        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName3).withTable(tblName3).withAgentInfo(agentInfo).withHiveConf(conf).withRecordWriter(writer).withStreamingOptimizations(true).connect();
        connection.beginTransaction();
        connection.write("name0,1,streaming".getBytes());
        connection.write("name2,2,streaming".getBytes());
        connection.write("name4,2,unlimited".getBytes());
        connection.write("name5,2,unlimited".getBytes());
        // Alternate the data column between two values: even rows get
        // "streaming", odd rows get "unlimited".
        for (int i = 0; i < 6000; ++i) {
            String data = (i % 2 == 0) ? ",streaming" : ",unlimited";
            connection.write(("name" + i + "," + i + data).getBytes());
        }
        connection.commitTransaction();
        // Closed once; the redundant second close() call was dropped.
        connection.close();

        // Capture stdout while FileDump runs.
        PrintStream origOut = System.out;
        ByteArrayOutputStream myOut = new ByteArrayOutputStream();
        System.setOut(new PrintStream(myOut));
        try {
            FileDump.main(new String[]{dbLocation});
        } finally {
            // Restore the real stdout even when FileDump throws.
            System.out.flush();
            System.setOut(origOut);
        }
        String outDump = new String(myOut.toByteArray());

        String[] expectedFragments = {
            "Compression: ZLIB",
            "Column 0: count: 0 hasNull: false",
            "Column 1: count: 0 hasNull: false bytesOnDisk: 15 sum: 0",
            "Column 2: count: 0 hasNull: false bytesOnDisk: 15 sum: 0",
            "Column 3: count: 0 hasNull: false bytesOnDisk: 19 sum: 0",
            "Column 4: count: 0 hasNull: false bytesOnDisk: 17 sum: 0",
            "Column 5: count: 0 hasNull: false bytesOnDisk: 15 sum: 0",
            "Column 6: count: 0 hasNull: false",
            "Column 7: count: 0 hasNull: false bytesOnDisk: 3929",
            "Column 8: count: 0 hasNull: false bytesOnDisk: 1484 sum: 0",
            "Column 9: count: 0 hasNull: false bytesOnDisk: 816",
            "Encoding column 7: DIRECT_V2",
            "Encoding column 9: DIRECT_V2",
        };
        for (String fragment : expectedFragments) {
            // Name the missing fragment so a failure is diagnosable.
            Assert.assertTrue("FileDump output is missing: " + fragment, outDump.contains(fragment));
        }
    }

    /**
     * Writes 6004 rows in a single transaction with streaming optimizations
     * disabled, runs {@code FileDump} on the database location, and checks the
     * captured stdout for ZLIB compression and DICTIONARY encoding of column 9.
     */
    @Test
    public void testFileDumpDeltaFilesWithoutStreamingOptimizations() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();
        TestStreaming.dropDB(this.msClient, dbName3);
        TestStreaming.dropDB(this.msClient, dbName4);

        // Backslashes in the location are rewritten to forward slashes.
        String dbLocation = (this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        String[] colNames = "key1,key2,data".split(",");
        String[] colTypes = "string,int,string".split(",");
        String[] bucketNames = "key1,key2".split(",");
        int bucketCount = 4;
        TestStreaming.createDbAndTable(this.driver, dbName3, tblName3, null, colNames, colTypes, bucketNames, null, dbLocation, bucketCount);

        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName3).withTable(tblName3).withAgentInfo(agentInfo).withHiveConf(conf).withRecordWriter(writer).withStreamingOptimizations(false).connect();
        connection.beginTransaction();
        connection.write("name0,1,streaming".getBytes());
        connection.write("name2,2,streaming".getBytes());
        connection.write("name4,2,unlimited".getBytes());
        connection.write("name5,2,unlimited".getBytes());
        // Alternate the data column between two values: even rows get
        // "streaming", odd rows get "unlimited".
        for (int i = 0; i < 6000; ++i) {
            String data = (i % 2 == 0) ? ",streaming" : ",unlimited";
            connection.write(("name" + i + "," + i + data).getBytes());
        }
        connection.commitTransaction();
        connection.close();

        // Capture stdout while FileDump runs.
        PrintStream origOut = System.out;
        ByteArrayOutputStream myOut = new ByteArrayOutputStream();
        System.setOut(new PrintStream(myOut));
        try {
            FileDump.main(new String[]{dbLocation});
        } finally {
            // Restore the real stdout even when FileDump throws.
            System.out.flush();
            System.setOut(origOut);
        }
        String outDump = new String(myOut.toByteArray());
        Assert.assertTrue("FileDump output is missing: Compression: ZLIB", outDump.contains("Compression: ZLIB"));
        Assert.assertTrue("FileDump output is missing: Encoding column 9: DICTIONARY", outDump.contains("Encoding column 9: DICTIONARY"));
    }

    /**
     * Commits one transaction, rewrites each bucket's data file with a
     * different length via {@code corruptDataFile}, and then runs
     * {@code FileDump} three times over the database location:
     * <ol>
     *   <li>plain dump: stderr must report "3 file(s) are corrupted";</li>
     *   <li>with {@code --recover --skip-dump}: stderr must report recovery of
     *       buckets 1-3 and creation of an empty ORC file;</li>
     *   <li>plain dump again: no corruption may be reported.</li>
     * </ol>
     * The streaming connection stays open until the end of the test.
     */
    @Test
    public void testFileDumpCorruptDataFiles() throws Exception {
        TestStreaming.dropDB(this.msClient, dbName3);

        // Backslashes in the location are rewritten to forward slashes.
        String dbLocation = (this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        String[] colNames = "key1,key2,data".split(",");
        String[] colTypes = "string,int,string".split(",");
        String[] bucketNames = "key1,key2".split(",");
        int bucketCount = 4;
        TestStreaming.createDbAndTable(this.driver, dbName3, tblName3, null, colNames, colTypes, bucketNames, null, dbLocation, bucketCount);

        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName3).withTable(tblName3).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter(writer).withHiveConf(conf).withTransactionBatchSize(10).connect();
        connection.beginTransaction();
        connection.write("name0,1,Hello streaming".getBytes());
        connection.write("name2,2,Welcome to streaming".getBytes());
        connection.write("name4,2,more Streaming unlimited".getBytes());
        connection.write("name5,2,even more Streaming unlimited".getBytes());
        connection.commitTransaction();

        Path path = new Path(dbLocation);
        Collection<String> files = FileDump.getAllFilesInPath(path, conf);
        for (String file : files) {
            if (file.contains("bucket_00000")) {
                // Integer.MIN_VALUE makes corruptDataFile write a zero-length file.
                this.corruptDataFile(file, conf, Integer.MIN_VALUE);
            } else if (file.contains("bucket_00001")) {
                // Drop the last byte.
                this.corruptDataFile(file, conf, -1);
            } else if (file.contains("bucket_00002") || file.contains("bucket_00003")) {
                // Append 100 zero bytes.
                this.corruptDataFile(file, conf, 100);
            }
        }

        // Pass 1: plain dump over the corrupted files.
        PrintStream origErr = System.err;
        ByteArrayOutputStream myErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(myErr));
        try {
            FileDump.main(new String[]{dbLocation});
        } finally {
            // Restore the real stderr even when FileDump throws.
            System.err.flush();
            System.setErr(origErr);
        }
        String errDump = new String(myErr.toByteArray());
        Assert.assertFalse(errDump.contains("Exception"));
        Assert.assertTrue(errDump.contains("3 file(s) are corrupted"));
        Assert.assertFalse(errDump.contains("is still open for writes."));

        // Pass 2: recover without dumping.
        origErr = System.err;
        myErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(myErr));
        try {
            FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"});
        } finally {
            System.err.flush();
            System.setErr(origErr);
        }
        errDump = new String(myErr.toByteArray());
        Assert.assertTrue(errDump.contains("bucket_00001 recovered successfully!"));
        Assert.assertTrue(errDump.contains("No readable footers found. Creating empty orc file."));
        Assert.assertTrue(errDump.contains("bucket_00002 recovered successfully!"));
        Assert.assertTrue(errDump.contains("bucket_00003 recovered successfully!"));
        Assert.assertFalse(errDump.contains("Exception"));
        Assert.assertFalse(errDump.contains("is still open for writes."));

        // Pass 3: after recovery a plain dump must be clean.
        origErr = System.err;
        myErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(myErr));
        try {
            FileDump.main(new String[]{dbLocation});
        } finally {
            System.err.flush();
            System.setErr(origErr);
        }
        errDump = new String(myErr.toByteArray());
        Assert.assertFalse(errDump.contains("Exception"));
        Assert.assertFalse(errDump.contains("file(s) are corrupted"));
        Assert.assertFalse(errDump.contains("is still open for writes."));

        // No listed file may be a flush-length side file.
        files = FileDump.getAllFilesInPath(path, conf);
        for (String file : files) {
            Assert.assertFalse(file.contains("_flush_length"));
        }
        connection.close();
    }

    /**
     * Replaces {@code file} with a copy whose length has been changed.
     *
     * <p>The new content is written to a sibling {@code <name>.corrupt} file,
     * the original is deleted, and the sibling is renamed into its place.
     *
     * @param file           path of the file to rewrite
     * @param conf           configuration used to resolve the file system
     * @param addRemoveBytes {@code Integer.MIN_VALUE} produces an empty file;
     *                       any other value is added to the current length, so
     *                       a negative value truncates and a positive value
     *                       pads the tail with zero bytes
     * @throws Exception if reading, writing or the final rename fails
     */
    private void corruptDataFile(String file, Configuration conf, int addRemoveBytes) throws Exception {
        Path bPath = new Path(file);
        Path cPath = new Path(bPath.getParent(), bPath.getName() + ".corrupt");
        FileSystem fs = bPath.getFileSystem(conf);
        FileStatus fileStatus = fs.getFileStatus(bPath);
        int len = addRemoveBytes == Integer.MIN_VALUE ? 0 : (int) fileStatus.getLen() + addRemoveBytes;
        byte[] buffer = new byte[len];

        // Copy at most the original length; extra buffer bytes stay zero.
        try (FSDataInputStream fdis = fs.open(bPath)) {
            fdis.readFully(0L, buffer, 0, (int) Math.min(fileStatus.getLen(), (long) buffer.length));
        }
        try (FSDataOutputStream fdos = fs.create(cPath, true)) {
            fdos.write(buffer, 0, buffer.length);
        }

        fs.delete(bPath, false);
        // Fail loudly: a silent rename failure would leave the data file
        // missing and surface later as an unrelated assertion failure.
        if (!fs.rename(cPath, bPath)) {
            throw new IOException("Failed to rename " + cPath + " to " + bPath);
        }
    }

    /**
     * Commits two transactions, recording each bucket file's length after
     * each commit, then overwrites every bucket's side file via
     * {@code corruptSideFile} and runs {@code FileDump} three times over the
     * database location:
     * <ol>
     *   <li>plain dump: stderr must report the rewritten side-file lengths
     *       and "4 file(s) are corrupted";</li>
     *   <li>with {@code --recover --skip-dump}: stderr must report recovery of
     *       all four buckets and the recorded footer offsets;</li>
     *   <li>plain dump again: no corruption may be reported.</li>
     * </ol>
     * The streaming connection stays open until the end of the test.
     */
    @Test
    public void testFileDumpCorruptSideFiles() throws Exception {
        TestStreaming.dropDB(this.msClient, dbName3);

        // Backslashes in the location are rewritten to forward slashes.
        String dbLocation = (this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db").replaceAll("\\\\", "/");
        String[] colNames = "key1,key2,data".split(",");
        String[] colTypes = "string,int,string".split(",");
        String[] bucketNames = "key1,key2".split(",");
        int bucketCount = 4;
        TestStreaming.createDbAndTable(this.driver, dbName3, tblName3, null, colNames, colTypes, bucketNames, null, dbLocation, bucketCount);

        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName3).withTable(tblName3).withAgentInfo("UT_" + Thread.currentThread().getName()).withRecordWriter(writer).withHiveConf(conf).withTransactionBatchSize(10).connect();

        // First transaction; file lengths are recorded after the commit.
        connection.beginTransaction();
        connection.write("name0,1,Hello streaming".getBytes());
        connection.write("name2,2,Welcome to streaming".getBytes());
        connection.write("name4,2,more Streaming unlimited".getBytes());
        connection.write("name5,2,even more Streaming unlimited".getBytes());
        connection.write("name6,3,aHello streaming".getBytes());
        connection.commitTransaction();
        Map<String, List<Long>> offsetMap = new HashMap<String, List<Long>>();
        this.recordOffsets(conf, dbLocation, offsetMap);

        // Second transaction; file lengths are recorded again.
        connection.beginTransaction();
        connection.write("name01,11,-Hello streaming".getBytes());
        connection.write("name21,21,-Welcome to streaming".getBytes());
        connection.write("name41,21,-more Streaming unlimited".getBytes());
        connection.write("name51,21,-even more Streaming unlimited".getBytes());
        connection.write("name02,12,--Hello streaming".getBytes());
        connection.write("name22,22,--Welcome to streaming".getBytes());
        connection.write("name42,22,--more Streaming unlimited".getBytes());
        connection.write("name52,22,--even more Streaming unlimited".getBytes());
        connection.write("name7,4,aWelcome to streaming".getBytes());
        connection.write("name8,5,amore Streaming unlimited".getBytes());
        connection.write("name9,6,aeven more Streaming unlimited".getBytes());
        connection.write("name10,7,bHello streaming".getBytes());
        connection.write("name11,8,bWelcome to streaming".getBytes());
        connection.write("name12,9,bmore Streaming unlimited".getBytes());
        connection.write("name13,10,beven more Streaming unlimited".getBytes());
        connection.commitTransaction();
        this.recordOffsets(conf, dbLocation, offsetMap);

        // Rewrite each bucket's side file with a different entry count.
        Path path = new Path(dbLocation);
        Collection<String> files = FileDump.getAllFilesInPath(path, conf);
        for (String file : files) {
            if (file.contains("bucket_00000")) {
                // Negative count: the last entry is cut to 3 bytes.
                this.corruptSideFile(file, conf, offsetMap, "bucket_00000", -1);
            } else if (file.contains("bucket_00001")) {
                // Zero entries: empty side file.
                this.corruptSideFile(file, conf, offsetMap, "bucket_00001", 0);
            } else if (file.contains("bucket_00002")) {
                this.corruptSideFile(file, conf, offsetMap, "bucket_00002", 3);
            } else if (file.contains("bucket_00003")) {
                this.corruptSideFile(file, conf, offsetMap, "bucket_00003", 10);
            }
        }

        // Pass 1: plain dump over the corrupted side files.
        PrintStream origErr = System.err;
        ByteArrayOutputStream myErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(myErr));
        try {
            FileDump.main(new String[]{dbLocation});
        } finally {
            // Restore the real stderr even when FileDump throws.
            System.err.flush();
            System.setErr(origErr);
        }
        String errDump = new String(myErr.toByteArray());
        // Lengths at 8 bytes per entry: 0 -> 0, 3 -> 24, 10 -> 80. Length 11
        // is one full entry plus 3 bytes, which presumably relies on exactly
        // two recorded offsets for bucket_00000 -- TODO confirm.
        Assert.assertTrue(errDump.contains("bucket_00000_flush_length [length: 11"));
        Assert.assertTrue(errDump.contains("bucket_00001_flush_length [length: 0"));
        Assert.assertTrue(errDump.contains("bucket_00002_flush_length [length: 24"));
        Assert.assertTrue(errDump.contains("bucket_00003_flush_length [length: 80"));
        Assert.assertFalse(errDump.contains("Exception"));
        Assert.assertTrue(errDump.contains("4 file(s) are corrupted"));
        Assert.assertFalse(errDump.contains("is still open for writes."));

        // Pass 2: recover without dumping.
        origErr = System.err;
        myErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(myErr));
        try {
            FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"});
        } finally {
            System.err.flush();
            System.setErr(origErr);
        }
        errDump = new String(myErr.toByteArray());
        String[] bucketKeys = {"bucket_00000", "bucket_00001", "bucket_00002", "bucket_00003"};
        for (String bucket : bucketKeys) {
            Assert.assertTrue(bucket + " was not reported as recovered", errDump.contains(bucket + " recovered successfully!"));
        }
        for (String bucket : bucketKeys) {
            List<Long> offsets = offsetMap.get(bucket);
            Assert.assertTrue("recorded footer offsets of " + bucket + " were not reported", errDump.contains("Readable footerOffsets: " + offsets.toString()));
        }
        Assert.assertFalse(errDump.contains("Exception"));
        Assert.assertFalse(errDump.contains("is still open for writes."));

        // Pass 3: after recovery a plain dump must be clean.
        origErr = System.err;
        myErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(myErr));
        try {
            FileDump.main(new String[]{dbLocation});
        } finally {
            System.err.flush();
            System.setErr(origErr);
        }
        errDump = new String(myErr.toByteArray());
        Assert.assertFalse(errDump.contains("Exception"));
        Assert.assertFalse(errDump.contains("file(s) are corrupted"));
        Assert.assertFalse(errDump.contains("is still open for writes."));

        // No listed file may be a flush-length side file.
        files = FileDump.getAllFilesInPath(path, conf);
        for (String file : files) {
            Assert.assertFalse(file.contains("_flush_length"));
        }
        connection.close();
    }

    /**
     * Overwrites the side file of {@code file} (as located by
     * {@code OrcAcidUtils.getSideFile}) with a sequence of 8-byte offsets
     * derived from {@code offsetMap.get(key)}.
     *
     * <ul>
     *   <li>{@code numEntries < 0}: every recorded offset except the last is
     *       written in full, followed by only the first 3 bytes of the last
     *       one, leaving a truncated trailing entry.</li>
     *   <li>{@code numEntries == 0}: an empty side file is written.</li>
     *   <li>{@code numEntries > 0}: up to {@code numEntries} recorded offsets
     *       are written; if fewer are recorded, the remainder is filled with
     *       made-up offsets at 100-byte steps past the last recorded one.</li>
     * </ul>
     *
     * <p>The content is written to a sibling {@code <name>.corrupt} file, the
     * existing side file is deleted, and the sibling is renamed into place.
     *
     * @param file       path of the data file whose side file is rewritten
     * @param conf       configuration used to resolve the file system
     * @param offsetMap  recorded offsets per bucket key; must contain a
     *                   non-empty list for {@code key}
     * @param key        bucket key to look up in {@code offsetMap}
     * @param numEntries number of entries to write, see above
     * @throws IOException if writing or the final rename fails
     */
    private void corruptSideFile(String file, HiveConf conf, Map<String, List<Long>> offsetMap, String key, int numEntries) throws IOException {
        Path dataPath = new Path(file);
        Path sideFilePath = OrcAcidUtils.getSideFile(dataPath);
        Path cPath = new Path(sideFilePath.getParent(), sideFilePath.getName() + ".corrupt");
        FileSystem fs = sideFilePath.getFileSystem(conf);
        List<Long> offsets = offsetMap.get(key);
        long lastOffset = offsets.get(offsets.size() - 1);

        // try-with-resources closes the stream even when a write fails.
        try (FSDataOutputStream fdos = fs.create(cPath, true)) {
            if (numEntries < 0) {
                byte[] lastOffsetBytes = this.longToBytes(lastOffset);
                for (int i = 0; i < offsets.size() - 1; ++i) {
                    fdos.writeLong(offsets.get(i).longValue());
                }
                // Only 3 of the 8 bytes: a deliberately partial final entry.
                fdos.write(lastOffsetBytes, 0, 3);
            } else if (numEntries > 0) {
                int firstRun = Math.min(offsets.size(), numEntries);
                for (int i = 0; i < firstRun; ++i) {
                    fdos.writeLong(offsets.get(i).longValue());
                }
                // Pad with offsets beyond the last recorded one.
                int remaining = numEntries - firstRun;
                for (int i = 0; i < remaining; ++i) {
                    fdos.writeLong(lastOffset + (long) ((i + 1) * 100));
                }
            }
        }

        fs.delete(sideFilePath, false);
        // Fail loudly: a silent rename failure would leave the side file
        // missing instead of corrupted.
        if (!fs.rename(cPath, sideFilePath)) {
            throw new IOException("Failed to rename " + cPath + " to " + sideFilePath);
        }
    }

    /**
     * Encodes {@code x} as an 8-byte array in {@link ByteBuffer}'s default
     * byte order (big-endian, most significant byte first).
     *
     * @param x the value to encode
     * @return a new 8-byte array holding {@code x}
     */
    private byte[] longToBytes(long x) {
        int width = Long.SIZE / Byte.SIZE;
        return ByteBuffer.allocate(width).putLong(x).array();
    }

    /**
     * Appends the current length of every bucket file under
     * {@code dbLocation} to that bucket's list in {@code offsetMap}.
     *
     * <p>Files are those returned by {@code FileDump.getAllFilesInPath}. Each
     * file is attributed to the first of {@code bucket_00000} ..
     * {@code bucket_00003} that its path contains; files matching none are
     * ignored. A list is created on a bucket's first length.
     *
     * @param conf       configuration used to resolve the file system
     * @param dbLocation root path to scan
     * @param offsetMap  map from bucket key to recorded lengths, updated in place
     * @throws IOException if listing files or reading a file status fails
     */
    private void recordOffsets(HiveConf conf, String dbLocation, Map<String, List<Long>> offsetMap) throws IOException {
        String[] bucketKeys = {"bucket_00000", "bucket_00001", "bucket_00002", "bucket_00003"};
        Path path = new Path(dbLocation);
        Collection<String> files = FileDump.getAllFilesInPath(path, conf);
        for (String file : files) {
            Path bPath = new Path(file);
            FileSystem fs = bPath.getFileSystem(conf);
            long len = fs.getFileStatus(bPath).getLen();
            for (String key : bucketKeys) {
                if (!file.contains(key)) {
                    continue;
                }
                List<Long> offsets = offsetMap.get(key);
                if (offsets == null) {
                    offsets = new ArrayList<Long>();
                    offsetMap.put(key, offsets);
                }
                offsets.add(len);
                // First matching key wins; do not count the file twice.
                break;
            }
        }
    }

    /**
     * Exercises the failure paths of a streaming connection against the
     * transactional table {@code testErrors.T}:
     * <ol>
     *   <li>closing a connection with a transaction in progress leaves its
     *       transactions listed as ABORTED, and a later
     *       {@code beginTransaction()} on it is rejected;</li>
     *   <li>{@code write()} and {@code commitTransaction()} without a prior
     *       {@code beginTransaction()} are rejected;</li>
     *   <li>a fault injected by {@code FaultyWriter} during {@code write()}
     *       surfaces as {@code StreamingIOFailure}, the following commit is
     *       rejected and the transaction status becomes {@code CA};</li>
     *   <li>a fault injected during {@code commitTransaction()} surfaces as
     *       {@code StreamingIOFailure} and the transactions are ABORTED.</li>
     * </ol>
     * The expected high-water marks (17, 19, 21) are absolute transaction
     * ids, so they depend on the transactions opened before this point.
     */
    @Test
    public void testErrorHandling() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();
        this.runCmdOnDriver("create database testErrors");
        this.runCmdOnDriver("use testErrors");
        this.runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
        StrictDelimitedInputWriter innerWriter = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase("testErrors").withTable("T").withAgentInfo(agentInfo).withTransactionBatchSize(2).withRecordWriter(innerWriter).withHiveConf(conf).connect();
        connection.beginTransaction();
        FaultyWriter writer = new FaultyWriter(innerWriter);

        // 1) Close with a transaction in progress.
        connection.close();
        Throwable expectedEx = null;
        GetOpenTxnsInfoResponse r = this.msClient.showTxns();
        Assert.assertEquals("HWM didn't match", 17L, r.getTxn_high_water_mark());
        List<TxnInfo> ti = r.getOpen_txns();
        Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
        Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
        try {
            connection.beginTransaction();
        }
        catch (StreamingException ex) {
            expectedEx = ex;
        }
        Assert.assertTrue("beginTransaction() should have failed", expectedEx != null && expectedEx.getMessage().contains("Streaming connection is closed already."));

        // 2) write()/commitTransaction() without beginTransaction().
        // NOTE(review): this connection is replaced below without being
        // closed -- confirm whether that is intentional.
        connection = HiveStreamingConnection.newBuilder().withDatabase("testErrors").withTable("T").withAgentInfo(agentInfo).withTransactionBatchSize(2).withRecordWriter(innerWriter).withHiveConf(conf).connect();
        expectedEx = null;
        try {
            connection.write("name0,1,Hello streaming".getBytes());
        }
        catch (StreamingException ex) {
            expectedEx = ex;
        }
        Assert.assertTrue("write() should have failed", expectedEx != null && expectedEx.getMessage().equals("Transaction batch is null. Missing beginTransaction?"));
        expectedEx = null;
        try {
            connection.commitTransaction();
        }
        catch (StreamingException ex) {
            expectedEx = ex;
        }
        Assert.assertTrue("commitTransaction() should have failed", expectedEx != null && expectedEx.getMessage().equals("Transaction batch is null. Missing beginTransaction?"));

        // 3) Successful commit through the (not yet failing) FaultyWriter.
        connection = HiveStreamingConnection.newBuilder().withDatabase("testErrors").withTable("T").withAgentInfo(agentInfo).withTransactionBatchSize(2).withRecordWriter(writer).withHiveConf(conf).connect();
        connection.beginTransaction();
        connection.write("name2,2,Welcome to streaming".getBytes());
        connection.write("name4,2,more Streaming unlimited".getBytes());
        connection.write("name5,2,even more Streaming unlimited".getBytes());
        connection.commitTransaction();
        String s = connection.toTransactionString();
        Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + JavaUtils.txnIdToString(connection.getCurrentTxnId())));
        Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CO]"));

        // 4) Fault during write(); the following commit must be rejected.
        expectedEx = null;
        connection.beginTransaction();
        writer.enableErrors();
        try {
            connection.write("name6,2,Doh!".getBytes());
        }
        catch (StreamingIOFailure ex) {
            expectedEx = ex;
        }
        Assert.assertTrue("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?"), expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
        expectedEx = null;
        try {
            connection.commitTransaction();
        }
        catch (StreamingException ex) {
            expectedEx = ex;
        }
        Assert.assertTrue("commitTransaction() should have failed", expectedEx != null && expectedEx.getMessage().equals("Transaction state is not OPEN. Missing beginTransaction?"));
        s = connection.toTransactionString();
        Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + JavaUtils.txnIdToString(connection.getCurrentTxnId())));
        Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CA]"));
        r = this.msClient.showTxns();
        Assert.assertEquals("HWM didn't match", 19L, r.getTxn_high_water_mark());
        ti = r.getOpen_txns();
        Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
        Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
        Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, ti.get(2).getState());
        connection.close();

        // 5) Fault during commitTransaction().
        // NOTE(review): this last connection is never closed -- confirm
        // whether that is intentional.
        writer.disableErrors();
        connection = HiveStreamingConnection.newBuilder().withDatabase("testErrors").withTable("T").withAgentInfo(agentInfo).withTransactionBatchSize(2).withRecordWriter(writer).withHiveConf(conf).connect();
        connection.beginTransaction();
        connection.write("name2,2,Welcome to streaming".getBytes());
        writer.enableErrors();
        expectedEx = null;
        try {
            connection.commitTransaction();
        }
        catch (StreamingIOFailure ex) {
            expectedEx = ex;
        }
        Assert.assertTrue("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?"), expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
        r = this.msClient.showTxns();
        Assert.assertEquals("HWM didn't match", 21L, r.getTxn_high_water_mark());
        ti = r.getOpen_txns();
        Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState());
        Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState());
    }

    /**
     * Reads back the contents of every bucket file found in a table's delta directories.
     *
     * <p>Only direct children of {@code dbLocation/tableName} whose name starts with
     * {@code "delta"} are scanned. Within each of them, entries whose name starts with
     * {@code "_"} or {@code "."} and entries whose path ends with {@code "length"} are ignored.
     * When the same bucket number is encountered more than once, the records read last
     * replace the ones stored earlier.
     *
     * @param dbLocation directory that contains the table directory
     * @param tableName name of the table directory below {@code dbLocation}
     * @return map from bucket number to the records read from that bucket file
     * @throws IOException presumably raised by {@code dumpBucket} while reading a file
     */
    private HashMap<Integer, ArrayList<SampleRec>> dumpAllBuckets(String dbLocation, String tableName) throws IOException {
        // Accepts everything except "hidden" entries (leading underscore or dot).
        FileFilter visibleOnly = entry -> {
            String simpleName = entry.getName();
            return !(simpleName.startsWith("_") || simpleName.startsWith("."));
        };
        HashMap<Integer, ArrayList<SampleRec>> recordsByBucket = new HashMap<>();
        File tableDir = new File(dbLocation + "/" + tableName);
        for (File child : tableDir.listFiles()) {
            if (child.getName().startsWith("delta")) {
                for (File candidate : child.listFiles(visibleOnly)) {
                    String candidatePath = candidate.toString();
                    if (!candidatePath.endsWith("length")) {
                        Integer bucketId = this.getBucketNumber(candidate);
                        recordsByBucket.put(bucketId, this.dumpBucket(new Path(candidatePath)));
                    }
                }
            }
        }
        return recordsByBucket;
    }

    /**
     * Extracts the bucket number from a bucket file's simple name.
     *
     * <p>Everything after the first underscore is parsed as a decimal integer; when the name
     * has no underscore the whole name is parsed.
     *
     * @param bucketFile bucket file whose name encodes the bucket number
     * @return the parsed bucket number
     * @throws NumberFormatException if the selected part of the name is not an integer
     */
    private Integer getBucketNumber(File bucketFile) {
        String simpleName = bucketFile.getName();
        // indexOf yields -1 when there is no underscore, so the slice then starts at 0.
        int digitsFrom = simpleName.indexOf('_') + 1;
        return Integer.parseInt(simpleName.substring(digitsFrom));
    }

    /**
     * Best-effort removal of a database: drops every table returned by the metastore for
     * {@code databaseName}, then drops the database itself.
     *
     * <p>Any {@code TException} raised along the way is swallowed, which also means the
     * remaining drop calls are skipped once one of them fails.
     *
     * @param client metastore client used to issue the drop calls
     * @param databaseName name of the database to remove
     */
    public static void dropDB(IMetaStoreClient client, String databaseName) {
        try {
            // Empty filter with a max of -1: ask for all tables of the database.
            List<String> tableNames = client.listTableNamesByFilter(databaseName, "", (short)-1);
            for (String tableName : tableNames) {
                client.dropTable(databaseName, tableName, true, true);
            }
            client.dropDatabase(databaseName);
        }
        catch (TException ignored) {
            // Deliberately ignored: cleanup is best-effort and must not fail the caller.
        }
    }

    /**
     * Creates the database (if absent) and a bucketed, transactional ORC table in it,
     * both located under a {@code raw://} URI derived from {@code dbLocation}.
     *
     * <p>The outcome of the individual DDL statements is not checked here.
     *
     * @param driver driver used to run the DDL statements
     * @param databaseName database to create and switch to
     * @param tableName table to create
     * @param partVals values for the initial partition; only used when {@code partNames} is non-empty
     * @param colNames column names
     * @param colTypes column types, parallel to {@code colNames}
     * @param bucketCols columns to cluster by
     * @param partNames partition column names; {@code null} or empty for an unpartitioned table
     * @param dbLocation local directory backing the database
     * @param bucketCount number of buckets
     * @return the path of the added partition for a partitioned table, otherwise the table location
     * @throws Exception if adding the partition fails
     */
    private static Path createDbAndTable(IDriver driver, String databaseName, String tableName, List<String> partVals, String[] colNames, String[] colTypes, String[] bucketCols, String[] partNames, String dbLocation, int bucketCount) throws Exception {
        final String dbUri = "raw://" + new Path(dbLocation).toUri().toString();
        final String tableLoc = dbUri + "/" + tableName;

        TestStreaming.runDDL(driver, "create database IF NOT EXISTS " + databaseName + " location '" + dbUri + "'");
        TestStreaming.runDDL(driver, "use " + databaseName);

        // Assemble the CREATE TABLE statement piece by piece.
        StringBuilder createTable = new StringBuilder("create table ").append(tableName);
        createTable.append(" ( ").append(TestStreaming.getTableColumnsStr(colNames, colTypes)).append(" )");
        createTable.append(TestStreaming.getPartitionStmtStr(partNames));
        createTable.append(" clustered by ( ").append(TestStreaming.join(bucketCols, ",")).append(" )");
        createTable.append(" into ").append(bucketCount).append(" buckets  stored as orc ");
        createTable.append(" location '").append(tableLoc).append("' TBLPROPERTIES ('transactional'='true') ");
        TestStreaming.runDDL(driver, createTable.toString());

        boolean unpartitioned = partNames == null || partNames.length == 0;
        if (unpartitioned) {
            return new Path(tableLoc);
        }
        return TestStreaming.addPartition(driver, tableName, partVals, partNames);
    }

    /**
     * Adds one partition to {@code tableName} and looks up where it is stored.
     *
     * @param driver driver used to run the statements
     * @param tableName table to alter
     * @param partVals partition values, parallel to {@code partNames}
     * @param partNames partition column names
     * @return the location reported for the new partition
     * @throws Exception if the partition location cannot be determined
     */
    private static Path addPartition(IDriver driver, String tableName, List<String> partVals, String[] partNames) throws Exception {
        final String spec = TestStreaming.getPartsSpec(partNames, partVals);
        TestStreaming.runDDL(driver, "alter table " + tableName + " add partition ( " + spec + " )");
        return TestStreaming.getPartitionPath(driver, tableName, spec);
    }

    /**
     * Determines the storage location of a partition by running {@code describe extended}
     * and parsing the last line of its output: the location is the text between the
     * {@code "location:"} marker and the next comma.
     *
     * @param driver driver used to run the query
     * @param tableName table that owns the partition
     * @param partSpec partition specification placed inside {@code PARTITION (...)}
     * @return the partition location
     * @throws IllegalStateException if the query returns no rows, or the last row does not
     *         contain a comma-terminated {@code location:} entry
     * @throws Exception if the query itself fails
     */
    private static Path getPartitionPath(IDriver driver, String tableName, String partSpec) throws Exception {
        String query = "describe extended " + tableName + " PARTITION (" + partSpec + ")";
        ArrayList<String> res = TestStreaming.queryTable(driver, query);
        if (res.isEmpty()) {
            throw new IllegalStateException("No output returned by: " + query);
        }
        String partInfo = res.get(res.size() - 1);
        String marker = "location:";
        int markerAt = partInfo.indexOf(marker);
        // Without this check a missing marker (-1) would silently turn into offset 8
        // and a meaningless path would be returned.
        if (markerAt < 0) {
            throw new IllegalStateException("No '" + marker + "' entry in output of: " + query + " -> " + partInfo);
        }
        int start = markerAt + marker.length();
        int end = partInfo.indexOf(",", start);
        if (end < 0) {
            throw new IllegalStateException("Unterminated '" + marker + "' entry in output of: " + query + " -> " + partInfo);
        }
        return new Path(partInfo.substring(start, end));
    }

    /**
     * Renders a column list for a CREATE TABLE statement as
     * {@code "name1 type1,name2 type2,..."}.
     *
     * @param colNames column names
     * @param colTypes column types, read at the same indexes as {@code colNames}
     * @return the comma-separated column definitions; empty when there are no columns
     */
    private static String getTableColumnsStr(String[] colNames, String[] colTypes) {
        StringBuilder columns = new StringBuilder();
        int idx = 0;
        while (idx < colNames.length) {
            // Separator goes in front of every entry except the first.
            if (idx > 0) {
                columns.append(",");
            }
            columns.append(colNames[idx]).append(" ").append(colTypes[idx]);
            idx++;
        }
        return columns.toString();
    }

    /**
     * Renders partition columns as {@code "name1 string,name2 string,..."}; every
     * partition column is given the type {@code string}.
     *
     * @param partNames partition column names; may be {@code null}
     * @return the comma-separated definitions, or an empty string for {@code null}/empty input
     */
    private static String getTablePartsStr(String[] partNames) {
        boolean hasPartitions = partNames != null && partNames.length != 0;
        if (!hasPartitions) {
            return "";
        }
        StringBuilder parts = new StringBuilder();
        int idx = 0;
        for (String partName : partNames) {
            if (idx++ > 0) {
                parts.append(",");
            }
            parts.append(partName).append(" string");
        }
        return parts.toString();
    }

    /**
     * Builds a partition specification of the form {@code "name1 = 'val1',name2 = 'val2'"}.
     * The number of entries is driven by {@code partVals}.
     *
     * @param partNames partition column names, read at the same indexes as {@code partVals}
     * @param partVals partition values
     * @return the comma-separated specification; empty when {@code partVals} is empty
     */
    private static String getPartsSpec(String[] partNames, List<String> partVals) {
        StringBuilder spec = new StringBuilder();
        final int count = partVals.size();
        int idx = 0;
        while (idx < count) {
            if (idx != 0) {
                spec.append(",");
            }
            spec.append(partNames[idx]);
            spec.append(" = '");
            spec.append(partVals.get(idx));
            spec.append("'");
            idx++;
        }
        return spec.toString();
    }

    /**
     * Concatenates {@code values}, inserting {@code delimiter} between consecutive elements.
     *
     * @param values elements to join; may be {@code null}
     * @param delimiter text placed between elements
     * @return the joined text, an empty string for an empty array, or {@code null} when
     *         {@code values} is {@code null}
     * @throws NullPointerException if an element of {@code values} is {@code null}
     */
    private static String join(String[] values, String delimiter) {
        if (values == null) {
            return null;
        }
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                joined.append(delimiter);
            }
            // toString() is kept on purpose: a null element fails here instead of printing "null".
            joined.append(values[i].toString());
        }
        return joined.toString();
    }

    /**
     * Produces the {@code partitioned by (...)} clause of a CREATE TABLE statement.
     *
     * @param partNames partition column names; may be {@code null}
     * @return the clause with a leading space, or an empty string when there are no partition columns
     */
    private static String getPartitionStmtStr(String[] partNames) {
        boolean unpartitioned = partNames == null || partNames.length == 0;
        return unpartitioned ? "" : " partitioned by (" + TestStreaming.getTablePartsStr(partNames) + " )";
    }

    /**
     * Runs a DDL statement, echoing it to the debug log and to standard output first.
     *
     * <p>A {@code CommandProcessorException} is not propagated: it is logged, including
     * its stack trace, and reported through the return value.
     *
     * @param driver driver used to run the statement
     * @param sql statement to run
     * @return {@code true} if the statement ran without a {@code CommandProcessorException},
     *         {@code false} otherwise
     */
    private static boolean runDDL(IDriver driver, String sql) {
        LOG.debug(sql);
        System.out.println(sql);
        try {
            driver.run(sql);
            return true;
        }
        catch (CommandProcessorException e) {
            // Pass the exception as the throwable argument so the stack trace is not lost.
            LOG.error("Statement: " + sql + " failed: " + e, e);
            return false;
        }
    }

    /**
     * Runs a query and collects the rows the driver returns for it.
     *
     * @param driver driver used to run the query and fetch its results
     * @param query query text
     * @return the rows handed back by {@code driver.getResults}
     * @throws RuntimeException if the query fails with a {@code CommandProcessorException};
     *         the original exception is attached as the cause
     * @throws IOException if fetching the results fails
     */
    private static ArrayList<String> queryTable(IDriver driver, String query) throws IOException {
        try {
            driver.run(query);
        }
        catch (CommandProcessorException e) {
            // Chain the cause so the underlying failure and its stack trace stay visible.
            throw new RuntimeException(query + " failed: " + e, e);
        }
        ArrayList<String> res = new ArrayList<String>();
        driver.getResults(res);
        return res;
    }

    // Static initializer: fills fieldNames2 with the values of COL1 and COL2, in that order.
    static {
        fieldNames2 = new String[]{COL1, COL2};
    }

    /**
     * Local file system variant registered under the {@code raw} scheme.
     *
     * <p>{@link #getFileStatus(Path)} derives the reported permission bits from what the
     * current process can actually do with the file (read / write / execute) and reports
     * a fixed owner and group.
     */
    public static class RawFileSystem
    extends RawLocalFileSystem {
        /** Root URI of this file system. */
        private static final URI NAME = URI.create("raw:///");

        /** @return the fixed {@code raw:///} URI of this file system */
        public URI getUri() {
            return NAME;
        }

        /** @return the scheme handled by this file system, {@code "raw"} */
        public String getScheme() {
            return "raw";
        }

        /**
         * Builds a status object for a local file from {@link File} attributes.
         *
         * @param path path to inspect
         * @return status carrying the file length, directory flag, last-modified time (used
         *         for both time fields), the derived permissions, owner {@code "owen"} and
         *         group {@code "users"}
         * @throws FileNotFoundException if the file does not exist
         * @throws IOException declared by the overridden method
         */
        public FileStatus getFileStatus(Path path) throws IOException {
            File file = this.pathToFile(path);
            if (!file.exists()) {
                throw new FileNotFoundException("Can't find " + path);
            }
            // Permission bits in octal: readable -> r--r--r--, writable -> -w-------,
            // executable -> --x--x--x.
            short mod = 0;
            if (file.canRead()) {
                mod |= 0444;
            }
            if (file.canWrite()) {
                mod |= 0200;
            }
            if (file.canExecute()) {
                mod |= 0111;
            }
            return new FileStatus(file.length(), file.isDirectory(), 1, 1024L, file.lastModified(), file.lastModified(), FsPermission.createImmutable(mod), "owen", "users", path);
        }
    }

    private static class WriterThd
    extends Thread {
        private final StreamingConnection conn;
        private final String data;
        private Throwable error;

        WriterThd(String data) throws Exception {
            super("Writer_" + data);
            StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
            HiveStreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(TestStreaming.dbName).withTable("alerts").withStaticPartitionValues(partitionVals).withRecordWriter((RecordWriter)writer).withHiveConf(conf).connect();
            this.conn = connection;
            this.data = data;
            this.setUncaughtExceptionHandler((thread, throwable) -> {
                this.error = throwable;
                LOG.error(connection.toTransactionString());
                LOG.error("Thread " + thread.getName() + " died: " + throwable.getMessage(), throwable);
            });
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; ++i) {
                    this.conn.beginTransaction();
                    this.conn.write(this.data.getBytes());
                    this.conn.write(this.data.getBytes());
                    this.conn.commitTransaction();
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            finally {
                if (this.conn != null) {
                    try {
                        this.conn.close();
                    }
                    catch (Exception e) {
                        LOG.error("txnBatch.close() failed: " + e.getMessage(), (Throwable)e);
                    }
                }
            }
        }
    }

    /**
     * Plain value holder for one three-field record, with value-based equality.
     */
    private static class SampleRec {
        public String field1;
        public int field2;
        public String field3;

        public SampleRec(String field1, int field2, String field3) {
            this.field1 = field1;
            this.field2 = field2;
            this.field3 = field3;
        }

        /**
         * Two records are equal when they are of the same class and all three fields
         * match; {@code null} text fields only match {@code null}.
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != this.getClass()) {
                return false;
            }
            SampleRec other = (SampleRec)o;
            return this.field2 == other.field2
                && SampleRec.sameText(this.field1, other.field1)
                && SampleRec.sameText(this.field3, other.field3);
        }

        /** Null-safe string comparison. */
        private static boolean sameText(String left, String right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Combines the three fields with the multiplier 31; a {@code null} field contributes 0. */
        @Override
        public int hashCode() {
            int first = this.field1 == null ? 0 : this.field1.hashCode();
            int third = this.field3 == null ? 0 : this.field3.hashCode();
            return 31 * (31 * first + this.field2) + third;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder(" { '");
            text.append(this.field1).append("',");
            text.append(this.field2).append(",'");
            text.append(this.field3).append("' }");
            return text.toString();
        }
    }

    /**
     * {@code RecordWriter} wrapper that forwards every call to a delegate and, while
     * errors are enabled, throws a {@code StreamingIOFailure} after each write and flush.
     */
    private static final class FaultyWriter
    implements RecordWriter {
        private final RecordWriter delegate;
        /** When set, write and flush calls fail after delegating. Initially off. */
        private boolean shouldThrow;

        private FaultyWriter(RecordWriter target) {
            assert target != null;
            this.delegate = target;
        }

        /** Forwards initialization to the delegate unchanged. */
        public void init(StreamingConnection connection, long minWriteId, long maxWriteID) throws StreamingException {
            delegate.init(connection, minWriteId, maxWriteID);
        }

        /** Lets the delegate write the record, then fails if errors are enabled. */
        public void write(long writeId, byte[] record) throws StreamingException {
            delegate.write(writeId, record);
            produceFault();
        }

        /** Lets the delegate write from the stream, then fails if errors are enabled. */
        public void write(long writeId, InputStream inputStream) throws StreamingException {
            delegate.write(writeId, inputStream);
            produceFault();
        }

        /** Lets the delegate flush, then fails if errors are enabled. */
        public void flush() throws StreamingException {
            delegate.flush();
            produceFault();
        }

        /** Closes the delegate; never injects a fault. */
        public void close() throws StreamingException {
            delegate.close();
        }

        /** @return the partitions reported by the delegate */
        public Set<String> getPartitions() {
            return delegate.getPartitions();
        }

        /** Throws the simulated failure when errors are enabled; otherwise does nothing. */
        private void produceFault() throws StreamingIOFailure {
            if (!shouldThrow) {
                return;
            }
            throw new StreamingIOFailure("Simulated fault occurred");
        }

        /** Turns fault injection on. */
        void enableErrors() {
            shouldThrow = true;
        }

        /** Turns fault injection off. */
        void disableErrors() {
            shouldThrow = false;
        }

        /** Not delegated: always returns {@code null}. */
        public Path getDeltaFileLocation(List<String> partitionValues, Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId, Table table) throws StreamingException {
            return null;
        }
    }
}

