package org.apache.phoenix.flume;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.flume.serializer.EventSerializers;
import org.apache.phoenix.flume.sink.PhoenixSink;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({NeedsOwnMiniClusterTest.class})
/* loaded from: input_file:org/apache/phoenix/flume/JsonEventSerializerIT.class */
public class JsonEventSerializerIT extends BaseTest {
    private Context sinkContext;
    private PhoenixSink sink;

    @BeforeClass
    public static synchronized void doSetup() throws Exception {
        setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
    }

    @AfterClass
    public static synchronized void doTeardown() throws Exception {
        dropNonSystemTables();
    }

    @After
    public void cleanUpAfterTest() throws Exception {
        deletePriorMetaData(Long.MAX_VALUE, getUrl());
    }

    @Test
    public void testWithOutColumnsMapping() throws EventDeliveryException, SQLException {
        initSinkContext("FLUME_JSON_TEST", "CREATE TABLE IF NOT EXISTS FLUME_JSON_TEST  (flume_time timestamp not null, col1 varchar , col2 double, col3 varchar[], col4 integer[]  CONSTRAINT pk PRIMARY KEY (flume_time))\n", "col1,col2,col3,col4", null, DefaultKeyGenerator.TIMESTAMP.name(), null);
        this.sink = new PhoenixSink();
        Configurables.configure(this.sink, this.sinkContext);
        Assert.assertEquals(LifecycleState.IDLE, this.sink.getLifecycleState());
        Channel initChannel = initChannel();
        this.sink.setChannel(initChannel);
        this.sink.start();
        Event withBody = EventBuilder.withBody(Bytes.toBytes("{\"col1\" : \"kalyan\", \"col2\" : 10.5, \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}"));
        Transaction transaction = initChannel.getTransaction();
        transaction.begin();
        initChannel.put(withBody);
        transaction.commit();
        transaction.close();
        this.sink.process();
        Assert.assertEquals(1L, countRows("FLUME_JSON_TEST"));
        this.sink.stop();
        Assert.assertEquals(LifecycleState.STOP, this.sink.getLifecycleState());
        dropTable("FLUME_JSON_TEST");
    }

    @Test
    public void testDifferentColumnNames() throws EventDeliveryException, SQLException {
        initSinkContext("FLUME_JSON_TEST", "CREATE TABLE IF NOT EXISTS FLUME_JSON_TEST  (flume_time timestamp not null, col1 varchar , col2 double, col3 varchar[], col4 integer[]  CONSTRAINT pk PRIMARY KEY (flume_time))\n", "col1,col2,col3,col4", "{\"col1\":\"col1\",\"col2\":\"f2\",\"col3\":\"f3\",\"col4\":\"col4\"}", DefaultKeyGenerator.TIMESTAMP.name(), null);
        this.sink = new PhoenixSink();
        Configurables.configure(this.sink, this.sinkContext);
        Assert.assertEquals(LifecycleState.IDLE, this.sink.getLifecycleState());
        Channel initChannel = initChannel();
        this.sink.setChannel(initChannel);
        this.sink.start();
        Event withBody = EventBuilder.withBody(Bytes.toBytes("{\"col1\" : \"kalyan\", \"f2\" : 10.5, \"f3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}"));
        Transaction transaction = initChannel.getTransaction();
        transaction.begin();
        initChannel.put(withBody);
        transaction.commit();
        transaction.close();
        this.sink.process();
        Assert.assertEquals(1L, countRows("FLUME_JSON_TEST"));
        this.sink.stop();
        Assert.assertEquals(LifecycleState.STOP, this.sink.getLifecycleState());
        dropTable("FLUME_JSON_TEST");
    }

