package id.onyx.sep.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import id.onyx.sep.EventListener;
import id.onyx.sep.PayloadExtractor;
import id.onyx.sep.SepEvent;
import id.onyx.sep.util.concurrent.WaitPolicy;
import id.onyx.sep.util.io.Closer;
import id.onyx.sep.util.zookeeper.ZooKeeperItf;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;

/* loaded from: input_file:id/onyx/sep/impl/SepConsumer.class */
public class SepConsumer extends BaseHRegionServer {
    private final String subscriptionId;
    private long subscriptionTimestamp;
    private EventListener listener;
    private final ZooKeeperItf zk;
    private final Configuration hbaseConf;
    private RpcServer rpcServer;
    private ServerName serverName;
    private ZKWatcher zkWatcher;
    private SepMetrics sepMetrics;
    private final PayloadExtractor payloadExtractor;
    private String zkNodePath;
    private List<ThreadPoolExecutor> executors;
    boolean running;
    private Log log;

    public SepConsumer(String str, long j, EventListener eventListener, int i, String str2, ZooKeeperItf zooKeeperItf, Configuration configuration) throws IOException, InterruptedException {
        this(str, j, eventListener, i, str2, zooKeeperItf, configuration, null);
    }

    public SepConsumer(String str, long j, EventListener eventListener, int i, String str2, ZooKeeperItf zooKeeperItf, Configuration configuration, PayloadExtractor payloadExtractor) throws IOException, InterruptedException {
        this.running = false;
        this.log = LogFactory.getLog(getClass());
        Preconditions.checkArgument(i > 0, "Thread count must be > 0");
        this.subscriptionId = SepModelImpl.toInternalSubscriptionName(str);
        this.subscriptionTimestamp = j;
        this.listener = eventListener;
        this.zk = zooKeeperItf;
        this.hbaseConf = configuration;
        this.sepMetrics = new SepMetrics(str);
        this.payloadExtractor = payloadExtractor;
        this.executors = Lists.newArrayListWithCapacity(i);
        InetSocketAddress inetSocketAddress = new InetSocketAddress(str2, 0);
        if (inetSocketAddress.getAddress() == null) {
            throw new IllegalArgumentException("Failed resolve of " + inetSocketAddress);
        }
        this.rpcServer = new NettyRpcServer(this, "regionserver/" + inetSocketAddress.toString(), getServices(), inetSocketAddress, configuration, new FifoRpcScheduler(configuration, configuration.getInt("hbase.regionserver.handler.count", 10)), false);
        this.serverName = ServerName.valueOf(str2, this.rpcServer.getListenerAddress().getPort(), System.currentTimeMillis());
        this.zkWatcher = new ZKWatcher(configuration, this.serverName.toString(), (Abortable) null);
        ZKUtil.loginClient(configuration, "hbase.zookeeper.client.keytab.file", "hbase.zookeeper.client.kerberos.principal", str2);
        User.login(configuration, "hbase.regionserver.keytab.file", "hbase.regionserver.kerberos.principal", str2);
        for (int i2 = 0; i2 < i; i2++) {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue(100));
            threadPoolExecutor.setRejectedExecutionHandler(new WaitPolicy());
            this.executors.add(threadPoolExecutor);
        }
    }

    public void start() throws IOException, InterruptedException, KeeperException {
        this.rpcServer.start();
        this.zkNodePath = this.hbaseConf.get("hbasesep.zookeeper.znode.parent", "/ngdata/sep/hbase-slave") + "/" + this.subscriptionId + "/rs/" + this.serverName.getServerName();
        this.zk.create(this.zkNodePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        this.running = true;
    }

    private List<RpcServer.BlockingServiceAndInterface> getServices() {
        ArrayList arrayList = new ArrayList(1);
        arrayList.add(new RpcServer.BlockingServiceAndInterface(AdminProtos.AdminService.newReflectiveBlockingService(this), AdminProtos.AdminService.BlockingInterface.class));
        return arrayList;
    }

    public void stop() {
        Closer.close((Closeable) this.zkWatcher);
        if (this.running) {
            this.running = false;
            Closer.close(this.rpcServer);
            try {
                this.zk.delete(this.zkNodePath, -1);
            } catch (Exception e) {
                this.log.debug("Exception while removing zookeeper node", e);
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        this.sepMetrics.shutdown();
        Iterator<ThreadPoolExecutor> it = this.executors.iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    private void waitOnSepEventCompletion(List<Future<?>> list) throws IOException {
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<Future<?>> it = list.iterator();
        while (it.hasNext()) {
            try {
                it.next().get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted in processing events.", e);
            } catch (Exception e2) {
                this.log.warn("Error processing a batch of SEP events, the error will be forwarded to HBase for retry", e2);
                newArrayList.add(e2);
            }
        }
        if (newArrayList.isEmpty()) {
            return;
        }
        this.log.error("Encountered exceptions on " + newArrayList.size() + " batches (out of " + list.size() + " total batches)");
        throw new RuntimeException((Throwable) newArrayList.get(0));
    }

    @Override // id.onyx.sep.impl.BaseHRegionServer
    public AdminProtos.ReplicateWALEntryResponse replicateWALEntry(RpcController rpcController, AdminProtos.ReplicateWALEntryRequest replicateWALEntryRequest) throws ServiceException {
        byte[] extractPayload;
        try {
            long j = -1;
            SepEventExecutor sepEventExecutor = new SepEventExecutor(this.listener, this.executors, 100, this.sepMetrics);
            List<AdminProtos.WALEntry> entryList = replicateWALEntryRequest.getEntryList();
            CellScanner cellScanner = ((HBaseRpcController) rpcController).cellScanner();
            for (AdminProtos.WALEntry wALEntry : entryList) {
                TableName valueOf = wALEntry.getKey().getWriteTime() < this.subscriptionTimestamp ? null : TableName.valueOf(wALEntry.getKey().getTableName().toByteArray());
                ArrayListMultimap create = ArrayListMultimap.create();
                HashMap newHashMap = Maps.newHashMap();
                int associatedCellCount = wALEntry.getAssociatedCellCount();
                for (int i = 0; i < associatedCellCount; i++) {
                    if (!cellScanner.advance()) {
                        throw new ArrayIndexOutOfBoundsException("Expected=" + associatedCellCount + ", index=" + i);
                    }
                    if (valueOf != null) {
                        Cell current = cellScanner.current();
                        ByteBuffer wrap = ByteBuffer.wrap(current.getRowArray(), current.getRowOffset(), current.getRowLength());
                        KeyValue ensureKeyValue = KeyValueUtil.ensureKeyValue(current);
                        if (this.payloadExtractor != null && (extractPayload = this.payloadExtractor.extractPayload(valueOf.toBytes(), ensureKeyValue)) != null) {
                            if (newHashMap.containsKey(wrap)) {
                                this.log.error("Multiple payloads encountered for row " + Bytes.toStringBinary(wrap) + ", choosing " + Bytes.toStringBinary((byte[]) newHashMap.get(wrap)));
                            } else {
                                newHashMap.put(wrap, extractPayload);
                            }
                        }
                        create.put(wrap, ensureKeyValue);
                    }
                }
                for (ByteBuffer byteBuffer : create.keySet()) {
                    List list = (List) create.get(byteBuffer);
                    sepEventExecutor.scheduleSepEvent(new SepEvent(valueOf.toBytes(), CellUtil.cloneRow((Cell) list.get(0)), list, (byte[]) newHashMap.get(byteBuffer)));
                    j = Math.max(j, wALEntry.getKey().getWriteTime());
                }
            }
            waitOnSepEventCompletion(sepEventExecutor.flush());
            if (j > 0) {
                this.sepMetrics.reportSepTimestamp(j);
            }
            return AdminProtos.ReplicateWALEntryResponse.newBuilder().build();
        } catch (IOException e) {
            throw new ServiceException(e);
        }
    }

    @Override // id.onyx.sep.impl.BaseHRegionServer
    public Configuration getConfiguration() {
        return this.hbaseConf;
    }

    @Override // id.onyx.sep.impl.BaseHRegionServer
    public ServerName getServerName() {
        return this.serverName;
    }

    @Override // id.onyx.sep.impl.BaseHRegionServer
    public ZKWatcher getZooKeeper() {
        return this.zkWatcher;
    }
}
