/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.buffer;

import com.couchbase.client.core.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.core.deps.io.netty.channel.ChannelInboundHandlerAdapter;
import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.buffer.DcpBucketConfig;
import com.couchbase.client.dcp.buffer.DcpOps;
import com.couchbase.client.dcp.buffer.DcpOpsImpl;
import com.couchbase.client.dcp.buffer.DcpRequestDispatcher;
import com.couchbase.client.dcp.buffer.PartitionInstance;
import com.couchbase.client.dcp.buffer.PersistedSeqnos;
import com.couchbase.client.dcp.conductor.BucketConfigSource;
import com.couchbase.client.dcp.conductor.DcpChannel;
import com.couchbase.client.dcp.config.HostAndPort;
import com.couchbase.client.dcp.core.state.NotConnectedException;
import com.couchbase.client.dcp.metrics.DcpClientMetrics;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class PersistencePollingHandler
extends ChannelInboundHandlerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(PersistencePollingHandler.class);
    private final Client.Environment env;
    private final BucketConfigSource bucketConfigSource;
    private final DcpOps dcpOps;
    private final PersistedSeqnos persistedSeqnos;
    private final AtomicBoolean loggedClosureWarning = new AtomicBoolean();
    private final LongAdder scheduledPollingTasks;
    private Disposable configSubscription;
    private int activeGroupId;

    public PersistencePollingHandler(Client.Environment env, BucketConfigSource bucketConfigSource, DcpRequestDispatcher dispatcher, DcpClientMetrics clientMetrics) {
        this.env = Objects.requireNonNull(env);
        this.bucketConfigSource = Objects.requireNonNull(bucketConfigSource);
        this.persistedSeqnos = Objects.requireNonNull(env.persistedSeqnos());
        this.dcpOps = new DcpOpsImpl(dispatcher);
        this.scheduledPollingTasks = clientMetrics.scheduledPollingTasks();
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        if (this.configSubscription != null) {
            this.configSubscription.dispose();
        }
        ++this.activeGroupId;
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.configSubscription = this.bucketConfigSource.configs().publishOn(Schedulers.fromExecutor((Executor)ctx.executor())).subscribe(bucketConfig -> this.reconfigure(ctx, (DcpBucketConfig)bucketConfig));
    }

    private void reconfigure(ChannelHandlerContext ctx, DcpBucketConfig bucketConfig) {
        LOGGER.debug("Reconfiguring persistence pollers.");
        int groupId = ++this.activeGroupId;
        this.persistedSeqnos.reset(bucketConfig);
        LOGGER.debug("Starting persistence polling group {}", (Object)groupId);
        try {
            for (PartitionInstance absentInstance : bucketConfig.getAbsentPartitionInstances()) {
                LOGGER.debug("Partition instance {} is absent, will assume all seqnos persisted.", (Object)absentInstance);
                this.persistedSeqnos.markAsAbsent(absentInstance);
            }
            HostAndPort nodeAddress = DcpChannel.getHostAndPort(ctx.channel());
            List<PartitionInstance> partitions = bucketConfig.getHostedPartitions(nodeAddress);
            LOGGER.debug("Node {} hosts partitions {}", (Object)nodeAddress, partitions);
            Iterator<PartitionInstance> iterator = partitions.iterator();
            while (iterator.hasNext()) {
                PartitionInstance partitionInstance;
                PartitionInstance pas = partitionInstance = iterator.next();
                this.dcpOps.getFailoverLog(partitionInstance.partition()).subscribe(failoverLog -> {
                    long vbuuid = failoverLog.getCurrentVbuuid();
                    this.observeAndRepeat(ctx, pas, vbuuid, groupId);
                }, throwable -> {
                    if (throwable instanceof DcpOps.BadResponseStatusException) {
                        this.logWarningAndClose(ctx, "Failed to fetch failover log for {}. Server response: {}", pas, throwable.getMessage());
                    } else {
                        this.logWarningAndClose(ctx, "Failed to fetch failover log for {}.", pas, throwable);
                    }
                });
            }
        }
        catch (Throwable t) {
            this.logWarningAndClose(ctx, "Failed to reconfigure persistence poller.", t);
        }
    }

    private void scheduleObserveAndRepeat(ChannelHandlerContext ctx, PartitionInstance partitionInstance, long vbuuid, int groupId, int intervalMultiplier) {
        if (intervalMultiplier < 1) {
            throw new IllegalArgumentException("Interval multiplier must be > 0");
        }
        try {
            ctx.executor().schedule(() -> {
                this.scheduledPollingTasks.decrement();
                this.observeAndRepeat(ctx, partitionInstance, vbuuid, groupId);
            }, this.env.persistencePollingIntervalMillis() * (long)intervalMultiplier, TimeUnit.MILLISECONDS);
            this.scheduledPollingTasks.increment();
        }
        catch (Throwable t) {
            this.logWarningAndClose(ctx, "Failed to schedule observeSeqno.", t);
        }
    }

    private void observeAndRepeat(ChannelHandlerContext ctx, PartitionInstance partitionInstance, long vbuuid, int groupId) {
        if (this.activeGroupId != groupId) {
            LOGGER.debug("Polling group {} is no longer active; stopping polling for {}", (Object)groupId, (Object)partitionInstance);
            return;
        }
        if (!this.env.streamEventBuffer().hasBufferedEvents(partitionInstance.partition())) {
            LOGGER.trace("No buffered events; skipping observeSeqno for partition instance {}", (Object)partitionInstance);
            this.scheduleObserveAndRepeat(ctx, partitionInstance, vbuuid, groupId, 1);
            return;
        }
        this.dcpOps.observeSeqno(partitionInstance.partition(), vbuuid).doOnSuccess(observeSeqnoResponse -> {
            try {
                if (this.activeGroupId != groupId) {
                    LOGGER.debug("Polling group {} is no longer active; stopping polling for {}", (Object)groupId, (Object)partitionInstance);
                    return;
                }
                long newVbuuid = observeSeqnoResponse.vbuuid();
                long minSeqnoPersistedEverywhere = this.persistedSeqnos.update(partitionInstance, newVbuuid, observeSeqnoResponse.persistSeqno());
                this.env.streamEventBuffer().onSeqnoPersisted(observeSeqnoResponse.vbid(), minSeqnoPersistedEverywhere);
                this.scheduleObserveAndRepeat(ctx, partitionInstance, newVbuuid, groupId, 1);
            }
            catch (Throwable t) {
                this.logWarningAndClose(ctx, "Fatal error in observeAndRepeat handling observeSeqno response.", t);
            }
        }).onErrorResume(t -> {
            if (this.activeGroupId != groupId || t instanceof NotConnectedException) {
                LOGGER.debug("Polling group {} is no longer active; stopping polling for {}", (Object)groupId, (Object)partitionInstance);
                return Mono.empty();
            }
            if (t instanceof DcpOps.BadResponseStatusException) {
                DcpOps.BadResponseStatusException e = (DcpOps.BadResponseStatusException)t;
                if (e.status().isTemporary()) {
                    LOGGER.debug("observeSeqno failed with status code " + e.status() + " ; will retry after an extended delay.");
                    this.scheduleObserveAndRepeat(ctx, partitionInstance, vbuuid, groupId, 10);
                } else {
                    this.logWarningAndClose(ctx, "observeSeqno failed with status code " + e.status(), new Object[0]);
                }
            } else {
                this.logWarningAndClose(ctx, "observeSeqno failed.", t);
            }
            return Mono.empty();
        }).subscribe();
    }

    private void logWarningAndClose(ChannelHandlerContext ctx, String msg, Object ... params) {
        if (this.loggedClosureWarning.compareAndSet(false, true)) {
            LOGGER.warn("Closing channel; " + msg, params);
            ctx.close();
        } else {
            LOGGER.trace("Closing channel; " + msg, params);
        }
    }
}