    @Test
    public void testInnerColumns() throws EventDeliveryException, SQLException {
        initSinkContext("FLUME_JSON_TEST", "CREATE TABLE IF NOT EXISTS FLUME_JSON_TEST  (flume_time timestamp not null, col1 varchar , col2 double, col3 varchar[], col4 integer[]  CONSTRAINT pk PRIMARY KEY (flume_time))\n", "col1,col2,col3,col4", "{\"col1\":\"col1\",\"col2\":\"x.y\",\"col3\":\"a.b1.c\",\"col4\":\"col4\"}", DefaultKeyGenerator.TIMESTAMP.name(), null);
        this.sink = new PhoenixSink();
        Configurables.configure(this.sink, this.sinkContext);
        Assert.assertEquals(LifecycleState.IDLE, this.sink.getLifecycleState());
        Channel initChannel = initChannel();
        this.sink.setChannel(initChannel);
        this.sink.start();
        Event withBody = EventBuilder.withBody(Bytes.toBytes("{\"col1\" : \"kalyan\", \"x\" : {\"y\" : 10.5}, \"a\" : {\"b1\" : {\"c\" : [\"abc\",\"pqr\",\"xyz\"] }, \"b2\" : 111}, \"col4\" : [1,2,3,4]}"));
        Transaction transaction = initChannel.getTransaction();
        transaction.begin();
        initChannel.put(withBody);
        transaction.commit();
        transaction.close();
        this.sink.process();
        Assert.assertEquals(1L, countRows("FLUME_JSON_TEST"));
        this.sink.stop();
        Assert.assertEquals(LifecycleState.STOP, this.sink.getLifecycleState());
        dropTable("FLUME_JSON_TEST");
    }

    @Test
    public void testInnerColumnsWithArrayMapping() throws EventDeliveryException, SQLException {
        initSinkContext("FLUME_JSON_TEST", "CREATE TABLE IF NOT EXISTS FLUME_JSON_TEST  (flume_time timestamp not null, col1 varchar , col2 double, col3 varchar[], col4 integer[]  CONSTRAINT pk PRIMARY KEY (flume_time))\n", "col1,col2,col3,col4", "{\"col1\":\"col1\",\"col2\":\"x.y\",\"col3\":\"a.b[*].c\",\"col4\":\"col4\"}", DefaultKeyGenerator.TIMESTAMP.name(), null);
        this.sink = new PhoenixSink();
        Configurables.configure(this.sink, this.sinkContext);
        Assert.assertEquals(LifecycleState.IDLE, this.sink.getLifecycleState());
        Channel initChannel = initChannel();
        this.sink.setChannel(initChannel);
        this.sink.start();
        Event withBody = EventBuilder.withBody(Bytes.toBytes("{\"col1\" : \"kalyan\", \"x\" : {\"y\" : 10.5}, \"a\" : {\"b\" : [{\"c\" : \"abc\"}, {\"c\" : \"pqr\"}, {\"c\" : \"xyz\"}] , \"b2\" : 111}, \"col4\" : [1,2,3,4]}"));
        Transaction transaction = initChannel.getTransaction();
        transaction.begin();
        initChannel.put(withBody);
        transaction.commit();
        transaction.close();
        this.sink.process();
        Assert.assertEquals(1L, countRows("FLUME_JSON_TEST"));
        this.sink.stop();
        Assert.assertEquals(LifecycleState.STOP, this.sink.getLifecycleState());
        dropTable("FLUME_JSON_TEST");
    }

    @Test
    public void testKeyGenerator() throws EventDeliveryException, SQLException {
        initSinkContextWithDefaults("FLUME_JSON_TEST");
        this.sink = new PhoenixSink();
        Configurables.configure(this.sink, this.sinkContext);
        Assert.assertEquals(LifecycleState.IDLE, this.sink.getLifecycleState());
        Channel initChannel = initChannel();
        this.sink.setChannel(initChannel);
        this.sink.start();
        Event withBody = EventBuilder.withBody(Bytes.toBytes("{\"col1\" : \"kalyan\", \"col2\" : 10.5, \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}"));
        Transaction transaction = initChannel.getTransaction();
        transaction.begin();
        initChannel.put(withBody);
        transaction.commit();
        transaction.close();
        this.sink.process();
        Assert.assertEquals(1L, countRows("FLUME_JSON_TEST"));
        this.sink.stop();
        Assert.assertEquals(LifecycleState.STOP, this.sink.getLifecycleState());
        dropTable("FLUME_JSON_TEST");
    }

