/*
 * Decompiled with CFR 0.152.
 */
package org.apache.omid.tso;

import com.google.inject.name.Named;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import io.netty.channel.Channel;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.tso.Batch;
import org.apache.omid.tso.FatalExceptionHandler;
import org.apache.omid.tso.MonitoringContext;
import org.apache.omid.tso.Panicker;
import org.apache.omid.tso.PersistenceProcessor;
import org.apache.omid.tso.PersistenceProcessorHandler;
import org.apache.omid.tso.TSOServerConfig;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.MoreObjects;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link PersistenceProcessor} implementation that accumulates requests into a {@link Batch}
 * and hands each non-empty batch to a pool of {@link PersistenceProcessorHandler}s through an
 * LMAX Disruptor ring buffer.
 *
 * <p>The ring buffer is created with {@link ProducerType#SINGLE}, so the {@code add*ToBatch}
 * and {@link #triggerCurrentBatchFlush()} methods are expected to be driven from one thread
 * only; this class performs no locking of its own.
 */
class PersistenceProcessorImpl
implements PersistenceProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorImpl.class);

    /** Number of slots in the persistence ring buffer (2^20). */
    private static final int RING_BUFFER_SIZE = 1 << 20;

    /** How long {@link #close()} waits for the Disruptor executor threads to finish. */
    private static final long EXECUTOR_TERMINATION_TIMEOUT_SECONDS = 3L;

    private final ExecutorService disruptorExec;
    private final Disruptor<PersistBatchEvent> disruptor;
    private final RingBuffer<PersistBatchEvent> persistRing;
    private final ObjectPool<Batch> batchPool;

    /** Batch currently being filled; replaced by a freshly borrowed one after every flush. */
    @VisibleForTesting
    Batch currentBatch;

    // Incremented with ++ (not atomic); relies on the single-producer usage described above.
    private volatile long batchSequence;
    private final MetricsRegistry metrics;

    /**
     * Builds the worker pool and the Disruptor, starts it, and borrows the first batch.
     *
     * @param config      supplies the number of concurrent commit-table writer threads
     * @param strategy    wait strategy used by the Disruptor consumers
     * @param commitTable not used by this constructor; kept for injection compatibility
     * @param batchPool   pool from which batches are borrowed
     * @param panicker    passed to the {@link FatalExceptionHandler} installed on the Disruptor
     * @param handlers    work handlers that consume published batches
     * @param metrics     metrics registry (stored, not otherwise used in this class)
     * @throws Exception if the initial batch cannot be borrowed from the pool; in that case the
     *                   already started Disruptor and its executor are stopped before rethrowing
     */
    @Inject
    PersistenceProcessorImpl(TSOServerConfig config, @Named("PersistenceStrategy") WaitStrategy strategy, CommitTable commitTable, ObjectPool<Batch> batchPool, Panicker panicker, PersistenceProcessorHandler[] handlers, MetricsRegistry metrics) throws Exception {
        ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("persist-%d");
        this.disruptorExec = Executors.newFixedThreadPool(config.getNumConcurrentCTWriters(), threadFactory.build());
        this.disruptor = new Disruptor<>(PersistBatchEvent.EVENT_FACTORY, RING_BUFFER_SIZE, this.disruptorExec, ProducerType.SINGLE, strategy);
        // The exception handler is installed before the handlers are registered.
        this.disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
        this.disruptor.handleEventsWithWorkerPool(handlers);
        this.persistRing = this.disruptor.start();
        this.metrics = metrics;
        this.batchSequence = 0L;
        this.batchPool = batchPool;
        try {
            this.currentBatch = batchPool.borrowObject();
        } catch (Exception e) {
            // Do not leave worker threads running behind a half-constructed instance.
            this.disruptor.halt();
            this.disruptorExec.shutdownNow();
            throw e;
        }
        LOG.info("PersistentProcessor initialized");
    }

    /**
     * Publishes the current batch to the ring buffer and replaces it with a freshly borrowed
     * one. Does nothing when the current batch is empty.
     *
     * @throws Exception if a new batch cannot be borrowed from the pool
     */
    @Override
    public void triggerCurrentBatchFlush() throws Exception {
        if (this.currentBatch.isEmpty()) {
            return;
        }
        long seq = this.persistRing.next();
        PersistBatchEvent event = this.persistRing.get(seq);
        PersistBatchEvent.makePersistBatch(event, this.batchSequence++, this.currentBatch);
        this.persistRing.publish(seq);
        this.currentBatch = this.batchPool.borrowObject();
    }

    /**
     * Adds a commit to the current batch and flushes the batch if it became full.
     *
     * @throws Exception if flushing the full batch fails
     */
    @Override
    public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx, Optional<Long> newLowWatermark) throws Exception {
        this.currentBatch.addCommit(startTimestamp, commitTimestamp, c, monCtx, newLowWatermark);
        this.flushIfFull();
    }

    /**
     * Adds a commit retry to the current batch and flushes the batch if it became full.
     *
     * @throws Exception if flushing the full batch fails
     */
    @Override
    public void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
        this.currentBatch.addCommitRetry(startTimestamp, c, monCtx);
        this.flushIfFull();
    }

    /**
     * Adds an abort to the current batch and flushes the batch if it became full.
     *
     * @throws Exception if flushing the full batch fails
     */
    @Override
    public void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
        this.currentBatch.addAbort(startTimestamp, c, monCtx);
        this.flushIfFull();
    }

    /**
     * Adds a timestamp to the current batch and flushes the batch if it became full.
     *
     * @throws Exception if flushing the full batch fails
     */
    @Override
    public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
        this.currentBatch.addTimestamp(startTimestamp, c, monCtx);
        this.flushIfFull();
    }

    /**
     * Adds a fence to the current batch and flushes the batch if it became full.
     *
     * @throws Exception if flushing the full batch fails
     */
    @Override
    public void addFenceToBatch(long tableID, long fenceTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
        this.currentBatch.addFence(tableID, fenceTimestamp, c, monCtx);
        this.flushIfFull();
    }

    /**
     * Stops the Disruptor and its executor, waiting a bounded time for the executor threads.
     * If the calling thread is interrupted while waiting, the interrupt flag is restored.
     */
    @Override
    public void close() throws IOException {
        LOG.info("Terminating Persistence Processor...");
        // NOTE(review): halt() stops the processors before shutdown() is invoked; confirm against
        // the Disruptor version in use that shutdown() cannot wait on a backlog that the halted
        // workers will never drain.
        this.disruptor.halt();
        this.disruptor.shutdown();
        LOG.info("\tPersistence Processor Disruptor shutdown");
        this.disruptorExec.shutdownNow();
        try {
            // awaitTermination returns false on timeout; only report success when it really ended.
            if (this.disruptorExec.awaitTermination(EXECUTOR_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LOG.info("\tPersistence Processor Disruptor executor shutdown");
            } else {
                LOG.warn("\tPersistence Processor Disruptor executor did not terminate within {} seconds", EXECUTOR_TERMINATION_TIMEOUT_SECONDS);
            }
        }
        catch (InterruptedException e) {
            LOG.error("Interrupted whilst finishing Persistence Processor Disruptor executor", e);
            Thread.currentThread().interrupt();
        }
        LOG.info("Persistence Processor terminated");
    }

    /** Flushes the current batch when it reports itself as full. */
    private void flushIfFull() throws Exception {
        if (this.currentBatch.isFull()) {
            this.triggerCurrentBatchFlush();
        }
    }

    /**
     * Ring buffer slot carrying a batch together with the sequence number assigned to it at
     * flush time. Instances are pre-allocated by {@link #EVENT_FACTORY} and overwritten through
     * {@link #makePersistBatch(PersistBatchEvent, long, Batch)}.
     */
    static final class PersistBatchEvent {
        private long batchSequence;
        private Batch batch;

        /** Factory used by the Disruptor to pre-populate the ring buffer. */
        static final EventFactory<PersistBatchEvent> EVENT_FACTORY = new EventFactory<PersistBatchEvent>(){

            @Override
            public PersistBatchEvent newInstance() {
                return new PersistBatchEvent();
            }
        };

        PersistBatchEvent() {
        }

        /**
         * Overwrites the given event with a batch and its sequence number.
         *
         * @param e             event to populate
         * @param batchSequence sequence number assigned to the batch
         * @param batch         batch to carry
         */
        static void makePersistBatch(PersistBatchEvent e, long batchSequence, Batch batch) {
            e.batch = batch;
            e.batchSequence = batchSequence;
        }

        /** @return the batch carried by this event */
        Batch getBatch() {
            return this.batch;
        }

        /** @return the sequence number assigned to the carried batch */
        long getBatchSequence() {
            return this.batchSequence;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this).add("batchSequence", this.batchSequence).add("batch", this.batch).toString();
        }
    }
}

