/*
 * Decompiled with CFR 0.152.
 */
package org.apache.omid.tso.client;

import com.google.protobuf.MessageLite;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.omid.proto.TSOProto;
import org.apache.omid.tls.X509Exception;
import org.apache.omid.tls.X509Util;
import org.apache.omid.tso.client.AbortException;
import org.apache.omid.tso.client.CellId;
import org.apache.omid.tso.client.ClosingException;
import org.apache.omid.tso.client.ConnectionException;
import org.apache.omid.tso.client.ForwardingTSOFuture;
import org.apache.omid.tso.client.HandshakeFailedException;
import org.apache.omid.tso.client.OmidClientConfiguration;
import org.apache.omid.tso.client.ServiceUnavailableException;
import org.apache.omid.tso.client.TSOFuture;
import org.apache.omid.tso.client.TSOProtocol;
import org.apache.omid.zk.ZKUtils;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Charsets;
import org.apache.phoenix.thirdparty.com.google.common.net.HostAndPort;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.AbstractFuture;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.statemachine.StateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TSOClient
implements TSOProtocol,
NodeCacheListener {
    private static final Logger LOG = LoggerFactory.getLogger(TSOClient.class);
    public static final String DEFAULT_ZK_CLUSTER = "localhost:2181";
    // Epoch reported until a real one has been read from the current-TSO znode.
    private static final long DEFAULT_EPOCH = -1L;
    // Written by the constructor (HA mode) and by nodeChanged(); volatile because
    // getEpoch() may be called from other threads.
    private volatile long epoch = DEFAULT_EPOCH;
    // ZooKeeper client and znode cache; only initialised for HA connections, null otherwise.
    private CuratorFramework zkClient;
    private NodeCache currentTSOZNode;
    private Bootstrap bootstrap;
    // Last channel that became active (set by Handler.channelActive, read by nodeChanged()).
    private Channel currentChannel;
    // Single-threaded executor that drives the state machine; shut down by close().
    private final ScheduledExecutorService fsmExecutor;
    StateMachine.Fsm fsm;
    private final int requestTimeoutInMs;
    private final int requestMaxRetries;
    private final int tsoReconnectionDelayInSecs;
    // Guarded by the synchronized setTSOAddress()/getAddress() pair.
    private InetSocketAddress tsoAddr;
    private String zkCurrentTsoPath;
    // Starts as false; overwritten with the value carried by the TSO handshake response.
    private boolean lowLatency;
    private OmidClientConfiguration.ConflictDetectionLevel conflictDetectionLevel;
    // Lazily populated by getSslContext(); holds null until the first TLS channel is initialised.
    private final AtomicReference<SslContext> sslContextForClient = new AtomicReference<SslContext>();

    /**
     * Static factory for {@link TSOClient}; the constructor itself is private.
     *
     * @param tsoClientConf client configuration handed unchanged to the constructor
     * @return a newly constructed client
     * @throws IOException propagated from the constructor
     */
    public static TSOClient newInstance(OmidClientConfiguration tsoClientConf) throws IOException {
        TSOClient client = new TSOClient(tsoClientConf);
        return client;
    }

    /**
     * Builds a client from {@code omidConf}: resolves the TSO address (from ZooKeeper for
     * HA connections, otherwise from the connection string), creates the state machine in
     * its {@code DisconnectedState}, and configures the Netty bootstrap and channel pipeline.
     *
     * @param omidConf client configuration; also captured by the channel initializer to
     *                 read the TLS settings each time a channel is created
     * @throws IOException declared for callers; no statement visible here throws it directly
     * @throws IllegalStateException in HA mode, when the current-TSO znode cannot be
     *                               watched or holds no data
     */
    private TSOClient(final OmidClientConfiguration omidConf) throws IOException {
        this.requestTimeoutInMs = omidConf.getRequestTimeoutInMs();
        this.requestMaxRetries = omidConf.getRequestMaxRetries();
        this.tsoReconnectionDelayInSecs = omidConf.getReconnectionDelayInSecs();

        LOG.info("Connecting to TSO...");
        // Assigned exactly once on every path below so the channel initializer can capture it.
        final HostAndPort hp;
        switch (omidConf.getConnectionType()) {
            case HA: {
                this.zkClient = ZKUtils.initZKClient(omidConf.getConnectionString(), omidConf.getZkNamespace(), omidConf.getZkConnectionTimeoutInSecs());
                this.zkCurrentTsoPath = omidConf.getZkCurrentTsoPath();
                this.configureCurrentTSOServerZNodeCache(this.zkCurrentTsoPath);
                // The znode payload has the form "<host:port>#<epoch>".
                String tsoInfo = this.getCurrentTSOInfoFoundInZK(this.zkCurrentTsoPath);
                String[] currentTSOAndEpochArray = tsoInfo.split("#");
                hp = HostAndPort.fromString(currentTSOAndEpochArray[0]);
                this.setTSOAddress(hp.getHost(), hp.getPort());
                this.epoch = Long.parseLong(currentTSOAndEpochArray[1]);
                LOG.info("\t* Current TSO host:port found in ZK: {} Epoch {}", hp, this.getEpoch());
                break;
            }
            default: {
                hp = HostAndPort.fromString(omidConf.getConnectionString());
                this.setTSOAddress(hp.getHost(), hp.getPort());
                LOG.info("\t* TSO host:port {} will be connected directly", hp);
            }
        }

        this.fsmExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("tsofsm-%d").build());
        this.fsm = new StateMachine.FsmImpl(this.fsmExecutor);
        this.fsm.setInitState(new DisconnectedState(this.fsm));

        int tsoExecutorThreads = omidConf.getExecutorThreads();
        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(tsoExecutorThreads, workerThreadFactory);
        this.bootstrap = new Bootstrap();
        this.bootstrap.group(workerGroup);
        this.bootstrap.channel(NioSocketChannel.class);
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            public void initChannel(SocketChannel channel) throws Exception {
                ChannelPipeline pipeline = channel.pipeline();
                if (omidConf.getTlsEnabled()) {
                    SslContext sslContext = TSOClient.this.getSslContext(omidConf);
                    // NOTE(review): the peer host/port come from the address resolved at
                    // construction time, not from the address later set by nodeChanged() — confirm
                    // this is intended for HA fail-over.
                    SslHandler sslHandler = sslContext.newHandler(channel.alloc(), hp.getHost(), hp.getPort());
                    sslHandler.setHandshakeTimeoutMillis((long) omidConf.getClientNettyTlsHandshakeTimeout());
                    // TLS must sit in front of the framing and protobuf codecs.
                    pipeline.addFirst(sslHandler);
                    LOG.info("SSL handler added with handshake timeout {} ms", sslHandler.getHandshakeTimeoutMillis());
                }
                // Frames are length-prefixed with a 4-byte header and limited to 8192 bytes inbound.
                pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8192, 0, 4, 0, 4));
                pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
                pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
                pipeline.addLast("protobufencoder", new ProtobufEncoder());
                pipeline.addLast("inboundHandler", new Handler(TSOClient.this.fsm));
            }
        });
        // Values are passed with their real types so they bind to each ChannelOption<T>.
        this.bootstrap.option(ChannelOption.TCP_NODELAY, true);
        this.bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        this.bootstrap.option(ChannelOption.SO_REUSEADDR, true);
        this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100);

        this.lowLatency = false;
        this.conflictDetectionLevel = omidConf.getConflictAnalysisLevel();
    }

    /**
     * Returns the cached client {@link SslContext}, creating it from the key-store,
     * trust-store and protocol settings of {@code omidConf} when none is cached yet.
     *
     * <p>If two callers race, each may build a context, but only the one stored first
     * in {@code sslContextForClient} is ever returned.
     *
     * @param omidConf source of the TLS settings
     * @return the context held by {@code sslContextForClient}
     * @throws X509Exception propagated from {@code X509Util.createSslContextForClient}
     * @throws IOException propagated from {@code X509Util.createSslContextForClient}
     */
    @VisibleForTesting
    SslContext getSslContext(OmidClientConfiguration omidConf) throws X509Exception, IOException {
        SslContext cached = this.sslContextForClient.get();
        if (cached != null) {
            return cached;
        }
        SslContext created = X509Util.createSslContextForClient(
                omidConf.getKeyStoreLocation(),
                omidConf.getKeyStorePassword().toCharArray(),
                omidConf.getKeyStoreType(),
                omidConf.getTrustStoreLocation(),
                omidConf.getTrustStorePassword().toCharArray(),
                omidConf.getTrustStoreType(),
                omidConf.getSslCrlEnabled(),
                omidConf.getSslOcspEnabled(),
                omidConf.getEnabledProtocols(),
                omidConf.getCipherSuites(),
                omidConf.getTsConfigProtocols());
        if (this.sslContextForClient.compareAndSet(null, created)) {
            return created;
        }
        // Another caller stored its context first; use that one.
        return this.sslContextForClient.get();
    }

    /**
     * Queues a timestamp request on the state machine.
     *
     * @return a future backed by the queued request event
     */
    @Override
    public TSOFuture<Long> getNewStartTimestamp() {
        TSOProto.Request timestampRequest = TSOProto.Request.newBuilder()
                .setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build())
                .build();
        RequestEvent event = new RequestEvent(timestampRequest, this.requestMaxRetries);
        this.fsm.sendEvent(event);
        return new ForwardingTSOFuture<Long>(event);
    }

    /**
     * Commits a transaction that has no conflict-free write set; delegates to the
     * three-argument overload with an empty set.
     *
     * @param transactionId start timestamp of the transaction
     * @param cells cells written by the transaction
     * @return the future returned by the three-argument overload
     */
    @Override
    public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
        return this.commit(transactionId, cells, new HashSet<CellId>());
    }

    /**
     * Builds a commit request for {@code transactionId} and queues it on the state machine.
     *
     * <p>One id per written cell is added to the request: the row id under {@code ROW}
     * conflict detection (each distinct row only once) or the cell id under {@code CELL}.
     * The table ids of both {@code cells} and {@code conflictFreeWriteSet} are attached too;
     * a cell skipped as a duplicate row does not contribute its table id.
     *
     * @param transactionId start timestamp of the transaction
     * @param cells cells whose ids take part in conflict detection
     * @param conflictFreeWriteSet cells that only contribute their table ids
     * @return a future backed by the queued request event
     */
    @Override
    public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells, Set<? extends CellId> conflictFreeWriteSet) {
        TSOProto.Request.Builder requestBuilder = TSOProto.Request.newBuilder();
        TSOProto.CommitRequest.Builder commitRequest = TSOProto.CommitRequest.newBuilder();
        commitRequest.setStartTimestamp(transactionId);

        Set<Long> seenRowIds = new HashSet<Long>();
        Set<Long> touchedTableIds = new HashSet<Long>();
        for (CellId cell : cells) {
            long conflictId;
            switch (this.conflictDetectionLevel) {
                case ROW:
                    conflictId = cell.getRowId();
                    if (!seenRowIds.add(conflictId)) {
                        // Row already reported for this transaction.
                        continue;
                    }
                    break;
                case CELL:
                    conflictId = cell.getCellId();
                    break;
                default:
                    conflictId = 0L;
                    assert (false);
                    break;
            }
            commitRequest.addCellId(conflictId);
            touchedTableIds.add(cell.getTableId());
        }
        for (CellId conflictFreeCell : conflictFreeWriteSet) {
            touchedTableIds.add(conflictFreeCell.getTableId());
        }
        commitRequest.addAllTableId(touchedTableIds);

        requestBuilder.setCommitRequest(commitRequest.build());
        RequestEvent event = new RequestEvent(requestBuilder.build(), this.requestMaxRetries);
        this.fsm.sendEvent(event);
        return new ForwardingTSOFuture<Long>(event);
    }

    /**
     * Queues a fence request for {@code tableId} on the state machine.
     *
     * @param tableId id of the table to fence
     * @return a future backed by the queued request event
     */
    @Override
    public TSOFuture<Long> getFence(long tableId) {
        TSOProto.FenceRequest fenceRequest = TSOProto.FenceRequest.newBuilder()
                .setTableId(tableId)
                .build();
        TSOProto.Request wrapped = TSOProto.Request.newBuilder()
                .setFenceRequest(fenceRequest)
                .build();
        RequestEvent event = new RequestEvent(wrapped, this.requestMaxRetries);
        this.fsm.sendEvent(event);
        return new ForwardingTSOFuture<Long>(event);
    }

    /**
     * Sends a close event to the state machine and registers a listener that, once the
     * event completes, shuts down the FSM executor and closes the ZooKeeper znode cache
     * and client if they were created.
     *
     * <p>Failures are reported through the class logger; the clean-up in the
     * {@code finally} block runs regardless.
     *
     * @return a future backed by the close event
     */
    @Override
    public TSOFuture<Void> close() {
        final CloseEvent closeEvent = new CloseEvent();
        this.fsm.sendEvent(closeEvent);
        closeEvent.addListener(new Runnable() {

            @Override
            public void run() {
                try {
                    closeEvent.get();
                } catch (InterruptedException e) {
                    // Preserve the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                    LOG.error("Interrupted while waiting for the TSO client close event", e);
                } catch (ExecutionException e) {
                    LOG.error("TSO client close event completed with an error", e);
                } finally {
                    TSOClient.this.fsmExecutor.shutdown();
                    if (TSOClient.this.currentTSOZNode != null) {
                        try {
                            TSOClient.this.currentTSOZNode.close();
                        } catch (IOException e) {
                            LOG.error("Error closing the current TSO ZNode cache", e);
                        }
                    }
                    if (TSOClient.this.zkClient != null) {
                        TSOClient.this.zkClient.close();
                    }
                }
            }
        }, this.fsmExecutor);
        return new ForwardingTSOFuture<Void>(closeEvent);
    }

    /**
     * @return the epoch last read from ZooKeeper, or -1 if none has been read
     */
    @Override
    public long getEpoch() {
        return epoch;
    }

    /**
     * @return the conflict detection level used when building commit requests
     */
    @Override
    public OmidClientConfiguration.ConflictDetectionLevel getConflictDetectionLevel() {
        return conflictDetectionLevel;
    }

    /**
     * Replaces the conflict detection level consulted by subsequent commit requests.
     *
     * @param conflictDetectionLevel the new level
     */
    @Override
    public void setConflictDetectionLevel(OmidClientConfiguration.ConflictDetectionLevel conflictDetectionLevel) {
        this.conflictDetectionLevel = conflictDetectionLevel;
    }

    /**
     * Listener callback registered on the current-TSO znode cache. Re-reads the
     * "{@code host:port#epoch}" payload, updates the TSO address and epoch, and closes
     * the current channel if it is still active.
     *
     * @throws Exception declared by the listener contract; malformed payloads surface as
     *                   unchecked exceptions from parsing
     */
    public void nodeChanged() throws Exception {
        String[] hostPortAndEpoch = this.getCurrentTSOInfoFoundInZK(this.zkCurrentTsoPath).split("#");
        HostAndPort newTso = HostAndPort.fromString(hostPortAndEpoch[0]);
        this.setTSOAddress(newTso.getHost(), newTso.getPort());
        this.epoch = Long.parseLong(hostPortAndEpoch[1]);
        LOG.info("CurrentTSO ZNode changed. New TSO Host & Port {}/Epoch {}", newTso, this.getEpoch());

        if (this.currentChannel == null || !this.currentChannel.isActive()) {
            return;
        }
        LOG.info("\tClosing channel with previous TSO {}", this.currentChannel);
        this.currentChannel.close();
    }

    /**
     * @return the low-latency flag; false until a handshake response overwrites it
     */
    @Override
    public boolean isLowLatency() {
        return lowLatency;
    }

    /**
     * Stores the TSO endpoint; synchronized together with {@code getAddress()}.
     *
     * @param host TSO host name
     * @param port TSO port
     */
    private synchronized void setTSOAddress(String host, int port) {
        InetSocketAddress endpoint = new InetSocketAddress(host, port);
        tsoAddr = endpoint;
    }

    /**
     * @return the TSO endpoint most recently stored by {@code setTSOAddress}
     */
    private synchronized InetSocketAddress getAddress() {
        return tsoAddr;
    }

    /**
     * Creates the {@link NodeCache} for the current-TSO znode, registers this client as
     * its listener and starts it.
     *
     * @param currentTsoPath ZooKeeper path of the current-TSO znode
     * @throws IllegalStateException if the cache cannot be created or started; the
     *                               original failure is attached as the cause
     */
    private void configureCurrentTSOServerZNodeCache(String currentTsoPath) {
        try {
            this.currentTSOZNode = new NodeCache(this.zkClient, currentTsoPath);
            this.currentTSOZNode.getListenable().addListener(this);
            // The constructor reads the node's data right after this call returns.
            this.currentTSOZNode.start(true);
        } catch (Exception e) {
            // Keep the cause so the underlying stack trace is not lost.
            throw new IllegalStateException("Cannot start watcher on current TSO Server ZNode: " + e.getMessage(), e);
        }
    }

    /**
     * Reads the payload currently cached for the current-TSO znode and decodes it as UTF-8.
     *
     * @param currentTsoPath znode path, used only in error messages
     * @return the znode payload as a string
     * @throws IllegalStateException if the cache has no node data or the node has no payload
     */
    private String getCurrentTSOInfoFoundInZK(String currentTsoPath) {
        ChildData cachedNode = this.currentTSOZNode.getCurrentData();
        if (cachedNode == null) {
            throw new IllegalStateException("No data found in ZKNode " + currentTsoPath);
        }
        byte[] payload = cachedNode.getData();
        if (payload == null) {
            throw new IllegalStateException("No data found for current TSO in ZKNode " + currentTsoPath);
        }
        return new String(payload, Charsets.UTF_8);
    }

    private class Handler
    extends ChannelInboundHandlerAdapter {
        private StateMachine.Fsm fsm;

        Handler(StateMachine.Fsm fsm) {
            this.fsm = fsm;
        }

        public void channelActive(ChannelHandlerContext ctx) {
            TSOClient.this.currentChannel = ctx.channel();
            LOG.debug("HANDLER (CHANNEL ACTIVE): Connection {}. Sending connected event to FSM", (Object)ctx.channel());
            this.fsm.sendEvent((StateMachine.Event)new ConnectedEvent(ctx.channel()));
        }

        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            LOG.debug("HANDLER (CHANNEL INACTIVE): Connection {}. Sending error, then channelClosed event to FSM", (Object)ctx.channel());
            this.fsm.sendEvent((StateMachine.Event)new ErrorEvent(new ConnectionException()));
            this.fsm.sendEvent((StateMachine.Event)new ChannelClosedEvent(new ConnectionException()));
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            if (msg instanceof TSOProto.Response) {
                this.fsm.sendEvent((StateMachine.Event)new ResponseEvent((TSOProto.Response)msg));
            } else {
                LOG.warn("Received unknown message", msg);
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            LOG.error("Error on channel {}", (Object)ctx.channel(), (Object)cause);
            this.fsm.sendEvent((StateMachine.Event)new ErrorEvent(cause));
        }
    }

    /**
     * FSM state entered while the channel is being torn down. User events are deferred,
     * a channel-closed event moves the machine to {@code DisconnectedState}, and every
     * other event handled here is ignored.
     */
    private class ClosingState extends BaseState {
        ClosingState(StateMachine.Fsm stateMachine) {
            super(stateMachine);
            LOG.debug("NEW STATE: CLOSING");
        }

        /** The only transition out of this state. */
        public StateMachine.State handleEvent(ChannelClosedEvent closed) {
            return new DisconnectedState(this.fsm);
        }

        /** User requests are deferred rather than dropped. */
        public StateMachine.State handleEvent(UserEvent userEvent) {
            this.fsm.deferEvent(userEvent);
            return this;
        }

        // Everything below is ignored while the channel is closing.

        public StateMachine.State handleEvent(TimestampRequestTimeoutEvent ignored) {
            return this;
        }

        public StateMachine.State handleEvent(CommitRequestTimeoutEvent ignored) {
            return this;
        }

        public StateMachine.State handleEvent(FenceRequestTimeoutEvent ignored) {
            return this;
        }

        public StateMachine.State handleEvent(HandshakeTimeoutEvent ignored) {
            return this;
        }

        public StateMachine.State handleEvent(ErrorEvent ignored) {
            return this;
        }

        public StateMachine.State handleEvent(ResponseEvent ignored) {
            return this;
        }
    }

    /**
     * FSM state used once the handshake has succeeded. Tracks outstanding requests
     * (timestamp requests in FIFO order, commit requests by start timestamp, fence
     * requests by table id) together with their optional timeouts, matches responses
     * to them, and retries or fails them on timeout or error.
     */
    class ConnectedState extends BaseState {
        final Queue<RequestAndTimeout> timestampRequests = new ArrayDeque<RequestAndTimeout>();
        final Map<Long, RequestAndTimeout> commitRequests = new HashMap<Long, RequestAndTimeout>();
        final Map<Long, RequestAndTimeout> fenceRequests = new HashMap<Long, RequestAndTimeout>();
        final Channel channel;
        final HashedWheelTimer timeoutExecutor;

        ConnectedState(StateMachine.Fsm fsm, Channel channel, HashedWheelTimer timeoutExecutor) {
            super(fsm);
            LOG.debug("NEW STATE: CONNECTED");
            this.channel = channel;
            this.timeoutExecutor = timeoutExecutor;
        }

        /**
         * Schedules {@code timeoutEvent} to be sent to the FSM after the configured request
         * timeout; returns null when the timeout is not positive.
         */
        private Timeout newTimeout(final StateMachine.Event timeoutEvent) {
            if (TSOClient.this.requestTimeoutInMs <= 0) {
                return null;
            }
            TimerTask fireTimeoutEvent = new TimerTask() {

                public void run(Timeout timeout) {
                    ConnectedState.this.fsm.sendEvent(timeoutEvent);
                }
            };
            return this.timeoutExecutor.newTimeout(fireTimeoutEvent, (long) TSOClient.this.requestTimeoutInMs, TimeUnit.MILLISECONDS);
        }

        /** Cancels the timeout attached to {@code pending}, if there is one. */
        private void cancelTimeoutOf(RequestAndTimeout pending) {
            Timeout timer = pending.getTimeout();
            if (timer != null) {
                timer.cancel();
            }
        }

        /**
         * Registers {@code request} as outstanding, writes it to the channel, and reports a
         * failed write to {@code fsm} as an error event. Unknown request types are failed
         * immediately and never written.
         */
        private void sendRequest(final StateMachine.Fsm fsm, RequestEvent request) {
            TSOProto.Request payload = request.getRequest();
            if (payload.hasTimestampRequest()) {
                Timeout timer = this.newTimeout(new TimestampRequestTimeoutEvent());
                this.timestampRequests.add(new RequestAndTimeout(request, timer));
            } else if (payload.hasCommitRequest()) {
                long startTimestamp = payload.getCommitRequest().getStartTimestamp();
                Timeout timer = this.newTimeout(new CommitRequestTimeoutEvent(startTimestamp));
                this.commitRequests.put(startTimestamp, new RequestAndTimeout(request, timer));
            } else if (payload.hasFenceRequest()) {
                long tableId = payload.getFenceRequest().getTableId();
                Timeout timer = this.newTimeout(new FenceRequestTimeoutEvent(tableId));
                this.fenceRequests.put(tableId, new RequestAndTimeout(request, timer));
            } else {
                request.error(new IllegalArgumentException("Unknown request type"));
                return;
            }

            this.channel.writeAndFlush(payload).addListener(new ChannelFutureListener() {

                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        return;
                    }
                    fsm.sendEvent(new ErrorEvent(future.cause()));
                }
            });
        }

        /**
         * Completes the outstanding request that matches {@code response}; responses with no
         * matching request are logged at debug level and dropped.
         */
        private void handleResponse(ResponseEvent response) {
            TSOProto.Response resp = (TSOProto.Response) response.getParam();

            if (resp.hasTimestampResponse()) {
                // Timestamp responses carry no key, so they are matched in FIFO order.
                RequestAndTimeout pending = this.timestampRequests.poll();
                if (pending == null) {
                    LOG.debug("Received timestamp response when no requests outstanding");
                    return;
                }
                pending.getRequest().success(resp.getTimestampResponse().getStartTimestamp());
                this.cancelTimeoutOf(pending);
                return;
            }

            if (resp.hasCommitResponse()) {
                long startTimestamp = resp.getCommitResponse().getStartTimestamp();
                RequestAndTimeout pending = this.commitRequests.remove(startTimestamp);
                if (pending == null) {
                    LOG.debug("Received commit response for request that doesn't exist. Start TS: {}", startTimestamp);
                    return;
                }
                this.cancelTimeoutOf(pending);
                if (resp.getCommitResponse().getAborted()) {
                    pending.getRequest().error(new AbortException());
                } else {
                    pending.getRequest().success(resp.getCommitResponse().getCommitTimestamp());
                }
                return;
            }

            if (resp.hasFenceResponse()) {
                long tableID = resp.getFenceResponse().getTableId();
                RequestAndTimeout pending = this.fenceRequests.remove(tableID);
                if (pending == null) {
                    LOG.debug("Received fence response for request that doesn't exist. Table ID: {}", tableID);
                    return;
                }
                this.cancelTimeoutOf(pending);
                pending.getRequest().success(resp.getFenceResponse().getFenceId());
            }
        }

        /** Retries (or fails) the oldest outstanding timestamp request, if any. */
        public StateMachine.State handleEvent(TimestampRequestTimeoutEvent e) {
            RequestAndTimeout timedOut = this.timestampRequests.poll();
            if (timedOut != null) {
                this.cancelTimeoutOf(timedOut);
                this.queueRetryOrError(this.fsm, timedOut.getRequest());
            }
            return this;
        }

        /** Retries (or fails) the commit request identified by the event's start timestamp. */
        public StateMachine.State handleEvent(CommitRequestTimeoutEvent e) {
            RequestAndTimeout timedOut = this.commitRequests.remove(e.getStartTimestamp());
            if (timedOut != null) {
                this.cancelTimeoutOf(timedOut);
                this.queueRetryOrError(this.fsm, timedOut.getRequest());
            }
            return this;
        }

        /** Retries (or fails) the fence request identified by the event's table id. */
        public StateMachine.State handleEvent(FenceRequestTimeoutEvent e) {
            RequestAndTimeout timedOut = this.fenceRequests.remove(e.getTableID());
            if (timedOut != null) {
                this.cancelTimeoutOf(timedOut);
                this.queueRetryOrError(this.fsm, timedOut.getRequest());
            }
            return this;
        }

        /**
         * Stops the timer, closes the channel, fails all outstanding requests, and defers
         * the close event for the next state.
         */
        public StateMachine.State handleEvent(CloseEvent e) {
            LOG.debug("CONNECTED STATE: CloseEvent");
            this.timeoutExecutor.stop();
            this.closeChannelAndErrorRequests();
            this.fsm.deferEvent(e);
            return new ClosingState(this.fsm);
        }

        public StateMachine.State handleEvent(RequestEvent e) {
            this.sendRequest(this.fsm, e);
            return this;
        }

        public StateMachine.State handleEvent(ResponseEvent e) {
            this.handleResponse(e);
            return this;
        }

        /** Stops the timer, re-queues or fails every outstanding request, and starts closing. */
        public StateMachine.State handleEvent(ErrorEvent e) {
            LOG.debug("CONNECTED STATE: ErrorEvent");
            this.timeoutExecutor.stop();
            this.handleError(this.fsm);
            return new ClosingState(this.fsm);
        }

        /** Drains all three request collections through the retry logic, then closes the channel. */
        private void handleError(StateMachine.Fsm fsm) {
            LOG.debug("CONNECTED STATE: Cancelling Timeouts in handleError");
            while (!this.timestampRequests.isEmpty()) {
                RequestAndTimeout pending = this.timestampRequests.remove();
                this.cancelTimeoutOf(pending);
                this.queueRetryOrError(fsm, pending.getRequest());
            }
            this.retryAndDrain(fsm, this.commitRequests);
            this.retryAndDrain(fsm, this.fenceRequests);
            this.channel.close();
        }

        /** Cancels, retries-or-fails and removes every entry of {@code pendingById}. */
        private void retryAndDrain(StateMachine.Fsm fsm, Map<Long, RequestAndTimeout> pendingById) {
            Iterator<RequestAndTimeout> remaining = pendingById.values().iterator();
            while (remaining.hasNext()) {
                RequestAndTimeout pending = remaining.next();
                this.cancelTimeoutOf(pending);
                this.queueRetryOrError(fsm, pending.getRequest());
                remaining.remove();
            }
        }

        /**
         * Re-sends {@code e} to {@code fsm} when it has retries left, otherwise fails it. A
         * commit request that is retried for the first time is rebuilt with its retry flag set.
         */
        private void queueRetryOrError(StateMachine.Fsm fsm, RequestEvent e) {
            if (e.getRetriesLeft() <= 0) {
                e.error(new ServiceUnavailableException("Number of retries exceeded. This API request failed permanently"));
                return;
            }
            e.decrementRetries();
            if (e.getRequest().hasCommitRequest()) {
                TSOProto.CommitRequest original = e.getRequest().getCommitRequest();
                if (!original.getIsRetry()) {
                    TSOProto.CommitRequest.Builder retryCommit = TSOProto.CommitRequest.newBuilder();
                    retryCommit.mergeFrom(original);
                    retryCommit.setIsRetry(true);
                    TSOProto.Request.Builder retryRequest = TSOProto.Request.newBuilder();
                    retryRequest.setCommitRequest(retryCommit.build());
                    e.setRequest(retryRequest.build());
                }
            }
            fsm.sendEvent(e);
        }

        /** Closes the channel and fails every outstanding request with a {@code ClosingException}. */
        private void closeChannelAndErrorRequests() {
            this.channel.close();
            this.failAllWithClosing(this.timestampRequests);
            this.failAllWithClosing(this.commitRequests.values());
            this.failAllWithClosing(this.fenceRequests.values());
        }

        /** Cancels the timeout of each request and fails it; the collection itself is left untouched. */
        private void failAllWithClosing(Iterable<RequestAndTimeout> outstanding) {
            for (RequestAndTimeout pending : outstanding) {
                this.cancelTimeoutOf(pending);
                pending.getRequest().error(new ClosingException());
            }
        }
    }

    /**
     * Variant of {@link ConnectionFailedState} entered when the handshake fails; it only
     * adds its own debug log line on entry.
     */
    private class HandshakeFailedState extends ConnectionFailedState {
        HandshakeFailedState(StateMachine.Fsm stateMachine, Throwable cause) {
            super(stateMachine, cause);
            LOG.debug("STATE: HANDSHAKING FAILED");
        }
    }

    /**
     * Back-off state entered after a connection failure. While in this state user events
     * are failed with the stored exception and error events are ignored. The machine moves
     * to {@code DisconnectedState} either when the channel-closed event arrives or when the
     * reconnection delay elapses, whichever comes first.
     *
     * <p>Each instance owns a {@link HashedWheelTimer} for the back-off delay. The timer is
     * stopped on every transition out of the state so its thread does not outlive it;
     * leaving on a channel-closed event therefore also cancels the pending reconnect event.
     */
    class ConnectionFailedState
    extends BaseState {
        final HashedWheelTimer reconnectionTimeoutExecutor;
        Throwable exception;

        ConnectionFailedState(final StateMachine.Fsm fsm, Throwable exception) {
            super(fsm);
            this.reconnectionTimeoutExecutor = new HashedWheelTimer(new ThreadFactoryBuilder().setNameFormat("tso-client-backoff-timeout").build());
            LOG.debug("NEW STATE: CONNECTION FAILED [RE-CONNECTION BACKOFF]");
            this.exception = exception;
            this.reconnectionTimeoutExecutor.newTimeout(new TimerTask() {

                public void run(Timeout timeout) {
                    fsm.sendEvent(new ReconnectEvent());
                }
            }, (long) TSOClient.this.tsoReconnectionDelayInSecs, TimeUnit.SECONDS);
        }

        /** Fails the user request with the exception that caused this state. */
        public StateMachine.State handleEvent(UserEvent e) {
            e.error(this.exception);
            return this;
        }

        public StateMachine.State handleEvent(ErrorEvent e) {
            return this;
        }

        public StateMachine.State handleEvent(ChannelClosedEvent e) {
            this.stopBackoffTimer();
            return new DisconnectedState(this.fsm);
        }

        public StateMachine.State handleEvent(ReconnectEvent e) {
            this.stopBackoffTimer();
            return new DisconnectedState(this.fsm);
        }

        /**
         * Releases the back-off timer and its thread.
         *
         * NOTE(review): assumes event handlers run on the FSM executor rather than on the
         * timer's own thread, as the other states that stop their timers from handlers
         * already do — confirm against StateMachine.FsmImpl.
         */
        private void stopBackoffTimer() {
            this.reconnectionTimeoutExecutor.stop();
        }
    }

    private class HandshakingState
    extends BaseState {
        final Channel channel;
        final HashedWheelTimer timeoutExecutor;
        final Timeout timeout;

        HandshakingState(StateMachine.Fsm fsm, Channel channel) {
            super(fsm);
            this.timeoutExecutor = new HashedWheelTimer(new ThreadFactoryBuilder().setNameFormat("tso-client-timeout").build());
            LOG.debug("NEW STATE: HANDSHAKING");
            this.channel = channel;
            TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
            handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
            channel.writeAndFlush((Object)TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
            this.timeout = this.newTimeout();
        }

        private Timeout newTimeout() {
            if (TSOClient.this.requestTimeoutInMs > 0) {
                return this.timeoutExecutor.newTimeout(new TimerTask(){

                    public void run(Timeout timeout) {
                        HandshakingState.this.fsm.sendEvent((StateMachine.Event)new HandshakeTimeoutEvent());
                    }
                }, 30L, TimeUnit.SECONDS);
            }
            return null;
        }

        public StateMachine.State handleEvent(UserEvent e) {
            this.fsm.deferEvent((StateMachine.DeferrableEvent)e);
            return this;
        }

        public StateMachine.State handleEvent(ResponseEvent e) {
            TSOClient.this.lowLatency = ((TSOProto.Response)e.getParam()).getHandshakeResponse().getLowLatency();
            if (((TSOProto.Response)e.getParam()).hasHandshakeResponse() && ((TSOProto.Response)e.getParam()).getHandshakeResponse().getClientCompatible()) {
                if (this.timeout != null) {
                    this.timeout.cancel();
                }
                return new ConnectedState(this.fsm, this.channel, this.timeoutExecutor);
            }
            this.cleanupState();
            LOG.error("Client incompatible with server");
            return new HandshakeFailedState(this.fsm, new HandshakeFailedException());
        }

        public StateMachine.State handleEvent(HandshakeTimeoutEvent e) {
            this.cleanupState();
            return new ClosingState(this.fsm);
        }

        public StateMachine.State handleEvent(ErrorEvent e) {
            this.cleanupState();
            Throwable exception = (Throwable)e.getParam();
            LOG.error("Error during handshake", exception);
            return new HandshakeFailedState(this.fsm, exception);
        }

        private void cleanupState() {
            this.timeoutExecutor.stop();
            this.channel.close();
            if (this.timeout != null) {
                this.timeout.cancel();
            }
        }
    }

    private static class RequestAndTimeout {
        final RequestEvent event;
        final Timeout timeout;

        RequestAndTimeout(RequestEvent event, Timeout timeout) {
            this.event = event;
            this.timeout = timeout;
        }

        RequestEvent getRequest() {
            return this.event;
        }

        Timeout getTimeout() {
            return this.timeout;
        }

        public String toString() {
            String info = "Request type ";
            info = this.event.getRequest().hasTimestampRequest() ? info + "[Timestamp]" : (this.event.getRequest().hasCommitRequest() ? info + "[Commit] Start TS ->" + this.event.getRequest().getCommitRequest().getStartTimestamp() : info + "NONE");
            return info;
        }
    }

    /**
     * State held while a connection attempt is in flight.
     *
     * <p>User events are deferred. A {@code ConnectedEvent} starts the handshake; a closed
     * channel or an error moves to {@code ConnectionFailedState} carrying the event's
     * throwable.
     */
    private class ConnectingState extends BaseState {
        ConnectingState(StateMachine.Fsm fsm) {
            super(fsm);
            LOG.debug("NEW STATE: CONNECTING");
        }

        /** Defers user events until the connection attempt has been resolved. */
        public StateMachine.State handleEvent(UserEvent e) {
            StateMachine.DeferrableEvent deferred = (StateMachine.DeferrableEvent) e;
            this.fsm.deferEvent(deferred);
            return this;
        }

        /** Starts the handshake on the channel carried by the event. */
        public StateMachine.State handleEvent(ConnectedEvent e) {
            Channel connectedChannel = (Channel) e.getParam();
            return new HandshakingState(this.fsm, connectedChannel);
        }

        /** Treats a closed channel as a failed connection attempt. */
        public StateMachine.State handleEvent(ChannelClosedEvent e) {
            Throwable cause = (Throwable) e.getParam();
            return new ConnectionFailedState(this.fsm, cause);
        }

        /** Treats an error as a failed connection attempt. */
        public StateMachine.State handleEvent(ErrorEvent e) {
            Throwable cause = (Throwable) e.getParam();
            return new ConnectionFailedState(this.fsm, cause);
        }
    }

    class DisconnectedState
    extends BaseState {
        DisconnectedState(StateMachine.Fsm fsm) {
            super(fsm);
            LOG.debug("NEW STATE: DISCONNECTED");
        }

        public StateMachine.State handleEvent(RequestEvent e) {
            this.fsm.deferEvent((StateMachine.DeferrableEvent)e);
            return this.tryToConnectToTSOServer();
        }

        public StateMachine.State handleEvent(CloseEvent e) {
            TSOClient.this.bootstrap.config().group().shutdownGracefully();
            e.success(null);
            return this;
        }

        private StateMachine.State tryToConnectToTSOServer() {
            final InetSocketAddress tsoAddress = TSOClient.this.getAddress();
            LOG.info("Trying to connect to TSO [{}]", (Object)tsoAddress);
            ChannelFuture channelFuture = TSOClient.this.bootstrap.connect((SocketAddress)tsoAddress);
            channelFuture.addListener((GenericFutureListener)new ChannelFutureListener(){

                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isSuccess()) {
                        LOG.info("Connection to TSO [{}] established. Channel {}", (Object)tsoAddress, (Object)channelFuture.channel());
                    } else {
                        LOG.error("Failed connection attempt to TSO [{}] failed. Channel {}", (Object)tsoAddress, (Object)channelFuture.channel());
                        DisconnectedState.this.fsm.sendEvent((StateMachine.Event)new ErrorEvent(new ConnectionException()));
                    }
                }
            });
            return new ConnectingState(this.fsm);
        }
    }

    class BaseState
    extends StateMachine.State {
        BaseState(StateMachine.Fsm fsm) {
            super(fsm);
        }

        public StateMachine.State handleEvent(StateMachine.Event e) {
            LOG.error("Unhandled event {} while in state {}", (Object)e, (Object)((Object)((Object)this)).getClass().getName());
            return this;
        }
    }

    /** Event carrying a protobuf {@code TSOProto.Response}. */
    private static class ResponseEvent extends ParamEvent<TSOProto.Response> {
        ResponseEvent(TSOProto.Response response) {
            super(response);
        }
    }

    /**
     * User event wrapping a protobuf request together with the number of retries still
     * allowed for it. As a {@code UserEvent<Long>} it completes with a {@code Long} value.
     */
    private static class RequestEvent extends UserEvent<Long> {
        // Replaceable via setRequest().
        TSOProto.Request req;
        // Decremented by decrementRetries(); no lower bound is enforced here.
        int retriesLeft;

        RequestEvent(TSOProto.Request req, int retriesLeft) {
            this.retriesLeft = retriesLeft;
            this.req = req;
        }

        /** @return the request currently attached to this event */
        TSOProto.Request getRequest() {
            return req;
        }

        /** Replaces the request attached to this event. */
        void setRequest(TSOProto.Request request) {
            req = request;
        }

        /** @return the number of retries left */
        int getRetriesLeft() {
            return retriesLeft;
        }

        /** Consumes one retry. */
        void decrementRetries() {
            retriesLeft -= 1;
        }
    }

    /** Timeout event identifying a fence request by its table ID. */
    private static class FenceRequestTimeoutEvent implements StateMachine.Event {
        final long tableID;

        FenceRequestTimeoutEvent(long tableID) {
            this.tableID = tableID;
        }

        /** @return the table ID this event was created with */
        public long getTableID() {
            return tableID;
        }
    }

    /** Timeout event identifying a commit request by its start timestamp. */
    private static class CommitRequestTimeoutEvent implements StateMachine.Event {
        final long startTimestamp;

        CommitRequestTimeoutEvent(long startTimestamp) {
            this.startTimestamp = startTimestamp;
        }

        /** @return the start timestamp this event was created with */
        public long getStartTimestamp() {
            return startTimestamp;
        }
    }

    /**
     * Payload-free marker event.
     * NOTE(review): by its name, raised when a timestamp request times out — the sender is
     * not visible in this part of the file; confirm.
     */
    private static class TimestampRequestTimeoutEvent implements StateMachine.Event {
        private TimestampRequestTimeoutEvent() {
            // no state to initialise
        }
    }

    /**
     * Payload-free marker event sent by the timer task that {@code HandshakingState}
     * schedules when it arms its handshake timeout.
     */
    private static class HandshakeTimeoutEvent implements StateMachine.Event {
        private HandshakeTimeoutEvent() {
            // no state to initialise
        }
    }

    /**
     * Payload-free marker event sent by the back-off timer task that
     * {@code ConnectionFailedState} schedules in its constructor.
     */
    private static class ReconnectEvent implements StateMachine.Event {
        private ReconnectEvent() {
            // no state to initialise
        }
    }

    /** Event signalling a closed channel; carries a {@code Throwable} as its parameter. */
    private static class ChannelClosedEvent extends ParamEvent<Throwable> {
        ChannelClosedEvent(Throwable cause) {
            super(cause);
        }
    }

    /**
     * User event requesting that the client be closed; completes with no value
     * ({@code UserEvent<Void>}).
     */
    private static class CloseEvent extends UserEvent<Void> {
        private CloseEvent() {
            // no state to initialise
        }
    }

    /**
     * Deferrable event that is also a future: states complete it through
     * {@link #success(Object)} or {@link #error(Throwable)}.
     *
     * @param <T> type of the value the event completes with
     */
    private static class UserEvent<T> extends AbstractFuture<T> implements StateMachine.DeferrableEvent {
        private UserEvent() {
            // instantiated through subclasses only
        }

        /** Completes the event with {@code value} by delegating to {@code set}. */
        void success(T value) {
            set(value);
        }

        /** Fails the event with {@code t} by delegating to {@code setException}. */
        public void error(Throwable t) {
            setException(t);
        }
    }

    /** Event carrying a {@code Channel}; {@code ConnectingState} starts the handshake on it. */
    private static class ConnectedEvent extends ParamEvent<Channel> {
        ConnectedEvent(Channel channel) {
            super(channel);
        }
    }

    /** Event reporting a failure; carries the {@code Throwable} as its parameter. */
    private static class ErrorEvent extends ParamEvent<Throwable> {
        ErrorEvent(Throwable cause) {
            super(cause);
        }
    }

    /**
     * Base class for events that carry a single immutable parameter.
     *
     * @param <T> type of the carried parameter
     */
    private static class ParamEvent<T> implements StateMachine.Event {
        final T param;

        ParamEvent(T param) {
            this.param = param;
        }

        /** @return the parameter supplied at construction time (may be whatever the sender passed) */
        T getParam() {
            return param;
        }
    }
}