    @Test
    public void testMismatchKeyGenerator() throws EventDeliveryException, SQLException {
        initSinkContextWithDefaults("FLUME_JSON_TEST");
        setConfig("serializer.rowkeyType", DefaultKeyGenerator.UUID.name());
        this.sink = new PhoenixSink();
        Configurables.configure(this.sink, this.sinkContext);
        Assert.assertEquals(LifecycleState.IDLE, this.sink.getLifecycleState());
        Channel initChannel = initChannel();
        this.sink.setChannel(initChannel);
        this.sink.start();
        Event withBody = EventBuilder.withBody(Bytes.toBytes("{\"col1\" : \"kalyan\", \"col2\" : 10.5, \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}"));
        Transaction transaction = initChannel.getTransaction();
        transaction.begin();
        initChannel.put(withBody);
        transaction.commit();
        transaction.close();
        try {
            this.sink.process();
            Assert.fail();
        } catch (Exception e) {
            Assert.assertTrue(e.getCause().getMessage().contains("java.lang.IllegalArgumentException: Invalid format:"));
        }
        dropTable("FLUME_JSON_TEST");
    }

    @Test
    public void testMissingColumnsInEvent() throws EventDeliveryException, SQLException {
        initSinkContextWithDefaults("FLUME_JSON_TEST");
        this.sink = new PhoenixSink();
        Configurables.configure(this.sink, this.sinkContext);
        Assert.assertEquals(LifecycleState.IDLE, this.sink.getLifecycleState());
        Channel initChannel = initChannel();
        this.sink.setChannel(initChannel);
        this.sink.start();
        Event withBody = EventBuilder.withBody(Bytes.toBytes("{\"col1\" : \"kalyan\", \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}"));
        Transaction transaction = initChannel.getTransaction();
        transaction.begin();
        initChannel.put(withBody);
        transaction.commit();
        transaction.close();
        this.sink.process();
        Assert.assertEquals(0L, countRows("FLUME_JSON_TEST"));
        this.sink.stop();
        Assert.assertEquals(LifecycleState.STOP, this.sink.getLifecycleState());
        dropTable("FLUME_JSON_TEST");
    }

