/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.hcatalog.streaming;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.Validator;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnInfo;
import org.apache.hadoop.hive.metastore.api.TxnState;
import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.compactor.Worker;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.ConnectionError;
import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.InvalidTable;
import org.apache.hive.hcatalog.streaming.QueryFailedException;
import org.apache.hive.hcatalog.streaming.RecordWriter;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.hive.hcatalog.streaming.StreamingIOFailure;
import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
import org.apache.hive.hcatalog.streaming.StrictRegexWriter;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.apache.hive.hcatalog.streaming.TransactionError;
import org.apache.orc.impl.OrcAcidUtils;
import org.apache.orc.tools.FileDump;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestStreaming {
    private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
    // Column names used by setup() when creating the test tables.
    private static final String COL1 = "id";
    private static final String COL2 = "msg";
    // Hive configuration built in the constructor; used for the driver, session and metastore client.
    private final HiveConf conf;
    // Query driver; setup() creates a new one before every test and cleanup() closes it.
    private IDriver driver;
    // Metastore client opened in the constructor and closed by cleanup().
    private final IMetaStoreClient msClient;
    // Metastore URI handed to HiveEndPoint; the constructor sets it to null.
    final String metaStoreURI;
    // Database, table and streamed field names of the partitioned test table.
    private static final String dbName = "testing";
    private static final String tblName = "alerts";
    private static final String[] fieldNames = new String[]{"id", "msg"};
    // Partition values of the partitioned test table; the constructor fills in "Asia", "India".
    List<String> partitionVals;
    // Locations returned by createDbAndTable() in setup() for the two test tables.
    private static Path partLoc;
    private static Path partLoc2;
    // Database, table and field names of the second test table, which setup() creates
    // without partition values or partition columns.
    private static final String dbName2 = "testing2";
    private static final String tblName2 = "alerts";
    // Blank static final: its value is assigned outside this declaration (not shown here).
    private static final String[] fieldNames2;
    // NOTE(review): the names below are not referenced in the part of the file reviewed
    // here; presumably used by tests further down - confirm.
    private static final String dbName3 = "testing3";
    private static final String tblName3 = "dimensionTable";
    private static final String dbName4 = "testing4";
    private static final String tblName4 = "factTable";
    // Single-value partition list; the constructor fills in "India".
    List<String> partitionVals2;
    // The two values the constructor puts into partitionVals.
    private final String PART1_CONTINENT = "Asia";
    private final String PART1_COUNTRY = "India";
    // Temporary folder under which setup() creates the database directories.
    @Rule
    public TemporaryFolder dbFolder = new TemporaryFolder();

    public TestStreaming() throws Exception {
        this.metaStoreURI = null;
        this.partitionVals = new ArrayList<String>(2);
        this.partitionVals.add("Asia");
        this.partitionVals.add("India");
        this.partitionVals2 = new ArrayList<String>(1);
        this.partitionVals2.add("India");
        this.conf = new HiveConf(this.getClass());
        this.conf.set("fs.raw.impl", RawFileSystem.class.getName());
        this.conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
        TxnDbUtil.setConfValues((Configuration)this.conf);
        if (this.metaStoreURI != null) {
            this.conf.setVar(HiveConf.ConfVars.METASTOREURIS, this.metaStoreURI);
        }
        this.conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
        this.conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
        this.dbFolder.create();
        TxnDbUtil.cleanDb((Configuration)this.conf);
        TxnDbUtil.prepDb((Configuration)this.conf);
        this.msClient = new HiveMetaStoreClient((Configuration)this.conf);
    }

    /**
     * Starts a CLI session and a driver, then (re)creates the databases and
     * tables the tests stream into: the partitioned table, the unpartitioned
     * table and the store_sales table. Finally drops the testBucketing3 tables.
     *
     * @throws Exception if any of the setup statements fails
     */
    @Before
    public void setup() throws Exception {
        SessionState.start((SessionState)new CliSessionState(this.conf));
        this.driver = DriverFactory.newDriver((HiveConf)this.conf);
        this.driver.setMaxRows(200002);

        // Table layout shared by both "alerts" tables.
        final String[] columns = {COL1, COL2};
        final String[] columnTypes = {"int", "string"};
        final String[] bucketColumns = {COL1};
        final String[] partitionColumns = {"Continent", "Country"};

        // Partitioned table, one bucket.
        TestStreaming.dropDB(this.msClient, dbName);
        String partitionedDbDir = this.dbFolder.newFolder(dbName + ".db").toString();
        partLoc = TestStreaming.createDbAndTable(this.driver, dbName, tblName, this.partitionVals, columns, columnTypes, bucketColumns, partitionColumns, partitionedDbDir, 1);

        // Unpartitioned table, two buckets.
        TestStreaming.dropDB(this.msClient, dbName2);
        String unpartitionedDbDir = this.dbFolder.newFolder(dbName2 + ".db").toString();
        partLoc2 = TestStreaming.createDbAndTable(this.driver, dbName2, tblName2, null, columns, columnTypes, bucketColumns, null, unpartitionedDbDir, 2);

        // store_sales table.
        String storeSalesDbDir = this.dbFolder.newFolder("testing5.db").toString();
        this.createStoreSales("testing5", storeSalesDbDir);

        // Remove tables left behind in testBucketing3.
        for (String leftover : new String[]{"streamedtable", "finaltable", "nobucket"}) {
            TestStreaming.runDDL(this.driver, "drop table testBucketing3." + leftover);
        }
    }

    /**
     * Releases the metastore client and the driver after every test.
     *
     * <p>The driver is closed in a {@code finally} block so that a failure
     * while closing the metastore client does not leak it. The driver is also
     * null-checked: it is assigned in {@link #setup()}, and JUnit runs
     * {@code @After} methods even when a {@code @Before} method failed, in
     * which case an unguarded call would throw a NullPointerException that
     * hides the original failure.
     *
     * @throws Exception if closing either resource fails
     */
    @After
    public void cleanup() throws Exception {
        try {
            this.msClient.close();
        } finally {
            if (this.driver != null) {
                this.driver.close();
            }
        }
    }

    /**
     * Builds the partition key schema: two string columns, "continent" and
     * "country", each with an empty comment.
     *
     * @return a new mutable list holding the two partition keys in that order
     */
    private static List<FieldSchema> getPartitionKeys() {
        List<FieldSchema> keys = new ArrayList<>();
        for (String keyName : new String[]{"continent", "country"}) {
            keys.add(new FieldSchema(keyName, "string", ""));
        }
        return keys;
    }

    /**
     * Creates the given database at a {@code raw://} location and, inside it,
     * a transactional ORC table {@code store_sales} that is partitioned by
     * {@code dt}, clustered into 4 buckets, and has the partition
     * {@code dt='2015'} added. Every statement is asserted to succeed.
     *
     * @param dbName name of the database to create if it does not exist
     * @param loc    local directory that backs the database
     * @throws Exception if running one of the statements fails
     */
    private void createStoreSales(String dbName, String loc) throws Exception {
        final String dbUri = "raw://" + new Path(loc).toUri().toString();
        final String tableLoc = dbUri + "/" + "store_sales";

        // Statements in execution order; each one must succeed before the next runs.
        final String[] statements = {
            "create database IF NOT EXISTS " + dbName + " location '" + dbUri + "'",
            "use " + dbName,
            "drop table if exists store_sales",
            "create table store_sales\n(\n    ss_sold_date_sk           int,\n    ss_sold_time_sk           int,\n    ss_item_sk                int,\n    ss_customer_sk            int,\n    ss_cdemo_sk               int,\n    ss_hdemo_sk               int,\n    ss_addr_sk                int,\n    ss_store_sk               int,\n    ss_promo_sk               int,\n    ss_ticket_number          int,\n    ss_quantity               int,\n    ss_wholesale_cost         decimal(7,2),\n    ss_list_price             decimal(7,2),\n    ss_sales_price            decimal(7,2),\n    ss_ext_discount_amt       decimal(7,2),\n    ss_ext_sales_price        decimal(7,2),\n    ss_ext_wholesale_cost     decimal(7,2),\n    ss_ext_list_price         decimal(7,2),\n    ss_ext_tax                decimal(7,2),\n    ss_coupon_amt             decimal(7,2),\n    ss_net_paid               decimal(7,2),\n    ss_net_paid_inc_tax       decimal(7,2),\n    ss_net_profit             decimal(7,2)\n)\n partitioned by (dt string)\nclustered by (ss_store_sk, ss_promo_sk)\nINTO 4 BUCKETS stored as orc  location '" + tableLoc + "'  TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')",
            "alter table store_sales add partition(dt='2015')"
        };
        for (String statement : statements) {
            Assert.assertTrue(TestStreaming.runDDL(this.driver, statement));
        }
    }

    /**
     * Streams ten rows into {@code testing5.store_sales}, a table whose
     * bucketing columns are not its first columns, commits them and prints the
     * bucket id of every row that was written.
     *
     * @throws Exception if connecting, writing or querying fails
     */
    @Test
    public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
        List<String> partition = new ArrayList<String>();
        partition.add("2015");
        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, "testing5", "store_sales", partition);
        StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
        DelimitedInputWriter writer = new DelimitedInputWriter(new String[]{"ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk", "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity", "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"}, ",", endPt, connection);
        TransactionBatch txnBatch = connection.fetchTransactionBatch(2, (RecordWriter)writer);
        txnBatch.beginNextTransaction();
        StringBuilder row = new StringBuilder();
        for (int i = 0; i < 10; ++i) {
            // Start each record from an empty buffer. Without the reset every
            // write would contain all previously built rows glued in front of
            // the new one instead of a single 23-field record.
            row.setLength(0);
            // 11 integer columns ...
            for (int ints = 0; ints < 11; ++ints) {
                row.append(ints).append(',');
            }
            // ... followed by 12 decimal columns.
            for (int decs = 0; decs < 12; ++decs) {
                row.append(i + 0.1).append(',');
            }
            // Drop the trailing delimiter.
            row.setLength(row.length() - 1);
            txnBatch.write(row.toString().getBytes());
        }
        txnBatch.commit();
        txnBatch.close();
        connection.close();
        List<String> res = TestStreaming.queryTable(this.driver, "select row__id.bucketid, * from testing5.store_sales");
        for (String re : res) {
            System.out.println(re);
        }
    }

    /**
     * Streams into a transactional table that declares no buckets and checks
     * the ROW__ID and file name of every row: after a plain SQL insert, after
     * two streamed transactions, and again after an update, a delete and a
     * major compaction.
     *
     * @throws Exception if any statement, streaming call or the compaction fails
     */
    @Test
    public void testNoBuckets() throws Exception {
        TestStreaming.queryTable(this.driver, "drop table if exists default.streamingnobuckets");
        TestStreaming.queryTable(this.driver, "create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true', 'transactional_properties'='default')");
        TestStreaming.queryTable(this.driver, "insert into default.streamingnobuckets values('foo','bar')");
        List<String> rs = TestStreaming.queryTable(this.driver, "select * from default.streamingNoBuckets");
        Assert.assertEquals(1, rs.size());
        Assert.assertEquals("foo\tbar", rs.get(0));

        // The end point deliberately uses mixed-case database and table names.
        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, "Default", "StreamingNoBuckets", null);
        String[] colNames1 = new String[]{"a", "b"};
        StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
        DelimitedInputWriter wr = new DelimitedInputWriter(colNames1, ",", endPt, connection);
        TransactionBatch txnBatch = connection.fetchTransactionBatch(2, (RecordWriter)wr);
        txnBatch.beginNextTransaction();
        txnBatch.write("a1,b2".getBytes());
        txnBatch.write("a3,b4".getBytes());

        // While the first transaction is open exactly one lock is expected,
        // recorded under the lower-case database and table names.
        TxnStore txnHandler = TxnUtils.getTxnStore((Configuration)this.conf);
        ShowLocksResponse resp = txnHandler.showLocks(new ShowLocksRequest());
        Assert.assertEquals(1, resp.getLocksSize());
        Assert.assertEquals("streamingnobuckets", resp.getLocks().get(0).getTablename());
        Assert.assertEquals("default", resp.getLocks().get(0).getDbname());
        txnBatch.commit();
        txnBatch.beginNextTransaction();
        txnBatch.write("a5,b6".getBytes());
        txnBatch.write("a7,b8".getBytes());
        txnBatch.commit();
        txnBatch.close();
        // Release the connection once the batch is done, as the other tests do.
        connection.close();

        // 0x20000000 (536870912) is the bucketid expected in every ROW__ID below;
        // it must decode to writer id 0.
        Assert.assertEquals("", 0L, (long)BucketCodec.determineVersion((int)0x20000000).decodeWriterId(0x20000000));
        rs = TestStreaming.queryTable(this.driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
        Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
        Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
        Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
        Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
        Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
        Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
        Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
        Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
        Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
        Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));

        // Modify streamed rows through SQL and verify the visible data.
        TestStreaming.queryTable(this.driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
        TestStreaming.queryTable(this.driver, "delete from default.streamingnobuckets where a='a1'");
        rs = TestStreaming.queryTable(this.driver, "select a, b from default.streamingnobuckets order by a, b");
        int row = 0;
        Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
        Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++));
        Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++));
        Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++));

        // After a major compaction every remaining row must be read from the base directory.
        TestStreaming.queryTable(this.driver, "alter table default.streamingnobuckets compact 'major'");
        TestStreaming.runWorker(this.conf);
        rs = TestStreaming.queryTable(this.driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
        Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
        Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
        Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
        Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
        Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
        Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
        Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
        Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
    }

    /**
     * Runs a compactor {@link Worker} synchronously on the calling thread.
     *
     * @param hiveConf configuration handed to the worker
     * @throws MetaException if initializing the worker fails
     */
    public static void runWorker(HiveConf hiveConf) throws MetaException {
        Worker worker = new Worker();
        worker.setThreadId((int)worker.getId());
        worker.setConf((Configuration)hiveConf);
        // The stop flag is handed over already set to true.
        // NOTE(review): presumably this makes run() return after one pass - confirm.
        worker.init(new AtomicBoolean(true), new AtomicBoolean());
        worker.run();
    }

    /**
     * Verifies that streaming ingest assigns rows to the same buckets as a
     * regular SQL insert. Rows are streamed into {@code streamedtable}, copied
     * together with their bucket id into {@code nobucket}, inserted from there
     * into the bucketed {@code finaltable}, and finally no row may sit in a
     * bucket different from the one recorded during streaming.
     *
     * @throws Exception if a statement or a streaming call fails
     */
    @Test
    public void testStreamBucketingMatchesRegularBucketing() throws Exception {
        int bucketCount = 100;
        String dbUri = "raw://" + new Path(this.dbFolder.newFolder().toString()).toUri().toString();
        String tableLoc = "'" + dbUri + "/" + "streamedtable'";
        String tableLoc2 = "'" + dbUri + "/" + "finaltable'";
        String tableLoc3 = "'" + dbUri + "/" + "nobucket'";
        // A dedicated driver is used here; it intentionally shadows the field.
        try (IDriver driver = DriverFactory.newDriver((HiveConf)this.conf);){
            TestStreaming.runDDL(driver, "create database testBucketing3");
            TestStreaming.runDDL(driver, "use testBucketing3");
            TestStreaming.runDDL(driver, "create table streamedtable ( key1 string,key2 int,data string ) clustered by ( key1,key2 ) into " + bucketCount + " buckets  stored as orc  location " + tableLoc + " TBLPROPERTIES ('transactional'='true')");
            TestStreaming.runDDL(driver, "create table nobucket ( bucketid int, key1 string,key2 int,data string ) location " + tableLoc3);
            TestStreaming.runDDL(driver, "create table finaltable ( bucketid int, key1 string,key2 int,data string ) clustered by ( key1,key2 ) into " + bucketCount + " buckets  stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='true')");
            String[] records = new String[]{"PSFAHYLZVC,29,EPNMA", "PPPRKWAYAU,96,VUTEE", "MIAOFERCHI,3,WBDSI", "CEGQAZOWVN,0,WCUZL", "XWAKMNSVQF,28,YJVHU", "XBWTSAJWME,2,KDQFO", "FUVLQTAXAY,5,LDSDG", "QTQMDJMGJH,6,QBOMA", "EFLOTLWJWN,71,GHWPS", "PEQNAOJHCM,82,CAAFI", "MOEKQLGZCP,41,RUACR", "QZXMCOPTID,37,LFLWE", "EYALVWICRD,13,JEZLC", "VYWLZAYTXX,16,DMVZX", "OSALYSQIXR,47,HNZVE", "JGKVHKCEGQ,25,KSCJB", "WQFMMYDHET,12,DTRWA", "AJOVAYZKZQ,15,YBKFO", "YAQONWCUAU,31,QJNHZ", "DJBXUEUOEB,35,IYCBL"};
            HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, "testBucketing3", "streamedtable", null);
            String[] colNames1 = new String[]{"key1", "key2", "data"};
            DelimitedInputWriter wr = new DelimitedInputWriter(colNames1, ",", endPt);
            StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
            TransactionBatch txnBatch = connection.fetchTransactionBatch(2, (RecordWriter)wr);
            txnBatch.beginNextTransaction();
            for (String record : records) {
                txnBatch.write(record.getBytes());
            }
            txnBatch.commit();
            txnBatch.close();
            connection.close();
            List<String> res1 = TestStreaming.queryTable(driver, "select row__id.bucketid, * from streamedtable order by key2");
            for (String re : res1) {
                System.out.println(re);
            }
            // Every streamed record must be visible; otherwise the comparison
            // below would run against incomplete data.
            Assert.assertEquals("unexpected number of streamed rows: " + res1, records.length, res1.size());
            // Both copies must succeed. If either insert failed silently,
            // finaltable would stay empty and the final check would pass
            // without having compared anything.
            Assert.assertTrue("insert into nobucket failed", TestStreaming.runDDL(driver, "insert into nobucket select row__id.bucketid,* from streamedtable"));
            Assert.assertTrue("insert into finaltable failed", TestStreaming.runDDL(driver, " insert into finaltable select * from nobucket"));
            // Rows whose bucket after the regular insert differs from the streamed bucket.
            List<String> res2 = TestStreaming.queryTable(driver, "select row__id.bucketid,* from finaltable where row__id.bucketid<>bucketid");
            for (String s : res2) {
                LOG.error(s);
            }
            Assert.assertTrue(res2.isEmpty());
        }
        finally {
            this.conf.unset(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname);
        }
    }

    /**
     * Verifies that opening a streaming connection to a table created with
     * {@code 'transactional'='false'} is rejected with {@link InvalidTable}.
     * Two such tables are created and checked.
     *
     * @throws Exception if creating the tables fails or an unexpected
     *                   exception is thrown while connecting
     */
    @Test
    public void testTableValidation() throws Exception {
        int bucketCount = 100;
        String dbUri = "raw://" + new Path(this.dbFolder.newFolder().toString()).toUri().toString();
        String tbl1 = "validation1";
        String tbl2 = "validation2";
        String tableLoc = "'" + dbUri + "/" + tbl1 + "'";
        String tableLoc2 = "'" + dbUri + "/" + tbl2 + "'";
        TestStreaming.runDDL(this.driver, "create database testBucketing3");
        TestStreaming.runDDL(this.driver, "use testBucketing3");
        TestStreaming.runDDL(this.driver, "create table " + tbl1 + " ( key1 string, data string ) clustered by ( key1 ) into " + bucketCount + " buckets  stored as orc  location " + tableLoc + " TBLPROPERTIES ('transactional'='false')");
        TestStreaming.runDDL(this.driver, "create table " + tbl2 + " ( key1 string, data string ) clustered by ( key1 ) into " + bucketCount + " buckets  stored as orc  location " + tableLoc2 + " TBLPROPERTIES ('transactional'='false')");

        // Assert.fail throws an AssertionError, which is not an InvalidTable,
        // so a missing exception is reported rather than swallowed by the catch.
        try {
            HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, "testBucketing3", tbl1, null);
            endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
            Assert.fail("InvalidTable exception was not thrown");
        }
        catch (InvalidTable expected) {
            // expected: the table is not transactional
        }
        try {
            HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, "testBucketing3", tbl2, null);
            endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
            Assert.fail("InvalidTable exception was not thrown");
        }
        catch (InvalidTable expected) {
            // expected: the table is not transactional
        }
    }

    /**
     * Checks the ACID directory layout of a partition and reads the first
     * split back through {@link OrcInputFormat}, comparing each row's string
     * form with the expected records.
     *
     * @param partitionPath    location to inspect
     * @param minTxn           expected smallest min write id over the current deltas
     * @param maxTxn           expected largest max write id over the current deltas
     * @param buckets          bucket count set on the job and used as split hint
     * @param numExpectedFiles expected number of current delta directories and of splits
     * @param records          expected rows of the first split, in read order
     * @throws Exception if the metastore, the file system or the reader fails
     * @deprecated NOTE(review): presumably superseded by
     *             {@code checkDataWritten2} - confirm before removing callers.
     */
    @Deprecated
    private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles, String ... records) throws Exception {
        ValidWriteIdList writeIds = this.msClient.getValidWriteIds(AcidUtils.getFullTableName((String)dbName, (String)"alerts"));
        AcidUtils.Directory dir = AcidUtils.getAcidState((Path)partitionPath, (Configuration)this.conf, (ValidWriteIdList)writeIds);
        Assert.assertEquals(0L, (long)dir.getObsolete().size());
        Assert.assertEquals(0L, (long)dir.getOriginalFiles().size());
        // Typed list: iterating a raw List with a ParsedDelta loop variable does not compile.
        List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
        System.out.println("Files found: ");
        for (AcidUtils.ParsedDelta pd : current) {
            System.out.println(pd.getPath().toString());
        }
        Assert.assertEquals((long)numExpectedFiles, (long)current.size());

        // Overall write id range covered by the current deltas.
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (AcidUtils.ParsedDelta pd : current) {
            max = Math.max(max, pd.getMaxWriteId());
            min = Math.min(min, pd.getMinWriteId());
        }
        Assert.assertEquals(minTxn, min);
        Assert.assertEquals(maxTxn, max);

        OrcInputFormat inf = new OrcInputFormat();
        JobConf job = new JobConf();
        job.set("mapred.input.dir", partitionPath.toString());
        job.set("bucket_count", Integer.toString(buckets));
        job.set("schema.evolution.columns", "id,msg");
        job.set("schema.evolution.columns.types", "bigint:string");
        AcidUtils.setAcidOperationalProperties((Configuration)job, (boolean)true, null);
        job.setBoolean("transactional", true);
        job.set("hive.txn.valid.writeids", writeIds.toString());
        InputSplit[] splits = inf.getSplits(job, buckets);
        Assert.assertEquals((long)numExpectedFiles, (long)splits.length);

        // Only the first split is read back.
        org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
        try {
            NullWritable key = rr.createKey();
            OrcStruct value = rr.createValue();
            for (String record : records) {
                Assert.assertTrue(rr.next(key, value));
                Assert.assertEquals(record, value.toString());
            }
            // No rows beyond the expected ones.
            Assert.assertFalse(rr.next(key, value));
        } finally {
            // Release the reader even when an assertion fails.
            rr.close();
        }
    }

    /**
     * Checks the ACID directory layout of a partition and then validates its
     * content by running {@code validationQuery} once per ORC split strategy.
     *
     * @param partitionPath    location to inspect
     * @param minTxn           expected smallest min write id over the current deltas
     * @param maxTxn           expected largest max write id over the current deltas
     * @param numExpectedFiles expected number of current delta directories
     * @param validationQuery  query whose result is compared with {@code records}
     * @param vectorize        when true, vectorized execution is switched on for the queries
     * @param records          expected query result, row by row
     * @throws Exception if the metastore, the file system or the query fails
     */
    private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles, String validationQuery, boolean vectorize, String ... records) throws Exception {
        ValidWriteIdList txns = this.msClient.getValidWriteIds(AcidUtils.getFullTableName((String)dbName, (String)"alerts"));
        AcidUtils.Directory dir = AcidUtils.getAcidState((Path)partitionPath, (Configuration)this.conf, (ValidWriteIdList)txns);
        Assert.assertEquals(0L, (long)dir.getObsolete().size());
        Assert.assertEquals(0L, (long)dir.getOriginalFiles().size());
        // Typed list: iterating a raw List with a ParsedDelta loop variable does not compile.
        List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
        System.out.println("Files found: ");
        for (AcidUtils.ParsedDelta pd : current) {
            System.out.println(pd.getPath().toString());
        }
        Assert.assertEquals((long)numExpectedFiles, (long)current.size());

        // Overall write id range covered by the current deltas.
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (AcidUtils.ParsedDelta pd : current) {
            max = Math.max(max, pd.getMaxWriteId());
            min = Math.min(min, pd.getMinWriteId());
        }
        Assert.assertEquals(minTxn, min);
        Assert.assertEquals(maxTxn, max);

        // Remember the settings that are changed below so they can be restored.
        boolean isVectorizationEnabled = this.conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
        String currStrategy = this.conf.getVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY);
        try {
            if (vectorize) {
                this.conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
            }
            // Run the validation once with every split strategy the setting accepts.
            for (String strategy : ((Validator.StringSet)HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator()).getExpected()) {
                this.conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, strategy.toUpperCase());
                List<String> actualResult = TestStreaming.queryTable(this.driver, validationQuery);
                // Compare the sizes first: iterating over the actual rows only
                // would accept a result with missing rows and would fail with an
                // index error on a result with extra rows.
                Assert.assertEquals("row count differs.  actual=" + actualResult + " expected=" + Arrays.toString(records), records.length, actualResult.size());
                for (int i = 0; i < records.length; ++i) {
                    Assert.assertEquals("diff at [" + i + "].  actual=" + actualResult + " expected=" + Arrays.toString(records), records[i], actualResult.get(i));
                }
            }
        } finally {
            // Restore the configuration even when an assertion fails.
            this.conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, currStrategy);
            this.conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorizationEnabled);
        }
    }

    /**
     * Asserts that the ACID state of the given location holds no obsolete
     * files, no original files and no current delta directories.
     *
     * @param partitionPath location to inspect
     * @throws Exception if the metastore or the file system cannot be queried
     */
    private void checkNothingWritten(Path partitionPath) throws Exception {
        String fullTableName = AcidUtils.getFullTableName((String)dbName, (String)"alerts");
        ValidWriteIdList validWriteIds = this.msClient.getValidWriteIds(fullTableName);
        AcidUtils.Directory acidState = AcidUtils.getAcidState((Path)partitionPath, (Configuration)this.conf, (ValidWriteIdList)validWriteIds);
        Assert.assertEquals(0L, (long)acidState.getObsolete().size());
        Assert.assertEquals(0L, (long)acidState.getOriginalFiles().size());
        Assert.assertEquals(0L, (long)acidState.getCurrentDirectories().size());
    }

    /**
     * Exercises connection creation for valid and invalid end points: both
     * test tables with matching partition information must connect, while a
     * partitioned table without partition values and an unpartitioned table
     * with partition values must raise {@link ConnectionError}.
     *
     * @throws Exception if a connection that should succeed fails
     */
    @Test
    public void testEndpointConnection() throws Exception {
        final String agentInfo = "UT_" + Thread.currentThread().getName();

        // Valid: partitioned table with partition values.
        HiveEndPoint partitionedEndPt = new HiveEndPoint(this.metaStoreURI, dbName, tblName, this.partitionVals);
        StreamingConnection partitionedConnection = partitionedEndPt.newConnection(false, agentInfo);
        partitionedConnection.close();

        // Valid: unpartitioned table without partition values.
        HiveEndPoint unpartitionedEndPt = new HiveEndPoint(this.metaStoreURI, dbName2, tblName2, null);
        unpartitionedEndPt.newConnection(false, agentInfo).close();

        // Invalid: partitioned table, but no partition values given.
        try {
            HiveEndPoint missingPartitionEndPt = new HiveEndPoint(this.metaStoreURI, dbName, tblName, null);
            StreamingConnection unexpected = missingPartitionEndPt.newConnection(true, agentInfo);
            Assert.assertTrue("ConnectionError was not thrown", false);
            unexpected.close();
        }
        catch (ConnectionError e) {
            Assert.assertTrue(e.toString().endsWith("doesn't specify any partitions for partitioned table"));
        }

        // Invalid: unpartitioned table, but partition values given.
        try {
            HiveEndPoint extraPartitionEndPt = new HiveEndPoint(this.metaStoreURI, dbName2, tblName2, this.partitionVals);
            StreamingConnection unexpected = extraPartitionEndPt.newConnection(false, agentInfo);
            Assert.assertTrue("ConnectionError was not thrown", false);
            unexpected.close();
        }
        catch (ConnectionError e) {
            Assert.assertTrue(e.toString().endsWith("specifies partitions for unpartitioned table"));
        }
    }

    /**
     * Verifies that opening a connection with {@code createPartIfNotExists}
     * set to true creates a partition that did not exist before.
     *
     * @throws Exception if the metastore lookup or the connection fails unexpectedly
     */
    @Test
    public void testAddPartition() throws Exception {
        List<String> newPartVals = new ArrayList<String>(2);
        newPartVals.add("Asia");
        newPartVals.add("Nepal");
        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", newPartVals);

        // Precondition: the partition must not exist yet.
        try {
            this.msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
            Assert.fail("Partition already exists");
        }
        catch (NoSuchObjectException expected) {
            // expected: the partition has not been created yet
        }

        // Connecting with auto-create enabled adds the partition.
        StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
        Assert.assertNotNull(connection);
        try {
            Partition p = this.msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
            Assert.assertNotNull("Did not find added partition", p);
        } finally {
            // The connection used to be dropped without being closed.
            connection.close();
        }
    }

    /**
     * Checks that a transaction committed without any writes ends in
     * {@code TxnState.COMMITTED}: first on an endpoint for {@code dbName}/"alerts" built with
     * {@code partitionVals}, then on an endpoint for {@code dbName2}/"alerts" built with
     * {@code null} partition values.
     *
     * @throws Exception if connecting, fetching a batch, or committing fails
     */
    @Test
    public void testTransactionBatchEmptyCommit() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();

        // Scenario 1: endpoint carrying partition values.
        HiveEndPoint partEndPt = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        StreamingConnection partConn = partEndPt.newConnection(false, agentInfo);
        DelimitedInputWriter partWriter = new DelimitedInputWriter(fieldNames, ",", partEndPt, partConn);
        TransactionBatch partBatch = partConn.fetchTransactionBatch(10, partWriter);
        partBatch.beginNextTransaction();
        partBatch.commit();
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, partBatch.getCurrentTransactionState());
        partBatch.close();
        partConn.close();

        // Scenario 2: endpoint without partition values; the writer is created before the connection.
        HiveEndPoint plainEndPt = new HiveEndPoint(this.metaStoreURI, dbName2, "alerts", null);
        DelimitedInputWriter plainWriter = new DelimitedInputWriter(fieldNames2, ",", plainEndPt);
        StreamingConnection plainConn = plainEndPt.newConnection(false, agentInfo);
        TransactionBatch plainBatch = plainConn.fetchTransactionBatch(10, plainWriter);
        plainBatch.beginNextTransaction();
        plainBatch.commit();
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, plainBatch.getCurrentTransactionState());
        plainBatch.close();
        plainConn.close();
    }

    /**
     * Exercises committing a transaction after the ACID housekeeper has been run with a
     * 1 ms transaction timeout.
     *
     * <p>Two rounds are performed on the same connection: one where the housekeeper runs
     * right after the first transaction of a batch is begun, and one where it runs after
     * a transaction in the batch has already been committed and the next one begun.
     *
     * <p>NOTE(review): neither {@code try} block fails the test when {@code commit()}
     * returns normally; only the cause of a thrown {@code TransactionError} is checked.
     * Confirm whether a missing exception should be treated as a failure.
     *
     * @throws Exception if any streaming or housekeeper call fails unexpectedly
     */
    @Test
    public void testTimeOutReaper() throws Exception {
        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, dbName2, "alerts", null);
        DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2, ",", endPt);
        StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
        TransactionBatch txnBatch = connection.fetchTransactionBatch(5, (RecordWriter)writer);
        txnBatch.beginNextTransaction();
        // Reaper start delay of 0 s and a transaction timeout of 1 ms.
        this.conf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0L, TimeUnit.SECONDS);
        this.conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1L, TimeUnit.MILLISECONDS);
        AcidHouseKeeperService houseKeeperService = new AcidHouseKeeperService();
        houseKeeperService.setConf((Configuration)this.conf);
        // run() is invoked directly on the calling thread rather than being scheduled.
        houseKeeperService.run();
        try {
            // Presumably the open transaction has been aborted by the housekeeper by now -- confirm.
            txnBatch.commit();
        }
        catch (TransactionError e) {
            Assert.assertTrue((String)"Expected aborted transaction", (boolean)(e.getCause() instanceof TxnAbortedException));
        }
        txnBatch.close();
        // Second round: commit one transaction successfully, then run the housekeeper
        // while the following transaction is open.
        txnBatch = connection.fetchTransactionBatch(10, (RecordWriter)writer);
        txnBatch.beginNextTransaction();
        txnBatch.commit();
        txnBatch.beginNextTransaction();
        houseKeeperService.run();
        try {
            txnBatch.commit();
        }
        catch (TransactionError e) {
            Assert.assertTrue((String)"Expected aborted transaction", (boolean)(e.getCause() instanceof TxnAbortedException));
        }
        txnBatch.close();
        connection.close();
    }

    /**
     * Exercises {@code TransactionBatch.heartbeat()}.
     *
     * <p>Part 1 begins a transaction on {@code dbName2}/"alerts", reads the single lock
     * reported by the metastore, heartbeats, and asserts that the lock is still the only
     * one, that its acquired timestamp is unchanged, and that its last-heartbeat value
     * equals the value observed before the heartbeat.
     *
     * <p>Part 2 walks a batch of 200 transactions, aborting every 10th and committing
     * the rest, with heartbeats issued both while a transaction is open and after it
     * has ended; it only checks that none of these calls throws.
     *
     * @throws Exception if any streaming or metastore call fails unexpectedly
     */
    @Test
    public void testHeartbeat() throws Exception {
        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, dbName2, "alerts", null);
        StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
        DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2, ",", endPt, connection);
        TransactionBatch txnBatch = connection.fetchTransactionBatch(5, (RecordWriter)writer);
        txnBatch.beginNextTransaction();
        ShowLocksRequest request = new ShowLocksRequest();
        request.setDbname(dbName2);
        request.setTablename("alerts");
        ShowLocksResponse response = this.msClient.showLocks(request);
        Assert.assertEquals("Wrong number of locks: " + response, 1L, (long)response.getLocks().size());
        ShowLocksResponseElement lock = (ShowLocksResponseElement)response.getLocks().get(0);
        long acquiredAt = lock.getAcquiredat();
        long heartbeatAt = lock.getLastheartbeat();
        txnBatch.heartbeat();
        response = this.msClient.showLocks(request);
        Assert.assertEquals("Wrong number of locks2: " + response, 1L, (long)response.getLocks().size());
        lock = (ShowLocksResponseElement)response.getLocks().get(0);
        Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, lock.getAcquiredat());
        // The lock's heartbeat value is asserted to be UNCHANGED by txnBatch.heartbeat().
        Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() + ") == old heartbeat(" + heartbeatAt + ")", lock.getLastheartbeat() == heartbeatAt);
        txnBatch.close();
        int txnBatchSize = 200;
        txnBatch = connection.fetchTransactionBatch(txnBatchSize, (RecordWriter)writer);
        for (int i = 0; i < txnBatchSize; ++i) {
            txnBatch.beginNextTransaction();
            if (i % 47 == 0) {
                // Heartbeat while the current transaction is still open.
                txnBatch.heartbeat();
            }
            if (i % 10 == 0) {
                txnBatch.abort();
            } else {
                txnBatch.commit();
            }
            if (i % 37 == 0) {
                // Heartbeat after the current transaction has been committed or aborted.
                txnBatch.heartbeat();
            }
        }
        // The original left the second batch and the connection open.
        txnBatch.close();
        connection.close();
    }

    /**
     * Checks that a transaction aborted without any writes ends in
     * {@code TxnState.ABORTED}: first on an endpoint for {@code dbName}/"alerts" built with
     * {@code partitionVals}, then on an endpoint for {@code dbName2}/"alerts" built with
     * {@code null} partition values.
     *
     * @throws Exception if connecting, fetching a batch, or aborting fails
     */
    @Test
    public void testTransactionBatchEmptyAbort() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();

        // Scenario 1: endpoint carrying partition values.
        HiveEndPoint partEndPt = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        StreamingConnection partConn = partEndPt.newConnection(true, agentInfo);
        DelimitedInputWriter partWriter = new DelimitedInputWriter(fieldNames, ",", partEndPt, partConn);
        TransactionBatch partBatch = partConn.fetchTransactionBatch(10, partWriter);
        partBatch.beginNextTransaction();
        partBatch.abort();
        Assert.assertEquals(TransactionBatch.TxnState.ABORTED, partBatch.getCurrentTransactionState());
        partBatch.close();
        partConn.close();

        // Scenario 2: endpoint without partition values; the writer is created before the connection.
        HiveEndPoint plainEndPt = new HiveEndPoint(this.metaStoreURI, dbName2, "alerts", null);
        DelimitedInputWriter plainWriter = new DelimitedInputWriter(fieldNames, ",", plainEndPt);
        StreamingConnection plainConn = plainEndPt.newConnection(true, agentInfo);
        TransactionBatch plainBatch = plainConn.fetchTransactionBatch(10, plainWriter);
        plainBatch.beginNextTransaction();
        plainBatch.abort();
        Assert.assertEquals(TransactionBatch.TxnState.ABORTED, plainBatch.getCurrentTransactionState());
        plainBatch.close();
        plainConn.close();
    }

    /**
     * Runs the delimited-writer commit scenario with a {@code null} UserGroupInformation.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test
    public void testTransactionBatchCommit_Delimited() throws Exception {
        UserGroupInformation noUgi = null;
        this.testTransactionBatchCommit_Delimited(noUgi);
    }

    /**
     * Runs the delimited-writer commit scenario with the UserGroupInformation returned by
     * {@code Utils.getUGI()}.
     *
     * @throws Exception if obtaining the UGI or the delegated scenario fails
     */
    @Test
    public void testTransactionBatchCommit_DelimitedUGI() throws Exception {
        UserGroupInformation currentUgi = Utils.getUGI();
        this.testTransactionBatchCommit_Delimited(currentUgi);
    }

    /**
     * Shared scenario for the delimited-writer commit tests.
     *
     * <p>Against {@code dbName}/"alerts" with {@code partitionVals}: writes one record per
     * transaction in two consecutive transactions and verifies through
     * {@code checkDataWritten} that a record is reported only after its transaction has
     * been committed, and that the batch state moves OPEN -> COMMITTED -> INACTIVE (after
     * close). Against {@code dbName2}/"alerts" with no partition values: writes and commits
     * one record and checks the OPEN and COMMITTED states.
     *
     * @param ugi the UserGroupInformation handed to {@code newConnection}; may be {@code null}
     * @throws Exception if any streaming call or verification helper fails
     */
    private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) throws Exception {
        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        StreamingConnection connection = endPt.newConnection(true, this.conf, ugi, "UT_" + Thread.currentThread().getName());
        DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt, this.conf, connection);
        TransactionBatch txnBatch = connection.fetchTransactionBatch(10, (RecordWriter)writer);
        txnBatch.beginNextTransaction();
        Assert.assertEquals((Object)TransactionBatch.TxnState.OPEN, (Object)txnBatch.getCurrentTransactionState());
        txnBatch.write("1,Hello streaming".getBytes());
        txnBatch.commit();
        this.checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}");
        Assert.assertEquals((Object)TransactionBatch.TxnState.COMMITTED, (Object)txnBatch.getCurrentTransactionState());
        txnBatch.beginNextTransaction();
        Assert.assertEquals((Object)TransactionBatch.TxnState.OPEN, (Object)txnBatch.getCurrentTransactionState());
        txnBatch.write("2,Welcome to streaming".getBytes());
        // The second record is written but not yet committed, so only the first is expected.
        this.checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}");
        txnBatch.commit();
        this.checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}");
        txnBatch.close();
        Assert.assertEquals((Object)TransactionBatch.TxnState.INACTIVE, (Object)txnBatch.getCurrentTransactionState());
        connection.close();
        endPt = new HiveEndPoint(this.metaStoreURI, dbName2, "alerts", null);
        connection = endPt.newConnection(true, this.conf, ugi, "UT_" + Thread.currentThread().getName());
        writer = new DelimitedInputWriter(fieldNames, ",", endPt, this.conf, connection);
        txnBatch = connection.fetchTransactionBatch(10, (RecordWriter)writer);
        txnBatch.beginNextTransaction();
        Assert.assertEquals((Object)TransactionBatch.TxnState.OPEN, (Object)txnBatch.getCurrentTransactionState());
        txnBatch.write("1,Hello streaming".getBytes());
        txnBatch.commit();
        Assert.assertEquals((Object)TransactionBatch.TxnState.COMMITTED, (Object)txnBatch.getCurrentTransactionState());
        // Close the batch before its connection, as the first half of this method does;
        // the original closed only the connection here.
        txnBatch.close();
        connection.close();
    }

    /**
     * Runs the regex-writer commit scenario with a {@code null} UserGroupInformation.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test
    public void testTransactionBatchCommit_Regex() throws Exception {
        UserGroupInformation noUgi = null;
        this.testTransactionBatchCommit_Regex(noUgi);
    }

    /**
     * Runs the regex-writer commit scenario with the UserGroupInformation returned by
     * {@code Utils.getUGI()}.
     *
     * @throws Exception if obtaining the UGI or the delegated scenario fails
     */
    @Test
    public void testTransactionBatchCommit_RegexUGI() throws Exception {
        UserGroupInformation currentUgi = Utils.getUGI();
        this.testTransactionBatchCommit_Regex(currentUgi);
    }

    /**
     * Shared scenario for the {@code StrictRegexWriter} commit tests.
     *
     * <p>Against {@code dbName}/"alerts" with {@code partitionVals}, using the regex
     * {@code ([^,]*),(.*)}: writes one record per transaction in two consecutive
     * transactions and verifies through {@code checkDataWritten} that a record is reported
     * only after its transaction has been committed, and that the batch state moves
     * OPEN -> COMMITTED -> INACTIVE (after close). Against {@code dbName2}/"alerts" with no
     * partition values, using the regex {@code ([^:]*):(.*)}: writes and commits one
     * colon-separated record and checks the OPEN and COMMITTED states.
     *
     * @param ugi the UserGroupInformation handed to {@code newConnection}; may be {@code null}
     * @throws Exception if any streaming call or verification helper fails
     */
    private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws Exception {
        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        StreamingConnection connection = endPt.newConnection(true, this.conf, ugi, "UT_" + Thread.currentThread().getName());
        String regex = "([^,]*),(.*)";
        StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, this.conf, connection);
        TransactionBatch txnBatch = connection.fetchTransactionBatch(10, (RecordWriter)writer);
        txnBatch.beginNextTransaction();
        Assert.assertEquals((Object)TransactionBatch.TxnState.OPEN, (Object)txnBatch.getCurrentTransactionState());
        txnBatch.write("1,Hello streaming".getBytes());
        txnBatch.commit();
        this.checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}");
        Assert.assertEquals((Object)TransactionBatch.TxnState.COMMITTED, (Object)txnBatch.getCurrentTransactionState());
        txnBatch.beginNextTransaction();
        Assert.assertEquals((Object)TransactionBatch.TxnState.OPEN, (Object)txnBatch.getCurrentTransactionState());
        txnBatch.write("2,Welcome to streaming".getBytes());
        // The second record is written but not yet committed, so only the first is expected.
        this.checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}");
        txnBatch.commit();
        this.checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}");
        txnBatch.close();
        Assert.assertEquals((Object)TransactionBatch.TxnState.INACTIVE, (Object)txnBatch.getCurrentTransactionState());
        connection.close();
        endPt = new HiveEndPoint(this.metaStoreURI, dbName2, "alerts", null);
        connection = endPt.newConnection(true, this.conf, ugi, "UT_" + Thread.currentThread().getName());
        regex = "([^:]*):(.*)";
        writer = new StrictRegexWriter(regex, endPt, this.conf, connection);
        txnBatch = connection.fetchTransactionBatch(10, (RecordWriter)writer);
        txnBatch.beginNextTransaction();
        Assert.assertEquals((Object)TransactionBatch.TxnState.OPEN, (Object)txnBatch.getCurrentTransactionState());
        txnBatch.write("1:Hello streaming".getBytes());
        txnBatch.commit();
        Assert.assertEquals((Object)TransactionBatch.TxnState.COMMITTED, (Object)txnBatch.getCurrentTransactionState());
        // Close the batch before its connection, as the first half of this method does;
        // the original closed only the connection here.
        txnBatch.close();
        connection.close();
    }

    /**
     * Writes one JSON record through a {@code StrictJsonWriter}, commits it, and verifies
     * the written data, the batch state transitions (OPEN, COMMITTED, then INACTIVE after
     * close), and that {@code select * from testing.alerts} returns exactly one row.
     *
     * @throws Exception if any streaming call, verification helper, or query fails
     */
    @Test
    public void testTransactionBatchCommit_Json() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        String agentInfo = "UT_" + Thread.currentThread().getName();
        StreamingConnection conn = endPoint.newConnection(true, agentInfo);
        StrictJsonWriter jsonWriter = new StrictJsonWriter(endPoint, conn);
        TransactionBatch batch = conn.fetchTransactionBatch(10, jsonWriter);

        batch.beginNextTransaction();
        Assert.assertEquals(TransactionBatch.TxnState.OPEN, batch.getCurrentTransactionState());
        String jsonRecord = "{\"id\" : 1, \"msg\": \"Hello streaming\"}";
        batch.write(jsonRecord.getBytes());
        batch.commit();
        this.checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}");
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, batch.getCurrentTransactionState());

        batch.close();
        Assert.assertEquals(TransactionBatch.TxnState.INACTIVE, batch.getCurrentTransactionState());
        conn.close();

        // Cross-check through the query driver.
        ArrayList<String> rows = TestStreaming.queryTable(this.driver, "select * from testing.alerts");
        Assert.assertEquals(1L, (long)rows.size());
    }

    /**
     * Verifies that {@code remainingTransactions()} decreases by one with every
     * {@code beginNextTransaction()} and reaches zero when the batch is exhausted.
     *
     * <p>One batch is consumed by committing every transaction and a second batch by
     * aborting every transaction; each transaction receives two records. After
     * {@code close()} the batch is expected to report {@code TxnState.INACTIVE}.
     *
     * @throws Exception if any streaming call fails
     */
    @Test
    public void testRemainingTransactions() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        DelimitedInputWriter recordWriter = new DelimitedInputWriter(fieldNames, ",", endPoint);
        StreamingConnection conn = endPoint.newConnection(true, "UT_" + Thread.currentThread().getName());

        // Pass 1: commit every transaction in the batch.
        TransactionBatch committedBatch = conn.fetchTransactionBatch(10, recordWriter);
        int expectedRemaining = committedBatch.remainingTransactions();
        for (int batch = 0; committedBatch.remainingTransactions() > 0; ++batch) {
            committedBatch.beginNextTransaction();
            --expectedRemaining;
            Assert.assertEquals((long)expectedRemaining, (long)committedBatch.remainingTransactions());
            for (int rec = 0; rec < 2; ++rec) {
                Assert.assertEquals(TransactionBatch.TxnState.OPEN, committedBatch.getCurrentTransactionState());
                committedBatch.write((batch * rec + ",Hello streaming").getBytes());
            }
            committedBatch.commit();
            Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, committedBatch.getCurrentTransactionState());
        }
        Assert.assertEquals(0L, (long)committedBatch.remainingTransactions());
        committedBatch.close();
        Assert.assertEquals(TransactionBatch.TxnState.INACTIVE, committedBatch.getCurrentTransactionState());

        // Pass 2: abort every transaction in a fresh batch.
        TransactionBatch abortedBatch = conn.fetchTransactionBatch(10, recordWriter);
        expectedRemaining = abortedBatch.remainingTransactions();
        for (int batch = 0; abortedBatch.remainingTransactions() > 0; ++batch) {
            abortedBatch.beginNextTransaction();
            --expectedRemaining;
            Assert.assertEquals((long)expectedRemaining, (long)abortedBatch.remainingTransactions());
            for (int rec = 0; rec < 2; ++rec) {
                Assert.assertEquals(TransactionBatch.TxnState.OPEN, abortedBatch.getCurrentTransactionState());
                abortedBatch.write((batch * rec + ",Hello streaming").getBytes());
            }
            abortedBatch.abort();
            Assert.assertEquals(TransactionBatch.TxnState.ABORTED, abortedBatch.getCurrentTransactionState());
        }
        Assert.assertEquals(0L, (long)abortedBatch.remainingTransactions());
        abortedBatch.close();
        Assert.assertEquals(TransactionBatch.TxnState.INACTIVE, abortedBatch.getCurrentTransactionState());
        conn.close();
    }

    /**
     * Writes two records in a transaction, aborts it, and verifies via
     * {@code checkNothingWritten} that nothing is reported at {@code partLoc}, both right
     * after the abort and again after the batch and connection have been closed.
     *
     * @throws Exception if any streaming call or verification helper fails
     */
    @Test
    public void testTransactionBatchAbort() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        String agentInfo = "UT_" + Thread.currentThread().getName();
        StreamingConnection conn = endPoint.newConnection(false, agentInfo);
        DelimitedInputWriter recordWriter = new DelimitedInputWriter(fieldNames, ",", endPoint, conn);
        TransactionBatch batch = conn.fetchTransactionBatch(10, recordWriter);

        batch.beginNextTransaction();
        batch.write("1,Hello streaming".getBytes());
        batch.write("2,Welcome to streaming".getBytes());
        batch.abort();

        this.checkNothingWritten(partLoc);
        Assert.assertEquals(TransactionBatch.TxnState.ABORTED, batch.getCurrentTransactionState());

        batch.close();
        conn.close();
        // Closing must not make the aborted records appear.
        this.checkNothingWritten(partLoc);
    }

    /**
     * Aborts one transaction and then commits the next one in the same batch.
     *
     * <p>While the first transaction is open, the single lock reported by the metastore is
     * checked for type {@code SHARED_READ}, state {@code ACQUIRED}, and the agent-info
     * string passed to {@code newConnection}. After the abort nothing is expected at
     * {@code partLoc}; after the subsequent commit both records are expected.
     *
     * @throws Exception if any streaming, metastore, or verification call fails
     */
    @Test
    public void testTransactionBatchAbortAndCommit() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        StreamingConnection conn = endPoint.newConnection(false, agentInfo);
        DelimitedInputWriter recordWriter = new DelimitedInputWriter(fieldNames, ",", endPoint, conn);
        TransactionBatch batch = conn.fetchTransactionBatch(10, recordWriter);

        // First transaction: write, inspect the lock, then abort.
        batch.beginNextTransaction();
        batch.write("1,Hello streaming".getBytes());
        batch.write("2,Welcome to streaming".getBytes());
        ShowLocksResponse locks = this.msClient.showLocks(new ShowLocksRequest());
        Assert.assertEquals("LockCount", 1L, (long)locks.getLocksSize());
        // Looked up only after the count check so an empty list fails on the assertion above.
        ShowLocksResponseElement onlyLock = (ShowLocksResponseElement)locks.getLocks().get(0);
        Assert.assertEquals("LockType", (Object)LockType.SHARED_READ, (Object)onlyLock.getType());
        Assert.assertEquals("LockState", (Object)LockState.ACQUIRED, (Object)onlyLock.getState());
        Assert.assertEquals("AgentInfo", (Object)agentInfo, (Object)onlyLock.getAgentInfo());
        batch.abort();
        this.checkNothingWritten(partLoc);
        Assert.assertEquals(TransactionBatch.TxnState.ABORTED, batch.getCurrentTransactionState());

        // Second transaction: the same records, committed this time.
        batch.beginNextTransaction();
        batch.write("1,Hello streaming".getBytes());
        batch.write("2,Welcome to streaming".getBytes());
        batch.commit();
        this.checkDataWritten(partLoc, 1L, 10L, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}");

        batch.close();
        conn.close();
    }

    /**
     * Commits two transactions in each of two consecutive batches on one connection and,
     * after every commit, verifies the accumulated rows through {@code checkDataWritten2}
     * using an ordered query over {@code testing.alerts}.
     *
     * @throws Exception if any streaming call or verification helper fails
     */
    @Test
    public void testMultipleTransactionBatchCommits() throws Exception {
        final String orderedQuery = "select id, msg from testing.alerts order by id, msg";
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        DelimitedInputWriter recordWriter = new DelimitedInputWriter(fieldNames, ",", endPoint);
        StreamingConnection conn = endPoint.newConnection(true, "UT_" + Thread.currentThread().getName());

        // First batch: records 1 and 2.
        TransactionBatch firstBatch = conn.fetchTransactionBatch(10, recordWriter);
        firstBatch.beginNextTransaction();
        firstBatch.write("1,Hello streaming".getBytes());
        firstBatch.commit();
        this.checkDataWritten2(partLoc, 1L, 10L, 1, orderedQuery, false, "1\tHello streaming");
        firstBatch.beginNextTransaction();
        firstBatch.write("2,Welcome to streaming".getBytes());
        firstBatch.commit();
        this.checkDataWritten2(partLoc, 1L, 10L, 1, orderedQuery, true, "1\tHello streaming", "2\tWelcome to streaming");
        firstBatch.close();

        // Second batch: records 3 and 4.
        TransactionBatch secondBatch = conn.fetchTransactionBatch(10, recordWriter);
        secondBatch.beginNextTransaction();
        secondBatch.write("3,Hello streaming - once again".getBytes());
        secondBatch.commit();
        this.checkDataWritten2(partLoc, 1L, 20L, 2, orderedQuery, false, "1\tHello streaming", "2\tWelcome to streaming", "3\tHello streaming - once again");
        secondBatch.beginNextTransaction();
        secondBatch.write("4,Welcome to streaming - once again".getBytes());
        secondBatch.commit();
        this.checkDataWritten2(partLoc, 1L, 20L, 2, orderedQuery, true, "1\tHello streaming", "2\tWelcome to streaming", "3\tHello streaming - once again", "4\tWelcome to streaming - once again");
        Assert.assertEquals(TransactionBatch.TxnState.COMMITTED, secondBatch.getCurrentTransactionState());

        secondBatch.close();
        conn.close();
    }

    /**
     * Interleaves writes and commits of two transaction batches fetched from the same
     * connection and checks, after each step, which rows are reported by
     * {@code checkDataWritten2}.
     *
     * <p>Between steps the test also inspects every bucket file of the current delta
     * directories: the side file returned by {@code OrcAcidUtils.getSideFile} must exist
     * and be non-empty, and the logical length from {@code AcidUtils.getLogicalLength} is
     * compared with the file's actual length ({@code ==} once both batches have committed,
     * {@code <=} while both have an open transaction with written data).
     *
     * @throws Exception if any streaming, file-system, metastore, or verification call fails
     */
    @Test
    public void testInterleavedTransactionBatchCommits() throws Exception {
        long actualLength;
        long logicalLength;
        long lengthFileSize;
        Path lengthFile;
        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);
        StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
        // Two batches on one connection, each with its own writer and an open transaction.
        TransactionBatch txnBatch1 = connection.fetchTransactionBatch(10, (RecordWriter)writer);
        txnBatch1.beginNextTransaction();
        DelimitedInputWriter writer2 = new DelimitedInputWriter(fieldNames, ",", endPt);
        TransactionBatch txnBatch2 = connection.fetchTransactionBatch(10, (RecordWriter)writer2);
        txnBatch2.beginNextTransaction();
        txnBatch1.write("1,Hello streaming".getBytes());
        txnBatch2.write("3,Hello streaming - once again".getBytes());
        // Neither transaction has committed yet.
        this.checkNothingWritten(partLoc);
        // Commit the second batch first; only its record is expected.
        txnBatch2.commit();
        String validationQuery = "select id, msg from testing.alerts order by id, msg";
        this.checkDataWritten2(partLoc, 11L, 20L, 1, validationQuery, true, "3\tHello streaming - once again");
        txnBatch1.commit();
        FileSystem fs = partLoc.getFileSystem((Configuration)this.conf);
        AcidUtils.Directory dir = AcidUtils.getAcidState((Path)partLoc, (Configuration)this.conf, (ValidWriteIdList)this.msClient.getValidWriteIds(AcidUtils.getFullTableName((String)dbName, (String)"alerts")));
        // Both batches have committed: logical length must equal the actual file length.
        for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
            for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
                lengthFile = OrcAcidUtils.getSideFile((Path)stat.getPath());
                Assert.assertTrue((String)(lengthFile + " missing"), (boolean)fs.exists(lengthFile));
                lengthFileSize = fs.getFileStatus(lengthFile).getLen();
                Assert.assertTrue((String)("Expected " + lengthFile + " to be non empty. lengh=" + lengthFileSize), (lengthFileSize > 0L ? 1 : 0) != 0);
                logicalLength = AcidUtils.getLogicalLength((FileSystem)fs, (FileStatus)stat);
                actualLength = stat.getLen();
                Assert.assertTrue((String)"", (logicalLength == actualLength ? 1 : 0) != 0);
            }
        }
        this.checkDataWritten2(partLoc, 1L, 20L, 2, validationQuery, false, "1\tHello streaming", "3\tHello streaming - once again");
        // Open a second transaction in each batch and write without committing.
        txnBatch1.beginNextTransaction();
        txnBatch1.write("2,Welcome to streaming".getBytes());
        txnBatch2.beginNextTransaction();
        txnBatch2.write("4,Welcome to streaming - once again".getBytes());
        dir = AcidUtils.getAcidState((Path)partLoc, (Configuration)this.conf, (ValidWriteIdList)this.msClient.getValidWriteIds(AcidUtils.getFullTableName((String)dbName, (String)"alerts")));
        // With uncommitted writes pending, the logical length may be smaller than the actual length.
        for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
            for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
                lengthFile = OrcAcidUtils.getSideFile((Path)stat.getPath());
                Assert.assertTrue((String)(lengthFile + " missing"), (boolean)fs.exists(lengthFile));
                lengthFileSize = fs.getFileStatus(lengthFile).getLen();
                Assert.assertTrue((String)("Expected " + lengthFile + " to be non empty. lengh=" + lengthFileSize), (lengthFileSize > 0L ? 1 : 0) != 0);
                logicalLength = AcidUtils.getLogicalLength((FileSystem)fs, (FileStatus)stat);
                actualLength = stat.getLen();
                Assert.assertTrue((String)"", (logicalLength <= actualLength ? 1 : 0) != 0);
            }
        }
        // The uncommitted records 2 and 4 are not expected yet.
        this.checkDataWritten2(partLoc, 1L, 20L, 2, validationQuery, true, "1\tHello streaming", "3\tHello streaming - once again");
        txnBatch1.commit();
        this.checkDataWritten2(partLoc, 1L, 20L, 2, validationQuery, false, "1\tHello streaming", "2\tWelcome to streaming", "3\tHello streaming - once again");
        txnBatch2.commit();
        this.checkDataWritten2(partLoc, 1L, 20L, 2, validationQuery, true, "1\tHello streaming", "2\tWelcome to streaming", "3\tHello streaming - once again", "4\tWelcome to streaming - once again");
        Assert.assertEquals((Object)TransactionBatch.TxnState.COMMITTED, (Object)txnBatch1.getCurrentTransactionState());
        Assert.assertEquals((Object)TransactionBatch.TxnState.COMMITTED, (Object)txnBatch2.getCurrentTransactionState());
        txnBatch1.close();
        txnBatch2.close();
        connection.close();
    }

    /**
     * Opens and closes a connection twice on the same endpoint, passing {@code true} to
     * {@code newConnection} both times; the test passes if neither attempt throws.
     *
     * @throws Exception if opening either connection fails
     */
    @Test
    public void testCreatePartition() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        for (int attempt = 0; attempt < 2; ++attempt) {
            StreamingConnection conn = endPoint.newConnection(true);
            conn.close();
        }
    }

    /**
     * Starts three {@code WriterThd} threads against the same endpoint, waits for all of
     * them to finish, and fails if any of them recorded an error.
     *
     * @throws Exception if waiting for a writer thread is interrupted
     */
    @Test
    public void testConcurrentTransactionBatchCommits() throws Exception {
        HiveEndPoint endPoint = new HiveEndPoint(this.metaStoreURI, dbName, "alerts", this.partitionVals);
        ArrayList<WriterThd> writerThreads = new ArrayList<WriterThd>(3);
        writerThreads.add(new WriterThd(endPoint, "1,Matrix"));
        writerThreads.add(new WriterThd(endPoint, "2,Gandhi"));
        writerThreads.add(new WriterThd(endPoint, "3,Silence"));

        // Launch every writer before waiting on any of them.
        for (WriterThd thread : writerThreads) {
            thread.start();
        }
        for (WriterThd thread : writerThreads) {
            thread.join();
        }

        // Report the first writer that recorded an error.
        for (WriterThd thread : writerThreads) {
            if (thread.error != null) {
                Assert.fail("Writer thread" + thread.getName() + " died: " + thread.error.getMessage() + " See log file for stack trace");
            }
        }
    }

    /**
     * Reads every row of an ORC bucket file from the local file system and returns the
     * user records it contains.
     *
     * <p>Each row is decoded with {@code deserializeDeltaFileRow}; element 5 of the decoded
     * array is the {@code SampleRec} that is collected.
     *
     * @param orcFile path of the ORC bucket file to read
     * @return the records found in the file, in read order; empty if the file has no rows
     * @throws IOException if the file cannot be opened, read, or closed
     */
    private ArrayList<SampleRec> dumpBucket(Path orcFile) throws IOException {
        LocalFileSystem fs = FileSystem.getLocal((Configuration)new Configuration());
        Reader reader = OrcFile.createReader((Path)orcFile, (OrcFile.ReaderOptions)OrcFile.readerOptions((Configuration)this.conf).filesystem((FileSystem)fs));
        RecordReader rows = reader.rows();
        try {
            StructObjectInspector inspector = (StructObjectInspector)reader.getObjectInspector();
            System.out.format("Found Bucket File : %s \n", orcFile.getName());
            ArrayList<SampleRec> result = new ArrayList<SampleRec>();
            while (rows.hasNext()) {
                Object row = rows.next(null);
                SampleRec rec = (SampleRec)TestStreaming.deserializeDeltaFileRow(row, inspector)[5];
                result.add(rec);
            }
            return result;
        } finally {
            // The original never closed the record reader.
            rows.close();
        }
    }

    /**
     * Decodes one row of a delta file into a six-element array.
     *
     * <p>Fields 0 and 2 are read as {@code int}, fields 1, 3 and 4 as {@code long}, and
     * field 5 is a nested struct decoded by {@code deserializeInner} into a
     * {@code SampleRec}. The primitive values are boxed in the returned array.
     *
     * @param row the raw row object produced by the reader
     * @param inspector the struct inspector describing {@code row}
     * @return {@code {field0, field1, field2, field3, field4, sampleRec}}
     */
    private static Object[] deserializeDeltaFileRow(Object row, StructObjectInspector inspector) {
        List<? extends StructField> refs = inspector.getAllStructFieldRefs();

        // Resolve the per-field inspectors first, in field order.
        WritableIntObjectInspector intInsp0 = (WritableIntObjectInspector)refs.get(0).getFieldObjectInspector();
        WritableLongObjectInspector longInsp1 = (WritableLongObjectInspector)refs.get(1).getFieldObjectInspector();
        WritableIntObjectInspector intInsp2 = (WritableIntObjectInspector)refs.get(2).getFieldObjectInspector();
        WritableLongObjectInspector longInsp3 = (WritableLongObjectInspector)refs.get(3).getFieldObjectInspector();
        WritableLongObjectInspector longInsp4 = (WritableLongObjectInspector)refs.get(4).getFieldObjectInspector();
        StructObjectInspector structInsp5 = (StructObjectInspector)refs.get(5).getFieldObjectInspector();

        // Then extract the values, again in field order.
        int value0 = intInsp0.get(inspector.getStructFieldData(row, refs.get(0)));
        long value1 = longInsp1.get(inspector.getStructFieldData(row, refs.get(1)));
        int value2 = intInsp2.get(inspector.getStructFieldData(row, refs.get(2)));
        long value3 = longInsp3.get(inspector.getStructFieldData(row, refs.get(3)));
        long value4 = longInsp4.get(inspector.getStructFieldData(row, refs.get(4)));
        Object innerRow = inspector.getStructFieldData(row, refs.get(5));
        SampleRec record = TestStreaming.deserializeInner(innerRow, structInsp5);

        return new Object[]{value0, value1, value2, value3, value4, record};
    }

    /**
     * Decodes the nested user-data struct of a delta row into a {@code SampleRec}.
     *
     * <p>Field 0 and field 2 are read as strings and field 1 as an {@code int}.
     *
     * @param row the nested struct object
     * @param inspector the struct inspector describing {@code row}
     * @return a {@code SampleRec} built from the three decoded fields
     */
    private static SampleRec deserializeInner(Object row, StructObjectInspector inspector) {
        List<? extends StructField> refs = inspector.getAllStructFieldRefs();

        // Resolve the per-field inspectors first, in field order.
        WritableStringObjectInspector strInsp0 = (WritableStringObjectInspector)refs.get(0).getFieldObjectInspector();
        WritableIntObjectInspector intInsp1 = (WritableIntObjectInspector)refs.get(1).getFieldObjectInspector();
        WritableStringObjectInspector strInsp2 = (WritableStringObjectInspector)refs.get(2).getFieldObjectInspector();

        // Then extract the values, again in field order.
        String first = strInsp0.getPrimitiveJavaObject(inspector.getStructFieldData(row, refs.get(0)));
        int second = intInsp1.get(inspector.getStructFieldData(row, refs.get(1)));
        String third = strInsp2.getPrimitiveJavaObject(inspector.getStructFieldData(row, refs.get(2)));

        return new SampleRec(first, second, third);
    }

    /**
     * Streams rows into two freshly created tables declared with four buckets and checks
     * how the rows of the first table are distributed over bucket files.
     *
     * <p>Both databases are dropped and recreated under {@code dbFolder}. Four rows are
     * committed to the first table and three to the second; the bucket contents of both
     * are dumped to {@code System.err}. Only the first table is asserted on: three buckets
     * are expected, bucket 0 must be absent, and buckets 1, 2 and 3 must hold 1, 2 and 1
     * records respectively.
     *
     * @throws Exception if table creation, streaming, or bucket dumping fails
     */
    @Test
    public void testBucketing() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();
        TestStreaming.dropDB(this.msClient, dbName3);
        TestStreaming.dropDB(this.msClient, dbName4);
        String dbLocation = this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
        dbLocation = dbLocation.replaceAll("\\\\", "/");
        String[] colNames = "key1,key2,data".split(",");
        String[] colTypes = "string,int,string".split(",");
        String[] bucketNames = "key1,key2".split(",");
        int bucketCount = 4;
        TestStreaming.createDbAndTable(this.driver, dbName3, tblName3, null, colNames, colTypes, bucketNames, null, dbLocation, bucketCount);
        String dbLocation2 = this.dbFolder.newFolder(dbName4).getCanonicalPath() + ".db";
        dbLocation2 = dbLocation2.replaceAll("\\\\", "/");
        String[] colNames2 = "key3,key4,data2".split(",");
        String[] colTypes2 = "string,int,string".split(",");
        String[] bucketNames2 = "key3,key4".split(",");
        TestStreaming.createDbAndTable(this.driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2, null, dbLocation2, bucketCount);
        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, dbName3, tblName3, null);
        StreamingConnection connection = endPt.newConnection(false, agentInfo);
        DelimitedInputWriter writer = new DelimitedInputWriter(colNames, ",", endPt, connection);
        TransactionBatch txnBatch = connection.fetchTransactionBatch(2, (RecordWriter)writer);
        txnBatch.beginNextTransaction();
        txnBatch.write("name0,1,Hello streaming".getBytes());
        txnBatch.write("name2,2,Welcome to streaming".getBytes());
        txnBatch.write("name4,2,more Streaming unlimited".getBytes());
        txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
        txnBatch.commit();
        HiveEndPoint endPt2 = new HiveEndPoint(this.metaStoreURI, dbName4, tblName4, null);
        StreamingConnection connection2 = endPt2.newConnection(false, agentInfo);
        // The writer for the second endpoint must use that endpoint's own connection;
        // the original passed the first table's connection here.
        DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2, ",", endPt2, connection2);
        TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, (RecordWriter)writer2);
        txnBatch2.beginNextTransaction();
        txnBatch2.write("name5,2,fact3".getBytes());
        txnBatch2.write("name8,2,fact3".getBytes());
        txnBatch2.write("name0,1,fact1".getBytes());
        txnBatch2.commit();
        HashMap<Integer, ArrayList<SampleRec>> actual1 = this.dumpAllBuckets(dbLocation, tblName3);
        HashMap<Integer, ArrayList<SampleRec>> actual2 = this.dumpAllBuckets(dbLocation2, tblName4);
        System.err.println("\n  Table 1");
        System.err.println(actual1);
        System.err.println("\n  Table 2");
        System.err.println(actual2);
        // JUnit order is (message, expected, actual); the original had the last two swapped,
        // which produces misleading failure messages.
        Assert.assertEquals("number of buckets does not match expectation", 3L, (long)actual1.values().size());
        Assert.assertNull("bucket 0 shouldn't have been created", actual1.get(0));
        Assert.assertEquals("records in bucket does not match expectation", 1L, (long)actual1.get(1).size());
        Assert.assertEquals("records in bucket does not match expectation", 2L, (long)actual1.get(2).size());
        Assert.assertEquals("records in bucket does not match expectation", 1L, (long)actual1.get(3).size());
        // Release the streaming resources; the original left both batches and connections open.
        txnBatch.close();
        connection.close();
        txnBatch2.close();
        connection2.close();
    }

    /**
     * Runs one statement through the shared driver and fails the calling test
     * when {@code runDDL} reports that the statement did not succeed.
     *
     * @param cmd the statement to execute
     * @throws QueryFailedException as declared by {@code runDDL}
     */
    private void runCmdOnDriver(String cmd) throws QueryFailedException {
        Assert.assertTrue(cmd + " failed", TestStreaming.runDDL(this.driver, cmd));
    }

    /**
     * Streams one committed transaction into each of two bucketed tables and,
     * after each commit, runs {@code FileDump} on the first table's database
     * location while capturing {@code System.err}. The captured output must
     * not report corrupted files or files still open for writes (and, on the
     * second run, must not mention any exception).
     *
     * <p>{@code System.err} is restored in a {@code finally} block so a failure
     * inside {@code FileDump.main} cannot leave stderr redirected for the
     * remaining tests.
     */
    @Test
    public void testFileDump() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();
        TestStreaming.dropDB(this.msClient, dbName3);
        TestStreaming.dropDB(this.msClient, dbName4);

        // First table: 4 buckets clustered by (key1, key2).
        String dbLocation = this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
        // Replace backslashes with forward slashes so the location is usable in a URI.
        dbLocation = dbLocation.replaceAll("\\\\", "/");
        String[] colNames = "key1,key2,data".split(",");
        String[] colTypes = "string,int,string".split(",");
        String[] bucketNames = "key1,key2".split(",");
        int bucketCount = 4;
        TestStreaming.createDbAndTable(this.driver, dbName3, tblName3, null, colNames, colTypes, bucketNames, null, dbLocation, bucketCount);

        // Second table: same bucket count, clustered by (key3, key4).
        String dbLocation2 = this.dbFolder.newFolder(dbName4).getCanonicalPath() + ".db";
        dbLocation2 = dbLocation2.replaceAll("\\\\", "/");
        String[] colNames2 = "key3,key4,data2".split(",");
        String[] colTypes2 = "string,int,string".split(",");
        String[] bucketNames2 = "key3,key4".split(",");
        TestStreaming.createDbAndTable(this.driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2, null, dbLocation2, bucketCount);

        // Write and commit four rows into the first table.
        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, dbName3, tblName3, null);
        StreamingConnection connection = endPt.newConnection(false, agentInfo);
        DelimitedInputWriter writer = new DelimitedInputWriter(colNames, ",", endPt, connection);
        TransactionBatch txnBatch = connection.fetchTransactionBatch(2, (RecordWriter)writer);
        txnBatch.beginNextTransaction();
        txnBatch.write("name0,1,Hello streaming".getBytes());
        txnBatch.write("name2,2,Welcome to streaming".getBytes());
        txnBatch.write("name4,2,more Streaming unlimited".getBytes());
        txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
        txnBatch.commit();

        // First dump: capture stderr while FileDump inspects the first table's location.
        PrintStream origErr = System.err;
        ByteArrayOutputStream myErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(myErr));
        try {
            FileDump.main(new String[]{dbLocation});
            System.err.flush();
        } finally {
            System.setErr(origErr);
        }
        String errDump = new String(myErr.toByteArray());
        Assert.assertFalse(errDump.contains("file(s) are corrupted"));
        Assert.assertFalse(errDump.contains("is still open for writes."));

        // Write and commit three rows into the second table.
        HiveEndPoint endPt2 = new HiveEndPoint(this.metaStoreURI, dbName4, tblName4, null);
        DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2, ",", endPt2);
        StreamingConnection connection2 = endPt2.newConnection(false, agentInfo);
        TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, (RecordWriter)writer2);
        txnBatch2.beginNextTransaction();
        txnBatch2.write("name5,2,fact3".getBytes());
        txnBatch2.write("name8,2,fact3".getBytes());
        txnBatch2.write("name0,1,fact1".getBytes());
        txnBatch2.commit();

        // Second dump.
        // NOTE(review): this dumps dbLocation again, not dbLocation2, so the second
        // table's files are never inspected here - confirm whether that is intended.
        origErr = System.err;
        myErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(myErr));
        try {
            FileDump.main(new String[]{dbLocation});
            System.out.flush();
            System.err.flush();
        } finally {
            System.setErr(origErr);
        }
        errDump = new String(myErr.toByteArray());
        Assert.assertFalse(errDump.contains("Exception"));
        Assert.assertFalse(errDump.contains("file(s) are corrupted"));
        Assert.assertFalse(errDump.contains("is still open for writes."));
    }

    /**
     * Commits one transaction into a 4-bucket table, then rewrites every bucket
     * data file with a different length via {@code corruptDataFile}:
     * bucket_00000 is emptied, bucket_00001 loses its last byte, and
     * bucket_00002 / bucket_00003 each get 100 extra zero bytes.
     *
     * <p>{@code FileDump} is then run three times with stderr captured:
     * a plain dump (must report "3 file(s) are corrupted"), a
     * {@code --recover --skip-dump} run (must report recovery of buckets 1-3 and
     * the "No readable footers found" message), and a final plain dump that must
     * be clean. After recovery no {@code _flush_length} file may remain.
     *
     * <p>{@code System.err} is restored in {@code finally} blocks so a failing
     * {@code FileDump.main} cannot leave stderr redirected.
     */
    @Test
    public void testFileDumpCorruptDataFiles() throws Exception {
        TestStreaming.dropDB(this.msClient, dbName3);
        String dbLocation = this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
        // Replace backslashes with forward slashes so the location is usable in a URI.
        dbLocation = dbLocation.replaceAll("\\\\", "/");
        String[] colNames = "key1,key2,data".split(",");
        String[] colTypes = "string,int,string".split(",");
        String[] bucketNames = "key1,key2".split(",");
        int bucketCount = 4;
        TestStreaming.createDbAndTable(this.driver, dbName3, tblName3, null, colNames, colTypes, bucketNames, null, dbLocation, bucketCount);

        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, dbName3, tblName3, null);
        StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
        DelimitedInputWriter writer = new DelimitedInputWriter(colNames, ",", endPt, connection);
        TransactionBatch txnBatch = connection.fetchTransactionBatch(2, (RecordWriter)writer);
        txnBatch.beginNextTransaction();
        txnBatch.write("name0,1,Hello streaming".getBytes());
        txnBatch.write("name2,2,Welcome to streaming".getBytes());
        txnBatch.write("name4,2,more Streaming unlimited".getBytes());
        txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
        txnBatch.commit();

        // Corrupt each bucket file differently (see corruptDataFile for the length semantics).
        Path path = new Path(dbLocation);
        Collection<String> files = FileDump.getAllFilesInPath(path, this.conf);
        for (String file : files) {
            if (file.contains("bucket_00000")) {
                // Integer.MIN_VALUE is the sentinel for "truncate to zero bytes".
                this.corruptDataFile(file, this.conf, Integer.MIN_VALUE);
            } else if (file.contains("bucket_00001")) {
                this.corruptDataFile(file, this.conf, -1);
            } else if (file.contains("bucket_00002")) {
                this.corruptDataFile(file, this.conf, 100);
            } else if (file.contains("bucket_00003")) {
                this.corruptDataFile(file, this.conf, 100);
            }
        }

        // Pass 1: plain dump must detect the corruption.
        PrintStream origErr = System.err;
        ByteArrayOutputStream myErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(myErr));
        try {
            FileDump.main(new String[]{dbLocation});
            System.err.flush();
        } finally {
            System.setErr(origErr);
        }
        String errDump = new String(myErr.toByteArray());
        Assert.assertFalse(errDump.contains("Exception"));
        Assert.assertTrue(errDump.contains("3 file(s) are corrupted"));
        Assert.assertFalse(errDump.contains("is still open for writes."));

        // Pass 2: recovery run.
        origErr = System.err;
        myErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(myErr));
        try {
            FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"});
            System.err.flush();
        } finally {
            System.setErr(origErr);
        }
        errDump = new String(myErr.toByteArray());
        Assert.assertTrue(errDump.contains("bucket_00001 recovered successfully!"));
        Assert.assertTrue(errDump.contains("No readable footers found. Creating empty orc file."));
        Assert.assertTrue(errDump.contains("bucket_00002 recovered successfully!"));
        Assert.assertTrue(errDump.contains("bucket_00003 recovered successfully!"));
        Assert.assertFalse(errDump.contains("Exception"));
        Assert.assertFalse(errDump.contains("is still open for writes."));

        // Pass 3: after recovery the dump must be clean.
        origErr = System.err;
        myErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(myErr));
        try {
            FileDump.main(new String[]{dbLocation});
            System.err.flush();
        } finally {
            System.setErr(origErr);
        }
        errDump = new String(myErr.toByteArray());
        Assert.assertFalse(errDump.contains("Exception"));
        Assert.assertFalse(errDump.contains("file(s) are corrupted"));
        Assert.assertFalse(errDump.contains("is still open for writes."));

        // No side files may be left behind once recovery has run.
        files = FileDump.getAllFilesInPath(path, this.conf);
        for (String file : files) {
            Assert.assertFalse(file.contains("_flush_length"));
        }
        txnBatch.close();
    }

    /**
     * Replaces {@code file} with a copy whose length differs from the original.
     *
     * <p>The new length is the original length plus {@code addRemoveBytes}
     * (clamped at zero); {@code Integer.MIN_VALUE} is a sentinel meaning
     * "make the file empty". When the file grows, the extra bytes are zeros
     * because the buffer is never written past the original length. The copy
     * is written to a sibling {@code .corrupt} file which then replaces the
     * original via delete + rename.
     *
     * @param file           path of the file to corrupt
     * @param conf           configuration used to resolve the file system
     * @param addRemoveBytes number of bytes to add (positive) or drop from the end
     *                       (negative), or {@code Integer.MIN_VALUE} to empty the file
     * @throws Exception if any file system operation fails, including the final rename
     */
    private void corruptDataFile(String file, Configuration conf, int addRemoveBytes) throws Exception {
        Path bPath = new Path(file);
        Path cPath = new Path(bPath.getParent(), bPath.getName() + ".corrupt");
        FileSystem fs = bPath.getFileSystem(conf);
        long origLen = fs.getFileStatus(bPath).getLen();

        // Do the arithmetic in long and clamp at zero so removing more bytes than the
        // file holds yields an empty file rather than a NegativeArraySizeException.
        int newLen = 0;
        if (addRemoveBytes != Integer.MIN_VALUE) {
            newLen = (int)Math.max(0L, origLen + (long)addRemoveBytes);
        }
        byte[] buffer = new byte[newLen];

        // try-with-resources: the streams are closed even when the read/write fails.
        try (FSDataInputStream fdis = fs.open(bPath)) {
            fdis.readFully(0L, buffer, 0, (int)Math.min(origLen, (long)buffer.length));
        }
        try (FSDataOutputStream fdos = fs.create(cPath, true)) {
            fdos.write(buffer, 0, buffer.length);
        }

        fs.delete(bPath, false);
        if (!fs.rename(cPath, bPath)) {
            // Without this check a failed rename would silently leave the bucket file missing.
            throw new IOException("Could not rename " + cPath + " to " + bPath);
        }
    }

    /**
     * Commits two transactions into a 4-bucket table, recording each bucket
     * file's length after every commit, then rewrites each bucket's side file
     * (obtained through {@code OrcAcidUtils.getSideFile}) with a different
     * number of entries via {@code corruptSideFile}.
     *
     * <p>{@code FileDump} is then run three times with stderr captured:
     * a plain dump (must print the expected side-file lengths and
     * "4 file(s) are corrupted"), a {@code --recover --skip-dump} run (must
     * report recovery of all four buckets and print the recorded offsets as
     * the readable footer offsets), and a final plain dump that must be clean.
     * After recovery no {@code _flush_length} file may remain.
     *
     * <p>{@code System.err} is restored in {@code finally} blocks so a failing
     * {@code FileDump.main} cannot leave stderr redirected.
     */
    @Test
    public void testFileDumpCorruptSideFiles() throws Exception {
        TestStreaming.dropDB(this.msClient, dbName3);
        String dbLocation = this.dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
        // Replace backslashes with forward slashes so the location is usable in a URI.
        dbLocation = dbLocation.replaceAll("\\\\", "/");
        String[] colNames = "key1,key2,data".split(",");
        String[] colTypes = "string,int,string".split(",");
        String[] bucketNames = "key1,key2".split(",");
        int bucketCount = 4;
        TestStreaming.createDbAndTable(this.driver, dbName3, tblName3, null, colNames, colTypes, bucketNames, null, dbLocation, bucketCount);

        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, dbName3, tblName3, null);
        StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
        DelimitedInputWriter writer = new DelimitedInputWriter(colNames, ",", endPt, connection);
        TransactionBatch txnBatch = connection.fetchTransactionBatch(2, (RecordWriter)writer);

        // First transaction; remember each bucket file's length after the commit.
        txnBatch.beginNextTransaction();
        txnBatch.write("name0,1,Hello streaming".getBytes());
        txnBatch.write("name2,2,Welcome to streaming".getBytes());
        txnBatch.write("name4,2,more Streaming unlimited".getBytes());
        txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
        txnBatch.write("name6,3,aHello streaming".getBytes());
        txnBatch.commit();
        HashMap<String, List<Long>> offsetMap = new HashMap<String, List<Long>>();
        this.recordOffsets(this.conf, dbLocation, offsetMap);

        // Second transaction; record the lengths again so each bucket has two offsets.
        txnBatch.beginNextTransaction();
        txnBatch.write("name01,11,-Hello streaming".getBytes());
        txnBatch.write("name21,21,-Welcome to streaming".getBytes());
        txnBatch.write("name41,21,-more Streaming unlimited".getBytes());
        txnBatch.write("name51,21,-even more Streaming unlimited".getBytes());
        txnBatch.write("name02,12,--Hello streaming".getBytes());
        txnBatch.write("name22,22,--Welcome to streaming".getBytes());
        txnBatch.write("name42,22,--more Streaming unlimited".getBytes());
        txnBatch.write("name52,22,--even more Streaming unlimited".getBytes());
        txnBatch.write("name7,4,aWelcome to streaming".getBytes());
        txnBatch.write("name8,5,amore Streaming unlimited".getBytes());
        txnBatch.write("name9,6,aeven more Streaming unlimited".getBytes());
        txnBatch.write("name10,7,bHello streaming".getBytes());
        txnBatch.write("name11,8,bWelcome to streaming".getBytes());
        txnBatch.write("name12,9,bmore Streaming unlimited".getBytes());
        txnBatch.write("name13,10,beven more Streaming unlimited".getBytes());
        txnBatch.commit();
        this.recordOffsets(this.conf, dbLocation, offsetMap);

        // Rewrite each bucket's side file with a different entry count
        // (see corruptSideFile for what negative / zero / positive mean).
        Path path = new Path(dbLocation);
        Collection<String> files = FileDump.getAllFilesInPath(path, this.conf);
        for (String file : files) {
            if (file.contains("bucket_00000")) {
                this.corruptSideFile(file, this.conf, offsetMap, "bucket_00000", -1);
            } else if (file.contains("bucket_00001")) {
                this.corruptSideFile(file, this.conf, offsetMap, "bucket_00001", 0);
            } else if (file.contains("bucket_00002")) {
                this.corruptSideFile(file, this.conf, offsetMap, "bucket_00002", 3);
            } else if (file.contains("bucket_00003")) {
                this.corruptSideFile(file, this.conf, offsetMap, "bucket_00003", 10);
            }
        }

        // Pass 1: plain dump must show the rewritten side-file lengths and flag all four buckets.
        PrintStream origErr = System.err;
        ByteArrayOutputStream myErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(myErr));
        try {
            FileDump.main(new String[]{dbLocation});
            System.err.flush();
        } finally {
            System.setErr(origErr);
        }
        String errDump = new String(myErr.toByteArray());
        Assert.assertTrue(errDump.contains("bucket_00000_flush_length [length: 11"));
        Assert.assertTrue(errDump.contains("bucket_00001_flush_length [length: 0"));
        Assert.assertTrue(errDump.contains("bucket_00002_flush_length [length: 24"));
        Assert.assertTrue(errDump.contains("bucket_00003_flush_length [length: 80"));
        Assert.assertFalse(errDump.contains("Exception"));
        Assert.assertTrue(errDump.contains("4 file(s) are corrupted"));
        Assert.assertFalse(errDump.contains("is still open for writes."));

        // Pass 2: recovery run must recover every bucket using the recorded offsets.
        origErr = System.err;
        myErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(myErr));
        try {
            FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"});
            System.err.flush();
        } finally {
            System.setErr(origErr);
        }
        errDump = new String(myErr.toByteArray());
        Assert.assertTrue(errDump.contains("bucket_00000 recovered successfully!"));
        Assert.assertTrue(errDump.contains("bucket_00001 recovered successfully!"));
        Assert.assertTrue(errDump.contains("bucket_00002 recovered successfully!"));
        Assert.assertTrue(errDump.contains("bucket_00003 recovered successfully!"));
        List<Long> offsets = offsetMap.get("bucket_00000");
        Assert.assertTrue(errDump.contains("Readable footerOffsets: " + offsets.toString()));
        offsets = offsetMap.get("bucket_00001");
        Assert.assertTrue(errDump.contains("Readable footerOffsets: " + offsets.toString()));
        offsets = offsetMap.get("bucket_00002");
        Assert.assertTrue(errDump.contains("Readable footerOffsets: " + offsets.toString()));
        offsets = offsetMap.get("bucket_00003");
        Assert.assertTrue(errDump.contains("Readable footerOffsets: " + offsets.toString()));
        Assert.assertFalse(errDump.contains("Exception"));
        Assert.assertFalse(errDump.contains("is still open for writes."));

        // Pass 3: after recovery the dump must be clean.
        origErr = System.err;
        myErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(myErr));
        try {
            FileDump.main(new String[]{dbLocation});
            System.err.flush();
        } finally {
            System.setErr(origErr);
        }
        errDump = new String(myErr.toByteArray());
        Assert.assertFalse(errDump.contains("Exception"));
        Assert.assertFalse(errDump.contains("file(s) are corrupted"));
        Assert.assertFalse(errDump.contains("is still open for writes."));

        // No side files may be left behind once recovery has run.
        files = FileDump.getAllFilesInPath(path, this.conf);
        for (String file : files) {
            Assert.assertFalse(file.contains("_flush_length"));
        }
        txnBatch.close();
    }

    /**
     * Rewrites the side file of the given data file so that it no longer
     * matches the offsets recorded for that bucket.
     *
     * <ul>
     *   <li>{@code numEntries < 0}: every recorded offset except the last is written
     *       as a full long, followed by only the first 3 bytes of the last offset
     *       (a truncated trailing entry).</li>
     *   <li>{@code numEntries == 0}: the side file is left empty.</li>
     *   <li>{@code numEntries > 0}: exactly {@code numEntries} longs are written - the
     *       recorded offsets first, then made-up offsets that step 100 past the last
     *       recorded offset when more entries are requested than were recorded.</li>
     * </ul>
     *
     * The content is written to a sibling {@code .corrupt} file which then
     * replaces the real side file via delete + rename.
     *
     * @param file       path of the bucket data file whose side file is rewritten
     * @param conf       configuration used to resolve the file system
     * @param offsetMap  recorded offsets per bucket key
     * @param key        bucket key to look up in {@code offsetMap}
     * @param numEntries entry-count selector as described above
     * @throws IOException           if a file system operation fails
     * @throws IllegalStateException if no offsets were recorded for {@code key}
     */
    private void corruptSideFile(String file, HiveConf conf, Map<String, List<Long>> offsetMap, String key, int numEntries) throws IOException {
        Path dataPath = new Path(file);
        Path sideFilePath = OrcAcidUtils.getSideFile(dataPath);
        Path cPath = new Path(sideFilePath.getParent(), sideFilePath.getName() + ".corrupt");
        FileSystem fs = sideFilePath.getFileSystem(conf);

        List<Long> offsets = offsetMap.get(key);
        if (offsets == null || offsets.isEmpty()) {
            // Fail with a clear message instead of an NPE / index error further down.
            throw new IllegalStateException("No offsets recorded for " + key);
        }
        long lastOffset = offsets.get(offsets.size() - 1);

        // try-with-resources: the stream is closed even when a write fails.
        try (FSDataOutputStream fdos = fs.create(cPath, true)) {
            if (numEntries < 0) {
                for (int i = 0; i < offsets.size() - 1; ++i) {
                    fdos.writeLong(offsets.get(i).longValue());
                }
                // Only 3 of the 8 bytes: leaves a partial final entry.
                fdos.write(this.longToBytes(lastOffset), 0, 3);
            } else if (numEntries > 0) {
                int firstRun = Math.min(offsets.size(), numEntries);
                for (int i = 0; i < firstRun; ++i) {
                    fdos.writeLong(offsets.get(i).longValue());
                }
                int remaining = numEntries - firstRun;
                for (int i = 0; i < remaining; ++i) {
                    fdos.writeLong(lastOffset + (long)((i + 1) * 100));
                }
            }
        }

        fs.delete(sideFilePath, false);
        fs.rename(cPath, sideFilePath);
    }

    /**
     * Encodes {@code x} as the 8-byte array produced by {@link ByteBuffer#putLong(long)}
     * on a freshly allocated buffer (default byte order, i.e. big-endian).
     *
     * @param x value to encode
     * @return a new 8-element array holding the encoded value
     */
    private byte[] longToBytes(long x) {
        return ByteBuffer.allocate(8).putLong(x).array();
    }

    /**
     * Appends the current length of every bucket file found under
     * {@code dbLocation} to that bucket's list in {@code offsetMap}.
     *
     * <p>A file is attributed to the first of {@code bucket_00000} ..
     * {@code bucket_00003} that its path contains; files matching none of
     * them are ignored. A list is created the first time a bucket is seen, so
     * calling this after each commit accumulates one length per call and bucket.
     *
     * @param conf       configuration used to list files and resolve file systems
     * @param dbLocation directory to scan
     * @param offsetMap  map from bucket key to recorded lengths; updated in place
     * @throws IOException if listing the files or reading a file status fails
     */
    private void recordOffsets(HiveConf conf, String dbLocation, Map<String, List<Long>> offsetMap) throws IOException {
        String[] bucketKeys = {"bucket_00000", "bucket_00001", "bucket_00002", "bucket_00003"};
        Path path = new Path(dbLocation);
        Collection<String> files = FileDump.getAllFilesInPath(path, conf);
        for (String file : files) {
            Path bPath = new Path(file);
            FileSystem fs = bPath.getFileSystem(conf);
            long len = fs.getFileStatus(bPath).getLen();
            for (String key : bucketKeys) {
                if (!file.contains(key)) {
                    continue;
                }
                List<Long> offsets = offsetMap.get(key);
                if (offsets == null) {
                    offsets = new ArrayList<Long>();
                    offsetMap.put(key, offsets);
                }
                offsets.add(len);
                // Only the first matching key gets the length.
                break;
            }
        }
    }

    /**
     * Exercises the error paths of {@code TransactionBatch} using a
     * {@code FaultyWriter} wrapped around a {@code DelimitedInputWriter}:
     * <ol>
     *   <li>a batch that has been closed rejects beginNextTransaction(), write()
     *       and commit() with an IllegalStateException mentioning "has been closed()";</li>
     *   <li>a fault raised from write() surfaces as a StreamingIOFailure, after which
     *       commit() is rejected the same way;</li>
     *   <li>a fault raised from commit() surfaces as a StreamingIOFailure.</li>
     * </ol>
     * After each phase the metastore's transaction high-water mark and the state
     * of the listed transactions are checked.
     *
     * NOTE(review): the absolute high-water-mark values (17, 19, 21) and the
     * indexes into the open-transaction list presumably depend on how many
     * transactions were opened before this point - confirm before reordering.
     */
    @Test
    public void testErrorHandling() throws Exception {
        String agentInfo = "UT_" + Thread.currentThread().getName();
        this.runCmdOnDriver("create database testErrors");
        this.runCmdOnDriver("use testErrors");
        this.runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
        HiveEndPoint endPt = new HiveEndPoint(this.metaStoreURI, "testErrors", "T", null);
        StreamingConnection connection = endPt.newConnection(false, agentInfo);
        DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","), ",", endPt, connection);
        // FaultyWriter wraps the real writer; its enableErrors()/disableErrors() are toggled below.
        FaultyWriter writer = new FaultyWriter((RecordWriter)innerWriter);
        // Phase 1: close the batch immediately. heartbeat() and abort() are then called
        // outside any try/catch, so the test requires that they do not throw after close().
        TransactionBatch txnBatch = connection.fetchTransactionBatch(2, (RecordWriter)writer);
        txnBatch.close();
        txnBatch.heartbeat();
        txnBatch.abort();
        GetOpenTxnsInfoResponse r = this.msClient.showTxns();
        Assert.assertEquals((String)"HWM didn't match", (long)17L, (long)r.getTxn_high_water_mark());
        List ti = r.getOpen_txns();
        // Both transactions of the closed batch must be reported as ABORTED.
        Assert.assertEquals((String)"wrong status ti(0)", (Object)TxnState.ABORTED, (Object)((TxnInfo)ti.get(0)).getState());
        Assert.assertEquals((String)"wrong status ti(1)", (Object)TxnState.ABORTED, (Object)((TxnInfo)ti.get(1)).getState());
        // Every mutating call on the closed batch must fail with IllegalStateException.
        Throwable expectedEx = null;
        try {
            txnBatch.beginNextTransaction();
        }
        catch (IllegalStateException ex) {
            expectedEx = ex;
        }
        Assert.assertTrue((String)"beginNextTransaction() should have failed", (expectedEx != null && expectedEx.getMessage().contains("has been closed()") ? 1 : 0) != 0);
        expectedEx = null;
        try {
            txnBatch.write("name0,1,Hello streaming".getBytes());
        }
        catch (IllegalStateException ex) {
            expectedEx = ex;
        }
        Assert.assertTrue((String)"write()  should have failed", (expectedEx != null && expectedEx.getMessage().contains("has been closed()") ? 1 : 0) != 0);
        expectedEx = null;
        try {
            txnBatch.commit();
        }
        catch (IllegalStateException ex) {
            expectedEx = ex;
        }
        Assert.assertTrue((String)"commit() should have failed", (expectedEx != null && expectedEx.getMessage().contains("has been closed()") ? 1 : 0) != 0);
        // Phase 2: a fresh batch; the first transaction commits normally (errors not yet enabled).
        txnBatch = connection.fetchTransactionBatch(2, (RecordWriter)writer);
        txnBatch.beginNextTransaction();
        txnBatch.write("name2,2,Welcome to streaming".getBytes());
        txnBatch.write("name4,2,more Streaming unlimited".getBytes());
        txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
        txnBatch.commit();
        // toString() must name the last used transaction and show it as "CO".
        String s = txnBatch.toString();
        Assert.assertTrue((String)("Actual: " + s), (boolean)s.contains("LastUsed " + JavaUtils.txnIdToString((long)txnBatch.getCurrentTxnId())));
        Assert.assertTrue((String)("Actual: " + s), (boolean)s.contains("TxnStatus[CO]"));
        expectedEx = null;
        // Second transaction of the batch: enable faults so the write fails.
        txnBatch.beginNextTransaction();
        writer.enableErrors();
        try {
            txnBatch.write("name6,2,Doh!".getBytes());
        }
        catch (StreamingIOFailure ex) {
            expectedEx = ex;
            // These accessors are called only to verify they still work after the failure.
            txnBatch.getCurrentTransactionState();
            txnBatch.getCurrentTxnId();
        }
        Assert.assertTrue((String)("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?")), (expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred") ? 1 : 0) != 0);
        expectedEx = null;
        // After the failed write, commit() is expected to be rejected as on a closed batch.
        try {
            txnBatch.commit();
        }
        catch (IllegalStateException ex) {
            expectedEx = ex;
        }
        Assert.assertTrue((String)"commit() should have failed", (expectedEx != null && expectedEx.getMessage().contains("has been closed()") ? 1 : 0) != 0);
        // toString() must now show the last used transaction as "CA".
        s = txnBatch.toString();
        Assert.assertTrue((String)("Actual: " + s), (boolean)s.contains("LastUsed " + JavaUtils.txnIdToString((long)txnBatch.getCurrentTxnId())));
        Assert.assertTrue((String)("Actual: " + s), (boolean)s.contains("TxnStatus[CA]"));
        r = this.msClient.showTxns();
        Assert.assertEquals((String)"HWM didn't match", (long)19L, (long)r.getTxn_high_water_mark());
        ti = r.getOpen_txns();
        Assert.assertEquals((String)"wrong status ti(0)", (Object)TxnState.ABORTED, (Object)((TxnInfo)ti.get(0)).getState());
        Assert.assertEquals((String)"wrong status ti(1)", (Object)TxnState.ABORTED, (Object)((TxnInfo)ti.get(1)).getState());
        Assert.assertEquals((String)"wrong status ti(2)", (Object)TxnState.ABORTED, (Object)((TxnInfo)ti.get(2)).getState());
        // Phase 3: the write succeeds with errors disabled, then faults are re-enabled
        // so that commit() itself fails.
        writer.disableErrors();
        txnBatch = connection.fetchTransactionBatch(2, (RecordWriter)writer);
        txnBatch.beginNextTransaction();
        txnBatch.write("name2,2,Welcome to streaming".getBytes());
        writer.enableErrors();
        expectedEx = null;
        try {
            txnBatch.commit();
        }
        catch (StreamingIOFailure ex) {
            expectedEx = ex;
        }
        Assert.assertTrue((String)("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?")), (expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred") ? 1 : 0) != 0);
        r = this.msClient.showTxns();
        Assert.assertEquals((String)"HWM didn't match", (long)21L, (long)r.getTxn_high_water_mark());
        ti = r.getOpen_txns();
        Assert.assertEquals((String)"wrong status ti(3)", (Object)TxnState.ABORTED, (Object)((TxnInfo)ti.get(3)).getState());
        Assert.assertEquals((String)"wrong status ti(4)", (Object)TxnState.ABORTED, (Object)((TxnInfo)ti.get(4)).getState());
        // abort() is called outside any try/catch, so it must not throw after the failed commit.
        txnBatch.abort();
    }

    /**
     * Reads every bucket file under the {@code delta*} directories of a table
     * and returns the records keyed by bucket number.
     *
     * <p>Inside each delta directory, entries whose name starts with
     * {@code _} or {@code .} and entries whose path ends with {@code length}
     * are skipped. If the same bucket number occurs in several delta
     * directories, the entry read last replaces the earlier one.
     *
     * @param dbLocation database directory on the local file system
     * @param tableName  table directory name below {@code dbLocation}
     * @return map from bucket number to the records read from that bucket file
     * @throws FileNotFoundException if the table directory cannot be listed
     * @throws IOException           if reading a bucket file fails
     */
    private HashMap<Integer, ArrayList<SampleRec>> dumpAllBuckets(String dbLocation, String tableName) throws IOException {
        File tableDir = new File(dbLocation + "/" + tableName);
        File[] tableEntries = tableDir.listFiles();
        if (tableEntries == null) {
            // listFiles() returns null for a missing path or a non-directory; report that
            // explicitly instead of failing with a bare NullPointerException in the loop.
            throw new FileNotFoundException("Cannot list table directory " + tableDir);
        }
        FileFilter visibleOnly = new FileFilter(){

            @Override
            public boolean accept(File pathname) {
                String name = pathname.getName();
                return !name.startsWith("_") && !name.startsWith(".");
            }
        };
        HashMap<Integer, ArrayList<SampleRec>> result = new HashMap<Integer, ArrayList<SampleRec>>();
        for (File deltaDir : tableEntries) {
            if (!deltaDir.getName().startsWith("delta")) {
                continue;
            }
            File[] bucketFiles = deltaDir.listFiles(visibleOnly);
            if (bucketFiles == null) {
                // A plain file that merely happens to be named delta*: nothing to read.
                continue;
            }
            for (File bucketFile : bucketFiles) {
                if (bucketFile.toString().endsWith("length")) {
                    continue;
                }
                Integer bucketNum = this.getBucketNumber(bucketFile);
                ArrayList<SampleRec> recs = this.dumpBucket(new Path(bucketFile.toString()));
                result.put(bucketNum, recs);
            }
        }
        return result;
    }

    /**
     * Parses the bucket number from a bucket file name: everything after the
     * first underscore is read as a decimal integer.
     *
     * @param bucketFile bucket file; only its name is used
     * @return the parsed bucket number
     * @throws NumberFormatException if the text after the first underscore is not an integer
     */
    private Integer getBucketNumber(File bucketFile) {
        String fileName = bucketFile.getName();
        int underscoreAt = fileName.indexOf('_');
        return Integer.parseInt(fileName.substring(underscoreAt + 1));
    }

    /**
     * Drops every table of the given database and then the database itself.
     * Any {@code TException} raised along the way is swallowed, so the call
     * never fails the caller.
     *
     * @param client       metastore client used to list and drop objects
     * @param databaseName name of the database to remove
     */
    public static void dropDB(IMetaStoreClient client, String databaseName) {
        try {
            List<String> tableNames = client.listTableNamesByFilter(databaseName, "", (short)-1);
            for (String tableName : tableNames) {
                client.dropTable(databaseName, tableName, true, true);
            }
            client.dropDatabase(databaseName);
        }
        catch (TException ignored) {
            // Intentionally ignored: cleanup is best-effort (presumably the database
            // may simply not exist yet when a test starts).
        }
    }

    /**
     * Creates the database (if absent) at a {@code raw://} URI derived from
     * {@code dbLocation}, switches to it, and creates a transactional, bucketed
     * ORC table located below that URI. When partition columns are given, one
     * partition is added as well.
     *
     * @param driver       driver used to run the statements
     * @param databaseName database to create and use
     * @param tableName    table to create
     * @param partVals     values for the partition to add; used only when {@code partNames} is non-empty
     * @param colNames     column names
     * @param colTypes     column types, parallel to {@code colNames}
     * @param bucketCols   columns for the {@code clustered by} clause
     * @param partNames    partition column names, or {@code null}/empty for an unpartitioned table
     * @param dbLocation   file system location of the database
     * @param bucketCount  number of buckets
     * @return the added partition's path when partitioned, otherwise the table location
     */
    private static Path createDbAndTable(IDriver driver, String databaseName, String tableName, List<String> partVals, String[] colNames, String[] colTypes, String[] bucketCols, String[] partNames, String dbLocation, int bucketCount) throws Exception {
        String dbUri = "raw://" + new Path(dbLocation).toUri().toString();
        String tableLoc = dbUri + "/" + tableName;

        TestStreaming.runDDL(driver, "create database IF NOT EXISTS " + databaseName + " location '" + dbUri + "'");
        TestStreaming.runDDL(driver, "use " + databaseName);

        // Assemble the CREATE TABLE statement piece by piece.
        StringBuilder createStmt = new StringBuilder();
        createStmt.append("create table ").append(tableName);
        createStmt.append(" ( ").append(TestStreaming.getTableColumnsStr(colNames, colTypes)).append(" )");
        createStmt.append(TestStreaming.getPartitionStmtStr(partNames));
        createStmt.append(" clustered by ( ").append(TestStreaming.join(bucketCols, ",")).append(" ) into ");
        createStmt.append(bucketCount);
        createStmt.append(" buckets  stored as orc  location '").append(tableLoc);
        createStmt.append("' TBLPROPERTIES ('transactional'='true') ");
        TestStreaming.runDDL(driver, createStmt.toString());

        boolean unpartitioned = partNames == null || partNames.length == 0;
        if (unpartitioned) {
            return new Path(tableLoc);
        }
        return TestStreaming.addPartition(driver, tableName, partVals, partNames);
    }

    /**
     * Adds one partition to {@code tableName} and returns the path reported
     * for it by {@code getPartitionPath}.
     *
     * @param driver    driver used to run the statement
     * @param tableName table to alter
     * @param partVals  partition values, parallel to {@code partNames}
     * @param partNames partition column names
     * @return location of the new partition
     */
    private static Path addPartition(IDriver driver, String tableName, List<String> partVals, String[] partNames) throws Exception {
        String spec = TestStreaming.getPartsSpec(partNames, partVals);
        TestStreaming.runDDL(driver, "alter table " + tableName + " add partition ( " + spec + " )");
        return TestStreaming.getPartitionPath(driver, tableName, spec);
    }

    /**
     * Looks up a partition's location by running {@code describe extended} and
     * extracting the text between {@code location:} and the next comma on the
     * last result row.
     *
     * @param driver    driver used to run the query
     * @param tableName table that owns the partition
     * @param partSpec  partition spec as placed inside {@code PARTITION (...)}
     * @return the extracted location as a {@code Path}
     */
    private static Path getPartitionPath(IDriver driver, String tableName, String partSpec) throws Exception {
        String describeStmt = "describe extended " + tableName + " PARTITION (" + partSpec + ")";
        List<String> rows = TestStreaming.queryTable(driver, describeStmt);
        String lastRow = rows.get(rows.size() - 1);

        String marker = "location:";
        int from = lastRow.indexOf(marker) + marker.length();
        int to = lastRow.indexOf(",", from);
        return new Path(lastRow.substring(from, to));
    }

    /**
     * Builds the column list of a CREATE TABLE statement: each column is
     * rendered as {@code name type} and columns are separated by commas.
     *
     * @param colNames column names
     * @param colTypes column types, parallel to {@code colNames}
     * @return the comma-separated column definitions; empty when there are no columns
     */
    private static String getTableColumnsStr(String[] colNames, String[] colTypes) {
        StringBuilder columns = new StringBuilder();
        for (int idx = 0; idx < colNames.length; idx++) {
            if (idx > 0) {
                columns.append(",");
            }
            columns.append(colNames[idx]).append(" ").append(colTypes[idx]);
        }
        return columns.toString();
    }

    /**
     * Builds the partition-column list of a CREATE TABLE statement: every
     * partition column is declared as {@code string} and columns are
     * separated by commas.
     *
     * @param partNames partition column names; may be {@code null} or empty
     * @return the comma-separated declarations, or an empty string when there are none
     */
    private static String getTablePartsStr(String[] partNames) {
        boolean nothingToRender = partNames == null || partNames.length == 0;
        if (nothingToRender) {
            return "";
        }
        StringBuilder parts = new StringBuilder();
        for (int idx = 0; idx < partNames.length; idx++) {
            if (idx > 0) {
                parts.append(",");
            }
            parts.append(partNames[idx]).append(" string");
        }
        return parts.toString();
    }

    /**
     * Builds a partition spec of the form {@code name = 'value',name = 'value'}.
     * One entry is produced per element of {@code partVals}; the matching
     * name is taken from {@code partNames} at the same index.
     *
     * @param partNames partition column names
     * @param partVals  partition values
     * @return the comma-separated spec; empty when {@code partVals} is empty
     */
    private static String getPartsSpec(String[] partNames, List<String> partVals) {
        StringBuilder spec = new StringBuilder();
        int idx = 0;
        for (String partVal : partVals) {
            if (idx > 0) {
                spec.append(",");
            }
            spec.append(partNames[idx]).append(" = '").append(partVal).append("'");
            idx++;
        }
        return spec.toString();
    }

    /**
     * Concatenates {@code values}, placing {@code delimiter} between consecutive elements.
     *
     * <p>A {@code null} element is rendered as the text {@code null}, the same way the
     * other string builders in this class render null entries, instead of failing with a
     * {@link NullPointerException}.
     *
     * @param values    elements to join; may be {@code null}
     * @param delimiter separator inserted between elements
     * @return the joined text, an empty string for an empty array, or {@code null} when
     *         {@code values} is {@code null}
     */
    private static String join(String[] values, String delimiter) {
        if (values == null) {
            return null;
        }
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < values.length; ++i) {
            if (i > 0) {
                joined.append(delimiter);
            }
            // append(String) tolerates null; calling toString() on the element did not
            joined.append(values[i]);
        }
        return joined.toString();
    }

    /**
     * Builds the {@code partitioned by (...)} clause for a create-table statement.
     *
     * @param partNames partition column names, may be {@code null}
     * @return the clause with a leading space, or an empty string when there are no
     *         partition columns
     */
    private static String getPartitionStmtStr(String[] partNames) {
        boolean partitioned = partNames != null && partNames.length != 0;
        return partitioned
                ? " partitioned by (" + TestStreaming.getTablePartsStr(partNames) + " )"
                : "";
    }

    /**
     * Runs a statement through the driver, echoing it to the debug log and stdout first.
     *
     * <p>Failure is reported through the return value and an error log entry; this
     * method body does not itself throw on a non-zero response code.
     *
     * @param driver driver that executes the statement
     * @param sql    statement text
     * @return {@code true} when the response code is 0, {@code false} otherwise
     * @throws QueryFailedException declared by the signature
     */
    private static boolean runDDL(IDriver driver, String sql) throws QueryFailedException {
        LOG.debug(sql);
        System.out.println(sql);
        CommandProcessorResponse response = driver.run(sql);
        boolean succeeded = response.getResponseCode() == 0;
        if (!succeeded) {
            LOG.error("Statement: " + sql + " failed: " + response);
        }
        return succeeded;
    }

    /**
     * Runs a query and collects the rows the driver hands back.
     *
     * @param driver driver that executes the query
     * @param query  query text
     * @return the rows fetched via {@code driver.getResults}
     * @throws RuntimeException if the response code is non-zero
     * @throws IOException      declared by the signature
     */
    private static ArrayList<String> queryTable(IDriver driver, String query) throws IOException {
        CommandProcessorResponse response = driver.run(query);
        if (response.getResponseCode() == 0) {
            ArrayList<String> rows = new ArrayList<String>();
            driver.getResults(rows);
            return rows;
        }
        throw new RuntimeException(query + " failed: " + response);
    }

    // Class initializer: sets fieldNames2 to a two-element array holding the COL1 and
    // COL2 constants, in that order. Both the field and the constants are declared
    // elsewhere in this class.
    static {
        fieldNames2 = new String[]{COL1, COL2};
    }

    /**
     * {@link RecordWriter} wrapper used to simulate I/O failures.
     *
     * <p>Every call is forwarded to the wrapped writer. While errors are enabled,
     * {@link #write} and {@link #flush} additionally throw a {@link StreamingIOFailure}
     * <em>after</em> the delegate call has completed; the remaining methods never inject
     * a fault. Errors are disabled on construction.
     */
    private static final class FaultyWriter
    implements RecordWriter {
        private final RecordWriter delegate;
        /** When true, write() and flush() fail after delegating. */
        private boolean shouldThrow;

        private FaultyWriter(RecordWriter delegate) {
            assert (delegate != null);
            this.delegate = delegate;
        }

        /** Turns fault injection on. */
        void enableErrors() {
            shouldThrow = true;
        }

        /** Turns fault injection off. */
        void disableErrors() {
            shouldThrow = false;
        }

        /** Forwards the record, then fails if fault injection is on. */
        public void write(long writeId, byte[] record) throws StreamingException {
            delegate.write(writeId, record);
            produceFault();
        }

        /** Flushes the delegate, then fails if fault injection is on. */
        public void flush() throws StreamingException {
            delegate.flush();
            produceFault();
        }

        /** Pure pass-through. */
        public void clear() throws StreamingException {
            delegate.clear();
        }

        /** Pure pass-through. */
        public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingException {
            delegate.newBatch(minTxnId, maxTxnID);
        }

        /** Pure pass-through. */
        public void closeBatch() throws StreamingException {
            delegate.closeBatch();
        }

        /** Throws the simulated failure when fault injection is on; otherwise a no-op. */
        private void produceFault() throws StreamingIOFailure {
            if (!shouldThrow) {
                return;
            }
            throw new StreamingIOFailure("Simulated fault occurred");
        }
    }

    /**
     * Simple three-field value holder with value-based equality.
     *
     * <p>Two records are equal when they are of the exact same class and all three
     * fields match; {@code null} strings are only equal to {@code null}.
     */
    private static class SampleRec {
        public String field1;
        public int field2;
        public String field3;

        public SampleRec(String field1, int field2, String field3) {
            this.field1 = field1;
            this.field2 = field2;
            this.field3 = field3;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            SampleRec other = (SampleRec) o;
            return field2 == other.field2
                    && sameText(field1, other.field1)
                    && sameText(field3, other.field3);
        }

        /** Null-safe string equality. */
        private static boolean sameText(String left, String right) {
            return left == null ? right == null : left.equals(right);
        }

        @Override
        public int hashCode() {
            int firstHash = field1 == null ? 0 : field1.hashCode();
            int thirdHash = field3 == null ? 0 : field3.hashCode();
            // same 31-based polynomial as ((h1 * 31) + field2) * 31 + h3
            return 31 * (31 * firstHash + field2) + thirdHash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append(" { '").append(field1).append('\'');
            text.append(",").append(field2);
            text.append(",'").append(field3).append('\'');
            text.append(" }");
            return text.toString();
        }
    }

    /**
     * Writer thread used to exercise concurrent streaming into one end point.
     *
     * <p>The constructor opens a dedicated connection and delimited writer for the end
     * point. {@link #run()} fetches a transaction batch (requested with the argument 10)
     * and, for each remaining transaction, writes the same record twice and commits.
     * Any failure in {@code run()} is rethrown as a {@link RuntimeException}, which the
     * installed uncaught-exception handler stores in {@link #error} and logs.
     */
    private static class WriterThd
    extends Thread {
        private final StreamingConnection conn;
        private final DelimitedInputWriter writer;
        private final String data;
        /** Set by the uncaught-exception handler when this thread dies; otherwise left unset. */
        private Throwable error;

        /**
         * @param ep   end point to connect to
         * @param data record text written by this thread; also used in the thread name
         * @throws Exception if the writer or the connection cannot be created
         */
        WriterThd(HiveEndPoint ep, String data) throws Exception {
            super("Writer_" + data);
            this.writer = new DelimitedInputWriter(fieldNames, ",", ep);
            // Evaluated on the constructing thread, so the agent string carries the
            // creator's thread name rather than "Writer_<data>".
            this.conn = ep.newConnection(false, "UT_" + Thread.currentThread().getName());
            this.data = data;
            this.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

                @Override
                public void uncaughtException(Thread thread, Throwable throwable) {
                    error = throwable;
                    LOG.error("Thread " + thread.getName() + " died: " + throwable.getMessage(), throwable);
                }
            });
        }

        @Override
        public void run() {
            TransactionBatch txnBatch = null;
            try {
                txnBatch = this.conn.fetchTransactionBatch(10, (RecordWriter)this.writer);
                while (txnBatch.remainingTransactions() > 0) {
                    txnBatch.beginNextTransaction();
                    txnBatch.write(this.data.getBytes());
                    txnBatch.write(this.data.getBytes());
                    txnBatch.commit();
                }
            }
            catch (Exception e) {
                // Surfaces through the uncaught-exception handler installed in the constructor.
                throw new RuntimeException(e);
            }
            finally {
                if (txnBatch != null) {
                    try {
                        txnBatch.close();
                    }
                    catch (Exception e) {
                        // Log only: the connection is closed exactly once below. Closing it
                        // here as well was redundant and, being unguarded, could have thrown
                        // out of this finally block and hidden the original failure.
                        LOG.error("txnBatch.close() failed: " + e.getMessage(), (Throwable)e);
                    }
                }
                try {
                    this.conn.close();
                }
                catch (Exception e) {
                    LOG.error("conn.close() failed: " + e.getMessage(), (Throwable)e);
                }
            }
        }
    }

    /**
     * {@link RawLocalFileSystem} variant registered under the {@code raw:///} URI.
     *
     * <p>{@link #getFileStatus(Path)} derives the permission bits from what the current
     * process can do with the local file and reports a fixed owner and group.
     */
    public static class RawFileSystem
    extends RawLocalFileSystem {
        private static final URI NAME = rawUri();

        /** Builds the constant file system URI; a syntax error is reported as an illegal argument. */
        private static URI rawUri() {
            try {
                return new URI("raw:///");
            }
            catch (URISyntaxException se) {
                throw new IllegalArgumentException("bad uri", se);
            }
        }

        public URI getUri() {
            return NAME;
        }

        /**
         * Returns a status for the local file behind {@code path}.
         *
         * @param path path to inspect
         * @return status with length, directory flag and modification time taken from the
         *         local file, replication 1, block size 1024, owner {@code owen}, group
         *         {@code users}
         * @throws FileNotFoundException if the local file does not exist
         * @throws IOException           declared by the signature
         */
        public FileStatus getFileStatus(Path path) throws IOException {
            File local = this.pathToFile(path);
            if (!local.exists()) {
                throw new FileNotFoundException("Can't find " + path);
            }
            // Each capability grants the matching bit to user, group and other alike,
            // except write, which is granted to the user only.
            int bits = 0;
            if (local.canRead()) {
                bits |= 0444;
            }
            if (local.canWrite()) {
                bits |= 0200;
            }
            if (local.canExecute()) {
                bits |= 0111;
            }
            short mode = (short) bits;
            return new FileStatus(local.length(), local.isDirectory(), 1, 1024L, local.lastModified(), local.lastModified(), FsPermission.createImmutable(mode), "owen", "users", path);
        }
    }
}

