/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.sink.hbase;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.primitives.UnsignedBytes;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.stumbleupon.async.Callback;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
import org.apache.flume.sink.hbase.HBaseSinkConfigurationConstants;
import org.apache.flume.sink.hbase.HBaseVersionCheck;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.Config;
import org.hbase.async.HBaseClient;
import org.hbase.async.PutRequest;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncHBaseSink
extends AbstractSink
implements Configurable,
BatchSizeSupported {
    // Target table name, read from the "table" configuration key.
    private String tableName;
    // UTF-8 bytes of the "columnFamily" configuration value.
    private byte[] columnFamily;
    // Maximum number of events taken from the channel per process() call ("batchSize", default 100).
    private long batchSize;
    private static final Logger logger = LoggerFactory.getLogger(AsyncHBaseSink.class);
    // Serializer instantiated reflectively in configure() from eventSerializerType.
    private AsyncHbaseEventSerializer serializer;
    // Configuration handed to the HBaseClient constructor; built in configure().
    @VisibleForTesting
    Config asyncClientConfig;
    // Fully qualified class name of the serializer ("serializer" key, with a built-in default).
    private String eventSerializerType;
    // Sub-properties under "serializer." passed to the serializer's configure().
    private Context serializerContext;
    // Async HBase client; null until start()/process() initialises it and after shutdown.
    private HBaseClient client;
    // Optional Hadoop configuration supplied via the constructor; created lazily in configure()
    // when no ZooKeeper quorum is configured, and cleared in stop().
    private Configuration conf;
    // Channel transaction of the process() call currently in flight.
    private Transaction txn;
    // Set to true once initHBaseClient() succeeds and back to false in stop().
    private volatile boolean open = false;
    private SinkCounter sinkCounter;
    // Callback wait timeout. Configured in milliseconds but stored in NANOSECONDS after configure().
    private long timeout;
    private String zkQuorum;
    private String zkBaseDir;
    // Executor backing the client's Netty channel factory; created in initHBaseClient().
    private ExecutorService sinkCallbackPool;
    // Test hook: when true, the success/failure callbacks sleep for 4 seconds before signalling.
    private boolean isTimeoutTest;
    // Test hook: when true, process() accumulates received callback counts in totalCallbacksReceived.
    private boolean isCoalesceTest;
    // Applied to every PutRequest via setDurable(); from the "enableWal" key.
    private boolean enableWal = true;
    // True when "coalesceIncrements" is enabled; increments are then merged per cell before sending.
    private boolean batchIncrements = false;
    // NOTE(review): volatile but updated with "+=" in process(), which is not atomic; this appears
    // to rely on process() being the only writer - confirm.
    private volatile int totalCallbacksReceived = 0;
    // Threshold at which handleTransactionFailure() shuts the client down ("maxConsecutiveFails").
    private int maxConsecutiveFails;
    // Per-batch buffer of coalesced increments; only created when batchIncrements is true.
    private Map<CellIdentifier, AtomicIncrementRequest> incrementBuffer;
    // Number of consecutive failed HBase transactions, counted from the second failure in a row.
    private int consecutiveHBaseFailures = 0;
    // Whether the previous process() call ended with an HBase-side failure.
    private boolean lastTxnFailed = false;
    // Unsigned lexicographic byte[] comparator used by CellIdentifier.equals().
    // Note: this is an instance field despite its constant-style name.
    private final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();

    /**
     * Creates a sink without a preset Hadoop configuration and with all test hooks disabled.
     */
    public AsyncHBaseSink() {
        this((Configuration)null);
    }

    /**
     * Creates a sink that uses the given Hadoop configuration, with all test hooks disabled.
     *
     * @param conf configuration to keep for later use; may be {@code null}
     */
    public AsyncHBaseSink(Configuration conf) {
        this(conf, false, false);
    }

    /**
     * Test-visible constructor that additionally enables the timeout and coalescing test hooks.
     *
     * @param conf           configuration to keep for later use; may be {@code null}
     * @param isTimeoutTest  when {@code true}, callbacks delay themselves before signalling
     * @param isCoalesceTest when {@code true}, {@code process()} tallies received callbacks
     */
    @VisibleForTesting
    AsyncHBaseSink(Configuration conf, boolean isTimeoutTest, boolean isCoalesceTest) {
        this.isCoalesceTest = isCoalesceTest;
        this.isTimeoutTest = isTimeoutTest;
        this.conf = conf;
    }

    /**
     * Drains up to {@code batchSize} events from the channel inside one channel transaction,
     * submits the serializer's puts and increments to the async HBase client, waits for the
     * corresponding callbacks, and then commits or rolls back the transaction.
     *
     * <p>Decompiler notes (CFR): "Removed try catching itself - possible behaviour change",
     * aggressive block sorting, unnecessary exception pruning and aggressive exception
     * aggregation were enabled, so the control flow below (labelled block, locals declared up
     * front) is a reconstruction rather than the original source layout.
     *
     * @return {@code READY} when the channel never returned {@code null} during the batch,
     *         {@code BACKOFF} when the channel ran dry before the batch was full
     * @throws EventDeliveryException if the sink is not open, the client cannot be initialised,
     *         an HBase callback fails or times out, or taking/committing events fails
     */
    public Sink.Status process() throws EventDeliveryException {
        int i;
        Sink.Status status;
        Condition condition;
        ReentrantLock lock;
        AtomicInteger callbacksExpected;
        AtomicInteger callbacksReceived;
        AtomicBoolean txnFail;
        block30: {
            if (!this.open) {
                throw new EventDeliveryException("Sink was never opened. Please fix the configuration.");
            }
            // The client is null after handleTransactionFailure() shut it down; re-create it lazily.
            if (this.client == null) {
                this.client = this.initHBaseClient();
                if (this.client == null) {
                    throw new EventDeliveryException("Could not establish connection to HBase!");
                }
            }
            // Per-batch bookkeeping shared with the callbacks created below.
            txnFail = new AtomicBoolean(false);
            callbacksReceived = new AtomicInteger(0);
            callbacksExpected = new AtomicInteger(0);
            lock = new ReentrantLock();
            condition = lock.newCondition();
            // Drop any coalesced increments left over from a previous batch.
            if (this.incrementBuffer != null) {
                this.incrementBuffer.clear();
            }
            SuccessCallback putSuccessCallback = new SuccessCallback(lock, callbacksReceived, condition);
            FailureCallback putFailureCallback = new FailureCallback(lock, callbacksReceived, txnFail, condition);
            SuccessCallback incrementSuccessCallback = new SuccessCallback(lock, callbacksReceived, condition);
            FailureCallback incrementFailureCallback = new FailureCallback(lock, callbacksReceived, txnFail, condition);
            status = Sink.Status.READY;
            Channel channel = this.getChannel();
            this.txn = channel.getTransaction();
            this.txn.begin();
            i = 0;
            try {
                while ((long)i < this.batchSize) {
                    Event event = channel.take();
                    if (event == null) {
                        // Channel is empty: stop early and ask the runner to back off.
                        status = Sink.Status.BACKOFF;
                        if (i == 0) {
                            this.sinkCounter.incrementBatchEmptyCount();
                        } else {
                            this.sinkCounter.incrementBatchUnderflowCount();
                        }
                        break;
                    }
                    this.serializer.setEvent(event);
                    List<PutRequest> actions = this.serializer.getActions();
                    List<AtomicIncrementRequest> increments = this.serializer.getIncrements();
                    callbacksExpected.addAndGet(actions.size());
                    // When coalescing, increments are counted after merging (see below) because
                    // several of them may collapse into a single request.
                    if (!this.batchIncrements) {
                        callbacksExpected.addAndGet(increments.size());
                    }
                    for (PutRequest action : actions) {
                        action.setDurable(this.enableWal);
                        this.client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
                    }
                    for (AtomicIncrementRequest increment : increments) {
                        if (this.batchIncrements) {
                            // Merge increments that target the same row/qualifier into one request.
                            CellIdentifier identifier = new CellIdentifier(increment.key(), increment.qualifier());
                            AtomicIncrementRequest request = this.incrementBuffer.get(identifier);
                            if (request == null) {
                                this.incrementBuffer.put(identifier, increment);
                                continue;
                            }
                            request.setAmount(request.getAmount() + increment.getAmount());
                            continue;
                        }
                        this.client.atomicIncrement(increment).addCallbacks(incrementSuccessCallback, incrementFailureCallback);
                    }
                    ++i;
                }
            }
            catch (Throwable e) {
                this.handleTransactionFailure(this.txn);
                // Every branch of checkIfChannelExceptionAndThrow ends in a throw, so the break
                // below looks like a decompiler artifact rather than a reachable path.
                this.checkIfChannelExceptionAndThrow(e);
                break block30;
            }
            if (this.batchIncrements) {
                // Send the coalesced increments and account for their callbacks.
                Collection<AtomicIncrementRequest> increments = this.incrementBuffer.values();
                for (AtomicIncrementRequest increment : increments) {
                    this.client.atomicIncrement(increment).addCallbacks(incrementSuccessCallback, incrementFailureCallback);
                }
                callbacksExpected.addAndGet(increments.size());
            }
            this.client.flush();
        }
        if ((long)i == this.batchSize) {
            this.sinkCounter.incrementBatchCompleteCount();
        }
        this.sinkCounter.addToEventDrainAttemptCount((long)i);
        lock.lock();
        long startTime = System.nanoTime();
        try {
            // Wait until all expected callbacks arrived, a failure was flagged, or the timeout
            // (this.timeout is in nanoseconds) has elapsed.
            while (callbacksReceived.get() < callbacksExpected.get() && !txnFail.get()) {
                long timeRemaining = this.timeout - (System.nanoTime() - startTime);
                timeRemaining = timeRemaining >= 0L ? timeRemaining : 0L;
                try {
                    // await() returning true means we were signalled in time: re-check the loop
                    // condition. A false return means the wait timed out.
                    if (condition.await(timeRemaining, TimeUnit.NANOSECONDS)) continue;
                    txnFail.set(true);
                    logger.warn("HBase callbacks timed out. Transaction will be rolled back.");
                }
                catch (Exception ex) {
                    logger.error("Exception while waiting for callbacks from HBase.");
                    this.handleTransactionFailure(this.txn);
                    Throwables.propagate((Throwable)ex);
                }
            }
        }
        finally {
            lock.unlock();
        }
        // Test hook: expose how many callbacks were received across batches.
        if (this.isCoalesceTest) {
            this.totalCallbacksReceived += callbacksReceived.get();
        }
        if (txnFail.get()) {
            // The counter only advances when the previous transaction failed as well, so the
            // first failure of a run is not counted.
            if (this.lastTxnFailed) {
                ++this.consecutiveHBaseFailures;
            }
            this.lastTxnFailed = true;
            this.handleTransactionFailure(this.txn);
            throw new EventDeliveryException("Could not write events to Hbase. Transaction failed, and rolled back.");
        }
        try {
            // Success path: reset failure tracking and commit the channel transaction.
            this.lastTxnFailed = false;
            this.consecutiveHBaseFailures = 0;
            this.txn.commit();
            this.txn.close();
            this.sinkCounter.addToEventDrainSuccessCount((long)i);
            return status;
        }
        catch (Throwable e) {
            this.handleTransactionFailure(this.txn);
            this.checkIfChannelExceptionAndThrow(e);
        }
        return status;
    }

    /**
     * Reads the sink configuration, instantiates the event serializer and prepares the
     * asynchbase client configuration.
     *
     * <p>Keys read from {@code context}: {@code table}, {@code columnFamily}, {@code batchSize}
     * (default 100), {@code serializer} and its {@code serializer.*} sub-properties,
     * {@code timeout} in milliseconds (default 60000; non-positive values fall back to the
     * default), {@code zookeeperQuorum}, {@code znodeParent}, {@code enableWal},
     * {@code coalesceIncrements}, {@code maxConsecutiveFails} (default 10) and any
     * {@code async.*} sub-properties, which are copied verbatim into the client configuration.
     *
     * @param context the sink's configuration
     * @throws ConfigurationException if the HBase version check fails
     * @throws NullPointerException   if the table name or column family is missing
     * @throws IllegalStateException  if no ZooKeeper quorum can be determined
     */
    public void configure(Context context) {
        if (!HBaseVersionCheck.hasVersionLessThan2(logger)) {
            throw new ConfigurationException("HBase major version number must be less than 2 for asynchbase sink. ");
        }
        this.tableName = context.getString("table");
        String cf = context.getString("columnFamily");
        this.batchSize = context.getLong("batchSize", Long.valueOf(100L));
        this.serializerContext = new Context();
        this.eventSerializerType = context.getString("serializer");
        Preconditions.checkNotNull(this.tableName, "Table name cannot be empty, please specify in configuration file");
        Preconditions.checkNotNull(cf, "Column family cannot be empty, please specify in configuration file");
        if (this.eventSerializerType == null || this.eventSerializerType.isEmpty()) {
            this.eventSerializerType = "org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer";
            logger.info("No serializer defined, Will use default");
        }
        this.serializerContext.putAll(context.getSubProperties("serializer."));
        this.columnFamily = cf.getBytes(Charsets.UTF_8);
        try {
            Class<?> clazz = Class.forName(this.eventSerializerType);
            this.serializer = (AsyncHbaseEventSerializer)clazz.newInstance();
            this.serializer.configure(this.serializerContext);
            this.serializer.initialize(this.tableName.getBytes(Charsets.UTF_8), this.columnFamily);
        }
        catch (Exception e) {
            logger.error("Could not instantiate event serializer.", e);
            Throwables.propagate(e);
        }
        // configure() may be called more than once; keep the existing counter if there is one.
        if (this.sinkCounter == null) {
            this.sinkCounter = new SinkCounter(this.getName());
        }
        final long defaultTimeoutMillis = 60000L;
        this.timeout = context.getLong("timeout", Long.valueOf(defaultTimeoutMillis));
        if (this.timeout <= 0L) {
            // The default is applied here, so say so instead of claiming the sink never times out.
            logger.warn("Timeout should be positive for Hbase sink. Using the default timeout of {} ms.", defaultTimeoutMillis);
            this.timeout = defaultTimeoutMillis;
        }
        // From here on the timeout is kept in nanoseconds.
        this.timeout = TimeUnit.MILLISECONDS.toNanos(this.timeout);
        this.zkQuorum = context.getString("zookeeperQuorum", "").trim();
        if (!this.zkQuorum.isEmpty()) {
            this.zkBaseDir = context.getString("znodeParent", "/hbase");
        } else {
            // No explicit quorum: fall back to the Hadoop/HBase configuration.
            if (this.conf == null) {
                this.conf = HBaseConfiguration.create();
            }
            this.zkQuorum = ZKConfig.getZKQuorumServersString(this.conf);
            this.zkBaseDir = this.conf.get("zookeeper.znode.parent", "/hbase");
        }
        Preconditions.checkState(this.zkQuorum != null && !this.zkQuorum.isEmpty(), "The Zookeeper quorum cannot be null and should be specified.");
        this.enableWal = context.getBoolean("enableWal", Boolean.valueOf(true));
        logger.info("The write to WAL option is set to: {}", this.enableWal);
        if (!this.enableWal) {
            logger.warn("AsyncHBaseSink's enableWal configuration is set to false. All writes to HBase will have WAL disabled, and any data in the memstore of this region in the Region Server could be lost!");
        }
        this.batchIncrements = context.getBoolean("coalesceIncrements", HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);
        if (this.batchIncrements) {
            this.incrementBuffer = Maps.newHashMap();
            logger.info("Increment coalescing is enabled. Increments will be buffered.");
        }
        this.maxConsecutiveFails = context.getInteger("maxConsecutiveFails", Integer.valueOf(10));
        ImmutableMap<String, String> asyncProperties = context.getSubProperties("async.");
        this.asyncClientConfig = new Config();
        this.asyncClientConfig.overrideConfig("hbase.zookeeper.quorum", this.zkQuorum);
        this.asyncClientConfig.overrideConfig("hbase.zookeeper.znode.parent", this.zkBaseDir);
        // User-supplied "async.*" properties are applied last so they win over the values above.
        for (Map.Entry<String, String> property : asyncProperties.entrySet()) {
            this.asyncClientConfig.overrideConfig(property.getKey(), property.getValue());
        }
    }

    /**
     * Returns the running total of callbacks tallied by {@code process()}; the total is only
     * updated when the coalescing test hook is enabled.
     */
    @VisibleForTesting
    int getTotalCallbacksReceived() {
        final int tally = totalCallbacksReceived;
        return tally;
    }

    /**
     * Reports whether this sink currently holds no Hadoop configuration object.
     */
    @VisibleForTesting
    boolean isConfNull() {
        return null == conf;
    }

    /**
     * Returns the configured maximum number of events handled per {@code process()} call.
     */
    public long getBatchSize() {
        return batchSize;
    }

    /**
     * Starts the sink: starts the counter, records a connection attempt, creates the HBase
     * client and finally delegates to the superclass.
     *
     * @throws IllegalArgumentException if a client already exists, i.e. the sink was started
     *         before without an intervening {@code stop()}
     */
    public void start() {
        final boolean noClientYet = client == null;
        Preconditions.checkArgument(noClientYet, (Object)"Please call stop before calling start on an old instance.");
        sinkCounter.start();
        sinkCounter.incrementConnectionCreatedCount();
        client = initHBaseClient();
        super.start();
    }

    /**
     * Creates the callback thread pool and the async HBase client, then blocks until the client
     * has confirmed (or failed to confirm) that the configured table and column family exist.
     *
     * <p>On success the sink is marked open and the client's flush interval is set to 0.
     *
     * @return the newly created client, which is also stored in {@code this.client}
     * @throws FlumeException if the wait is interrupted (the thread's interrupt flag is
     *         restored first) or if the table/column family check fails
     */
    private HBaseClient initHBaseClient() {
        logger.info("Initializing HBase Client");
        this.sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(this.getName() + " HBase Call Pool").build());
        logger.info("Callback pool created");
        this.client = new HBaseClient(this.asyncClientConfig, (ClientSocketChannelFactory)new NioClientSocketChannelFactory((Executor)this.sinkCallbackPool, (Executor)this.sinkCallbackPool));
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicBoolean fail = new AtomicBoolean(false);
        this.client.ensureTableFamilyExists(this.tableName.getBytes(Charsets.UTF_8), this.columnFamily).addCallbacks((Callback)new Callback<Object, Object>(){

            public Object call(Object arg) throws Exception {
                latch.countDown();
                logger.info("table found");
                return null;
            }
        }, (Callback)new Callback<Object, Object>(){

            public Object call(Object arg) throws Exception {
                // Record the underlying cause; the exception thrown below only carries a
                // generic message.
                if (arg instanceof Throwable) {
                    logger.error("Table/column family check against HBase failed", (Throwable)arg);
                } else {
                    logger.error("Table/column family check against HBase failed: {}", arg);
                }
                fail.set(true);
                latch.countDown();
                return null;
            }
        });
        try {
            logger.info("waiting on callback");
            latch.await();
            logger.info("callback received");
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            this.sinkCounter.incrementConnectionFailedCount();
            throw new FlumeException("Interrupted while waiting for Hbase Callbacks", (Throwable)e);
        }
        if (fail.get()) {
            this.sinkCounter.incrementConnectionFailedCount();
            if (this.client != null) {
                this.shutdownHBaseClient();
            }
            throw new FlumeException("Could not start sink. Table or column family does not exist in Hbase.");
        }
        this.open = true;
        this.client.setFlushInterval((short)0);
        return this.client;
    }

    /**
     * Stops the sink: cleans up the serializer, shuts down the HBase client if one exists,
     * stops the counter, terminates the callback pool (waiting up to 5 seconds before forcing
     * it), clears the client, pool and configuration references, and marks the sink closed.
     *
     * <p>If the thread is interrupted while waiting for the pool, the pool is force-stopped and
     * the interrupt flag is restored so callers can observe it.
     */
    public void stop() {
        this.serializer.cleanUp();
        if (this.client != null) {
            this.shutdownHBaseClient();
        }
        this.sinkCounter.incrementConnectionClosedCount();
        this.sinkCounter.stop();
        try {
            if (this.sinkCallbackPool != null) {
                this.sinkCallbackPool.shutdown();
                if (!this.sinkCallbackPool.awaitTermination(5L, TimeUnit.SECONDS)) {
                    this.sinkCallbackPool.shutdownNow();
                }
            }
        }
        catch (InterruptedException e) {
            logger.error("Interrupted while waiting for asynchbase sink pool to die", (Throwable)e);
            if (this.sinkCallbackPool != null) {
                this.sinkCallbackPool.shutdownNow();
            }
            // Do not swallow the interrupt; the remaining cleanup below does not block.
            Thread.currentThread().interrupt();
        }
        this.sinkCallbackPool = null;
        this.client = null;
        this.conf = null;
        this.open = false;
        super.stop();
    }

    /**
     * Asks the HBase client to shut down and waits up to {@code timeout} (nanoseconds) for the
     * shutdown to complete or fail. Failures are logged rather than thrown, and
     * {@code this.client} is always set to {@code null} afterwards.
     *
     * <p>If the waiting thread is interrupted, the interrupt flag is restored.
     */
    private void shutdownHBaseClient() {
        logger.info("Shutting down HBase Client");
        final CountDownLatch waiter = new CountDownLatch(1);
        try {
            this.client.shutdown().addCallback((Callback)new Callback<Object, Object>(){

                public Object call(Object arg) throws Exception {
                    waiter.countDown();
                    return null;
                }
            }).addErrback((Callback)new Callback<Object, Object>(){

                public Object call(Object arg) throws Exception {
                    logger.error("Failed to shutdown HBase client cleanly! HBase cluster might be down");
                    waiter.countDown();
                    return null;
                }
            });
            if (!waiter.await(this.timeout, TimeUnit.NANOSECONDS)) {
                logger.error("HBase connection could not be closed within timeout! HBase cluster might be down!");
            }
        }
        catch (InterruptedException ex) {
            // Keep the interrupt visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            logger.warn("Error while attempting to close connections to HBase", (Throwable)ex);
        }
        catch (Exception ex) {
            // Best effort: report the cause but never fail the caller because of a bad shutdown.
            logger.warn("Error while attempting to close connections to HBase", (Throwable)ex);
        }
        finally {
            this.client = null;
        }
    }

    /**
     * Rolls back and closes the given channel transaction after a failed batch.
     *
     * <p>Before rolling back, if {@code maxConsecutiveFails} is positive and the number of
     * consecutive HBase failures has reached it, the HBase client is shut down (so that it is
     * re-created on the next {@code process()} call) and the failure counter is reset.
     *
     * @param txn the transaction to roll back; it is always closed, even if rollback fails
     * @throws EventDeliveryException if rollback fails with a checked exception; unchecked
     *         exceptions and errors from rollback are rethrown as they are
     */
    private void handleTransactionFailure(Transaction txn) throws EventDeliveryException {
        if (this.maxConsecutiveFails > 0 && this.consecutiveHBaseFailures >= this.maxConsecutiveFails) {
            if (this.client != null) {
                this.shutdownHBaseClient();
            }
            this.consecutiveHBaseFailures = 0;
        }
        try {
            txn.rollback();
        }
        catch (Throwable e) {
            // Log once; this path is reached only when the rollback itself failed.
            logger.error("Failed to roll back transaction.", e);
            if (e instanceof Error || e instanceof RuntimeException) {
                Throwables.propagate(e);
            }
            throw new EventDeliveryException("Failed to roll back transaction.", e);
        }
        finally {
            txn.close();
        }
    }

    /**
     * Converts a failure from batch processing into the exception the caller should see.
     * This method never returns normally.
     *
     * <p>A {@code ChannelException} is wrapped in an {@code EventDeliveryException}. Any other
     * {@code Error} or {@code RuntimeException} is passed to {@code Throwables.propagate}.
     * Everything else is wrapped in an {@code EventDeliveryException}.
     *
     * @param e the failure to translate
     * @throws EventDeliveryException for channel exceptions and checked exceptions
     */
    private void checkIfChannelExceptionAndThrow(Throwable e) throws EventDeliveryException {
        final boolean channelFailure = e instanceof ChannelException;
        final boolean unchecked = e instanceof RuntimeException || e instanceof Error;
        if (!channelFailure && unchecked) {
            Throwables.propagate((Throwable)e);
        }
        throw new EventDeliveryException("Error in processing transaction.", e);
    }

    /**
     * Map key identifying a single cell by its row key and column qualifier bytes.
     *
     * <p>The hash code is computed once in the constructor, so the arrays passed in must not be
     * modified while the identifier is used as a key.
     */
    private class CellIdentifier {
        private final byte[] row;
        private final byte[] column;
        private final int hashCode;

        /**
         * @param row    row key bytes
         * @param column column qualifier bytes
         */
        public CellIdentifier(byte[] row, byte[] column) {
            this.row = row;
            this.column = column;
            // Standard 31-based combination: unlike a product of the two hashes it does not
            // collapse to 0 when one side hashes to 0 and it distinguishes (row, column) from
            // (column, row).
            this.hashCode = 31 * Arrays.hashCode(row) + Arrays.hashCode(column);
        }

        @Override
        public int hashCode() {
            return this.hashCode;
        }

        /**
         * Two identifiers are equal when both their row and column bytes compare as equal.
         * Returns {@code false} for {@code null} and for objects of any other type.
         */
        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (!(other instanceof CellIdentifier)) {
                return false;
            }
            CellIdentifier o = (CellIdentifier)other;
            return AsyncHBaseSink.this.COMPARATOR.compare(this.row, o.row) == 0 && AsyncHBaseSink.this.COMPARATOR.compare(this.column, o.column) == 0;
        }
    }

    /**
     * Errback attached to HBase requests. It logs the failure, counts the callback, flags the
     * current batch as failed and signals the thread waiting on the shared condition.
     *
     * @param <R> callback result type; {@code call} always returns {@code null}
     * @param <T> type of the exception delivered to the callback
     */
    private class FailureCallback<R, T extends Exception>
    implements Callback<R, T> {
        private final Lock lock;
        private final AtomicInteger callbacksReceived;
        private final AtomicBoolean txnFail;
        private final Condition condition;
        // Snapshot of the enclosing sink's timeout-test hook taken at construction time.
        private final boolean isTimeoutTesting;

        /**
         * @param lck               lock guarding {@code condition}
         * @param callbacksReceived counter incremented once per invocation
         * @param txnFail           flag set to {@code true} on every invocation
         * @param condition         condition signalled after the counter and flag are updated
         */
        public FailureCallback(Lock lck, AtomicInteger callbacksReceived, AtomicBoolean txnFail, Condition condition) {
            this.isTimeoutTesting = AsyncHBaseSink.this.isTimeoutTest;
            this.condition = condition;
            this.txnFail = txnFail;
            this.callbacksReceived = callbacksReceived;
            this.lock = lck;
        }

        public R call(T arg) throws Exception {
            logger.error("failure callback:", arg);
            if (isTimeoutTesting) {
                try {
                    // Test-only delay of 4 seconds before reporting back.
                    TimeUnit.SECONDS.sleep(4L);
                }
                catch (InterruptedException ignored) {
                    // An interrupt only cuts the artificial delay short.
                }
            }
            recordFailureAndSignal();
            return null;
        }

        /** Updates the shared counter and failure flag, then wakes one waiter under the lock. */
        private void recordFailureAndSignal() throws Exception {
            callbacksReceived.incrementAndGet();
            txnFail.set(true);
            lock.lock();
            try {
                condition.signal();
            }
            finally {
                lock.unlock();
            }
        }
    }

    /**
     * Callback attached to HBase requests for the success case. It counts the callback and
     * signals the thread waiting on the shared condition.
     *
     * @param <R> callback result type; {@code call} always returns {@code null}
     * @param <T> type of the value delivered to the callback (ignored)
     */
    private class SuccessCallback<R, T>
    implements Callback<R, T> {
        private final Lock lock;
        private final AtomicInteger callbacksReceived;
        private final Condition condition;
        // Snapshot of the enclosing sink's timeout-test hook taken at construction time.
        private final boolean isTimeoutTesting;

        /**
         * @param lck               lock guarding {@code condition}
         * @param callbacksReceived counter incremented once per invocation
         * @param condition         condition signalled after the counter is updated
         */
        public SuccessCallback(Lock lck, AtomicInteger callbacksReceived, Condition condition) {
            this.isTimeoutTesting = AsyncHBaseSink.this.isTimeoutTest;
            this.condition = condition;
            this.callbacksReceived = callbacksReceived;
            this.lock = lck;
        }

        public R call(T arg) throws Exception {
            if (isTimeoutTesting) {
                try {
                    // Test-only delay of 4 seconds before reporting back.
                    TimeUnit.SECONDS.sleep(4L);
                }
                catch (InterruptedException ignored) {
                    // An interrupt only cuts the artificial delay short.
                }
            }
            countAndSignal();
            return null;
        }

        /** Increments the shared counter, then wakes one waiter while holding the lock. */
        private void countAndSignal() throws Exception {
            callbacksReceived.incrementAndGet();
            lock.lock();
            try {
                condition.signal();
            }
            finally {
                lock.unlock();
            }
        }
    }
}