    @Test
    public void testBatchEvents() throws EventDeliveryException, SQLException {
        initSinkContextWithDefaults("FLUME_JSON_TEST");
        this.sink = new PhoenixSink();
        Configurables.configure(this.sink, this.sinkContext);
        Assert.assertEquals(LifecycleState.IDLE, this.sink.getLifecycleState());
        Channel initChannel = initChannel();
        this.sink.setChannel(initChannel);
        this.sink.start();
        ArrayList arrayList = new ArrayList(150);
        for (int i = 0; i < arrayList.size(); i++) {
            arrayList.add(EventBuilder.withBody(Bytes.toBytes("{\"col1\" : \"val1" + i + "\", \"col2\" : " + (i * 10.5d) + " , \"col3\" : [aaa,bbb,ccc] , \"col4\" : [1,2,3,4]}")));
        }
        Transaction transaction = initChannel.getTransaction();
        transaction.begin();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            initChannel.put((Event) it.next());
        }
        transaction.commit();
        transaction.close();
        this.sink.process();
        Assert.assertEquals(arrayList.size(), countRows("FLUME_JSON_TEST"));
        this.sink.stop();
        Assert.assertEquals(LifecycleState.STOP, this.sink.getLifecycleState());
        dropTable("FLUME_JSON_TEST");
    }

    @Test
    public void testEventsWithHeaders() throws Exception {
        this.sinkContext = new Context();
        initSinkContext("FLUME_JSON_TEST", "CREATE TABLE IF NOT EXISTS FLUME_JSON_TEST  (rowkey VARCHAR not null, col1 varchar , col2 double, col3 varchar[], col4 integer[], host varchar , source varchar \n  CONSTRAINT pk PRIMARY KEY (rowkey))\n", "col1,col2,col3,col4", "{\"col1\":\"col1\",\"col2\":\"col2\",\"col3\":\"col3\",\"col4\":\"col4\"}", DefaultKeyGenerator.UUID.name(), "host,source");
        this.sink = new PhoenixSink();
        Configurables.configure(this.sink, this.sinkContext);
        Assert.assertEquals(LifecycleState.IDLE, this.sink.getLifecycleState());
        Channel initChannel = initChannel();
        this.sink.setChannel(initChannel);
        this.sink.start();
        ArrayList arrayList = new ArrayList(10);
        for (int i = 0; i < 10; i++) {
            HashMap hashMap = new HashMap(2);
            hashMap.put("host", "host1");
            hashMap.put("source", "source1");
            arrayList.add(EventBuilder.withBody(Bytes.toBytes("{\"col1\" : \"val1" + i + "\", \"col2\" : " + (i * 10.5d) + " , \"col3\" : [aaa,bbb,ccc] , \"col4\" : [1,2,3,4]}"), hashMap));
        }
        Transaction transaction = initChannel.getTransaction();
        transaction.begin();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            initChannel.put((Event) it.next());
        }
        transaction.commit();
        transaction.close();
        this.sink.process();
        Connection connection = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
        try {
            ResultSet executeQuery = connection.createStatement().executeQuery(" SELECT * FROM \n FLUME_JSON_TEST");
            Assert.assertTrue(executeQuery.next());
            Assert.assertEquals("host1", executeQuery.getString("host"));
            Assert.assertEquals("source1", executeQuery.getString("source"));
            Assert.assertTrue(executeQuery.next());
            Assert.assertEquals("host1", executeQuery.getString("host"));
            Assert.assertEquals("source1", executeQuery.getString("source"));
            if (connection != null) {
                connection.close();
            }
            this.sink.stop();
            Assert.assertEquals(LifecycleState.STOP, this.sink.getLifecycleState());
            dropTable("FLUME_JSON_TEST");
        } catch (Throwable th) {
            if (connection != null) {
                connection.close();
            }
            throw th;
        }
    }

    private Channel initChannel() {
        Context context = new Context();
        context.put("capacity", "10000");
        context.put("transactionCapacity", "200");
        MemoryChannel memoryChannel = new MemoryChannel();
        memoryChannel.setName("memorychannel");
        Configurables.configure(memoryChannel, context);
        return memoryChannel;
    }

    private void initSinkContext(String str, String str2, String str3, String str4, String str5, String str6) {
        if (str == null) {
            throw new NullPointerException();
        }
        this.sinkContext = new Context();
        this.sinkContext.put("table", str);
        this.sinkContext.put("jdbcUrl", getUrl());
        this.sinkContext.put("serializer", EventSerializers.JSON.name());
        this.sinkContext.put("ddl", str2);
        this.sinkContext.put("serializer.columns", str3);
        if (null != str4) {
            this.sinkContext.put("serializer.columnsMapping", str4);
        }
        if (null != str5) {
            this.sinkContext.put("serializer.rowkeyType", str5);
        }
        if (null != str6) {
            this.sinkContext.put("serializer.headers", str6);
        }
    }

    private void initSinkContextWithDefaults(String str) {
        initSinkContext(str, "CREATE TABLE IF NOT EXISTS " + str + "  (flume_time timestamp not null, col1 varchar , col2 double, col3 varchar[], col4 integer[]  CONSTRAINT pk PRIMARY KEY (flume_time))\n", "col1,col2,col3,col4", "{\"col1\":\"col1\",\"col2\":\"col2\",\"col3\":\"col3\",\"col4\":\"col4\"}", DefaultKeyGenerator.TIMESTAMP.name(), null);
    }

    private void setConfig(String str, String str2) {
        if (this.sinkContext == null) {
            throw new NullPointerException();
        }
        if (str == null) {
            throw new NullPointerException();
        }
        if (str2 == null) {
            throw new NullPointerException();
        }
        this.sinkContext.put(str, str2);
    }

    private int countRows(String str) throws SQLException {
        if (str == null) {
            throw new NullPointerException();
        }
        Connection connection = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
        ResultSet resultSet = null;
        try {
            resultSet = connection.createStatement().executeQuery("select count(*) from " + str);
            int i = 0;
            while (resultSet.next()) {
                i = resultSet.getInt(1);
            }
            int i2 = i;
            if (resultSet != null) {
                resultSet.close();
            }
            if (connection != null) {
                connection.close();
            }
            return i2;
        } catch (Throwable th) {
            if (resultSet != null) {
                resultSet.close();
            }
            if (connection != null) {
                connection.close();
            }
            throw th;
        }
    }

    private void dropTable(String str) throws SQLException {
        if (str == null) {
            throw new NullPointerException();
        }
        Connection connection = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
        try {
            connection.createStatement().execute("drop table if exists " + str);
            if (connection != null) {
                connection.close();
            }
        } catch (Throwable th) {
            if (connection != null) {
                connection.close();
            }
            throw th;
        }
    }
}
