/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.util;

import com.google.api.client.util.Clock;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Lists;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Sets;
import com.google.cloud.dataflow.sdk.util.PubsubClient;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;

public class PubsubTestClient
extends PubsubClient {
    private static final State STATE = new State();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static PubsubTestClientFactory createFactoryForPublish(PubsubClient.TopicPath expectedTopic, Iterable<PubsubClient.OutgoingMessage> expectedOutgoingMessages, Iterable<PubsubClient.OutgoingMessage> failingOutgoingMessages) {
        State state = STATE;
        synchronized (state) {
            Preconditions.checkState(!PubsubTestClient.STATE.isActive, "Test still in flight");
            PubsubTestClient.STATE.expectedTopic = expectedTopic;
            PubsubTestClient.STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages);
            PubsubTestClient.STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages);
            PubsubTestClient.STATE.isActive = true;
        }
        return new PubsubTestClientFactory(){

            @Override
            public PubsubClient newClient(@Nullable String timestampLabel, @Nullable String idLabel, DataflowPipelineOptions options) throws IOException {
                return new PubsubTestClient();
            }

            @Override
            public String getKind() {
                return "PublishTest";
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void close() {
                State state = STATE;
                synchronized (state) {
                    Preconditions.checkState(STATE.isActive, "No test still in flight");
                    Preconditions.checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(), "Still waiting for %s messages to be published", STATE.remainingExpectedOutgoingMessages.size());
                    STATE.isActive = false;
                    STATE.remainingExpectedOutgoingMessages = null;
                }
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static PubsubTestClientFactory createFactoryForPull(Clock clock, PubsubClient.SubscriptionPath expectedSubscription, int ackTimeoutSec, Iterable<PubsubClient.IncomingMessage> expectedIncomingMessages) {
        State state = STATE;
        synchronized (state) {
            Preconditions.checkState(!PubsubTestClient.STATE.isActive, "Test still in flight");
            PubsubTestClient.STATE.clock = clock;
            PubsubTestClient.STATE.expectedSubscription = expectedSubscription;
            PubsubTestClient.STATE.ackTimeoutSec = ackTimeoutSec;
            PubsubTestClient.STATE.remainingPendingIncomingMessages = Lists.newArrayList(expectedIncomingMessages);
            PubsubTestClient.STATE.pendingAckIncomingMessages = new HashMap<String, PubsubClient.IncomingMessage>();
            PubsubTestClient.STATE.ackDeadline = new HashMap<String, Long>();
            PubsubTestClient.STATE.isActive = true;
        }
        return new PubsubTestClientFactory(){

            @Override
            public PubsubClient newClient(@Nullable String timestampLabel, @Nullable String idLabel, DataflowPipelineOptions options) throws IOException {
                return new PubsubTestClient();
            }

            @Override
            public String getKind() {
                return "PullTest";
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void close() {
                State state = STATE;
                synchronized (state) {
                    Preconditions.checkState(STATE.isActive, "No test still in flight");
                    Preconditions.checkState(STATE.remainingPendingIncomingMessages.isEmpty(), "Still waiting for %s messages to be pulled", STATE.remainingPendingIncomingMessages.size());
                    Preconditions.checkState(STATE.pendingAckIncomingMessages.isEmpty(), "Still waiting for %s messages to be ACKed", STATE.pendingAckIncomingMessages.size());
                    Preconditions.checkState(STATE.ackDeadline.isEmpty(), "Still waiting for %s messages to be ACKed", STATE.ackDeadline.size());
                    STATE.isActive = false;
                    STATE.remainingPendingIncomingMessages = null;
                    STATE.pendingAckIncomingMessages = null;
                    STATE.ackDeadline = null;
                }
            }
        };
    }

    private boolean inPullMode() {
        Preconditions.checkState(PubsubTestClient.STATE.isActive, "No test is active");
        return PubsubTestClient.STATE.expectedSubscription != null;
    }

    private boolean inPublishMode() {
        Preconditions.checkState(PubsubTestClient.STATE.isActive, "No test is active");
        return PubsubTestClient.STATE.expectedTopic != null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void advance() {
        State state = STATE;
        synchronized (state) {
            Preconditions.checkState(this.inPullMode(), "Can only advance in pull mode");
            Iterator<Map.Entry<String, Long>> deadlineItr = PubsubTestClient.STATE.ackDeadline.entrySet().iterator();
            while (deadlineItr.hasNext()) {
                Map.Entry<String, Long> entry = deadlineItr.next();
                if (entry.getValue() > PubsubTestClient.STATE.clock.currentTimeMillis()) continue;
                PubsubTestClient.STATE.remainingPendingIncomingMessages.add(PubsubTestClient.STATE.pendingAckIncomingMessages.remove(entry.getKey()));
                deadlineItr.remove();
            }
        }
    }

    @Override
    public void close() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int publish(PubsubClient.TopicPath topic, List<PubsubClient.OutgoingMessage> outgoingMessages) throws IOException {
        State state = STATE;
        synchronized (state) {
            Preconditions.checkState(this.inPublishMode(), "Can only publish in publish mode");
            Preconditions.checkState(topic.equals(PubsubTestClient.STATE.expectedTopic), "Topic %s does not match expected %s", topic, PubsubTestClient.STATE.expectedTopic);
            for (PubsubClient.OutgoingMessage outgoingMessage : outgoingMessages) {
                if (PubsubTestClient.STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) {
                    throw new RuntimeException("Simulating failure for " + outgoingMessage);
                }
                Preconditions.checkState(PubsubTestClient.STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage), "Unexpected outgoing message %s", outgoingMessage);
            }
            return outgoingMessages.size();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<PubsubClient.IncomingMessage> pull(long requestTimeMsSinceEpoch, PubsubClient.SubscriptionPath subscription, int batchSize, boolean returnImmediately) throws IOException {
        State state = STATE;
        synchronized (state) {
            Preconditions.checkState(this.inPullMode(), "Can only pull in pull mode");
            long now = PubsubTestClient.STATE.clock.currentTimeMillis();
            Preconditions.checkState(requestTimeMsSinceEpoch == now, "Simulated time %s does not match request time %s", now, requestTimeMsSinceEpoch);
            Preconditions.checkState(subscription.equals(PubsubTestClient.STATE.expectedSubscription), "Subscription %s does not match expected %s", subscription, PubsubTestClient.STATE.expectedSubscription);
            Preconditions.checkState(returnImmediately, "Pull only supported if returning immediately");
            ArrayList<PubsubClient.IncomingMessage> incomingMessages = new ArrayList<PubsubClient.IncomingMessage>();
            Iterator<PubsubClient.IncomingMessage> pendItr = PubsubTestClient.STATE.remainingPendingIncomingMessages.iterator();
            while (pendItr.hasNext()) {
                PubsubClient.IncomingMessage incomingMessage = pendItr.next();
                pendItr.remove();
                PubsubClient.IncomingMessage incomingMessageWithRequestTime = incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
                incomingMessages.add(incomingMessageWithRequestTime);
                PubsubTestClient.STATE.pendingAckIncomingMessages.put(incomingMessageWithRequestTime.ackId, incomingMessageWithRequestTime);
                PubsubTestClient.STATE.ackDeadline.put(incomingMessageWithRequestTime.ackId, requestTimeMsSinceEpoch + (long)(PubsubTestClient.STATE.ackTimeoutSec * 1000));
                if (incomingMessages.size() < batchSize) continue;
                break;
            }
            return incomingMessages;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void acknowledge(PubsubClient.SubscriptionPath subscription, List<String> ackIds) throws IOException {
        State state = STATE;
        synchronized (state) {
            Preconditions.checkState(this.inPullMode(), "Can only acknowledge in pull mode");
            Preconditions.checkState(subscription.equals(PubsubTestClient.STATE.expectedSubscription), "Subscription %s does not match expected %s", subscription, PubsubTestClient.STATE.expectedSubscription);
            for (String ackId : ackIds) {
                Preconditions.checkState(PubsubTestClient.STATE.ackDeadline.remove(ackId) != null, "No message with ACK id %s is waiting for an ACK", ackId);
                Preconditions.checkState(PubsubTestClient.STATE.pendingAckIncomingMessages.remove(ackId) != null, "No message with ACK id %s is waiting for an ACK", ackId);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void modifyAckDeadline(PubsubClient.SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException {
        State state = STATE;
        synchronized (state) {
            Preconditions.checkState(this.inPullMode(), "Can only modify ack deadline in pull mode");
            Preconditions.checkState(subscription.equals(PubsubTestClient.STATE.expectedSubscription), "Subscription %s does not match expected %s", subscription, PubsubTestClient.STATE.expectedSubscription);
            for (String ackId : ackIds) {
                if (deadlineSeconds > 0) {
                    Preconditions.checkState(PubsubTestClient.STATE.ackDeadline.remove(ackId) != null, "No message with ACK id %s is waiting for an ACK", ackId);
                    Preconditions.checkState(PubsubTestClient.STATE.pendingAckIncomingMessages.containsKey(ackId), "No message with ACK id %s is waiting for an ACK", ackId);
                    PubsubTestClient.STATE.ackDeadline.put(ackId, PubsubTestClient.STATE.clock.currentTimeMillis() + (long)(deadlineSeconds * 1000));
                    continue;
                }
                Preconditions.checkState(PubsubTestClient.STATE.ackDeadline.remove(ackId) != null, "No message with ACK id %s is waiting for an ACK", ackId);
                PubsubClient.IncomingMessage message = PubsubTestClient.STATE.pendingAckIncomingMessages.remove(ackId);
                Preconditions.checkState(message != null, "No message with ACK id %s is waiting for an ACK", ackId);
                PubsubTestClient.STATE.remainingPendingIncomingMessages.add(message);
            }
        }
    }

    @Override
    public void createTopic(PubsubClient.TopicPath topic) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void deleteTopic(PubsubClient.TopicPath topic) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public List<PubsubClient.TopicPath> listTopics(PubsubClient.ProjectPath project) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void createSubscription(PubsubClient.TopicPath topic, PubsubClient.SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void deleteSubscription(PubsubClient.SubscriptionPath subscription) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public List<PubsubClient.SubscriptionPath> listSubscriptions(PubsubClient.ProjectPath project, PubsubClient.TopicPath topic) throws IOException {
        throw new UnsupportedOperationException();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int ackDeadlineSeconds(PubsubClient.SubscriptionPath subscription) throws IOException {
        State state = STATE;
        synchronized (state) {
            return PubsubTestClient.STATE.ackTimeoutSec;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isEOF() {
        State state = STATE;
        synchronized (state) {
            Preconditions.checkState(this.inPullMode(), "Can only check EOF in pull mode");
            return PubsubTestClient.STATE.remainingPendingIncomingMessages.isEmpty();
        }
    }

    public static interface PubsubTestClientFactory
    extends PubsubClient.PubsubClientFactory,
    Closeable {
    }

    private static class State {
        boolean isActive;
        @Nullable
        PubsubClient.TopicPath expectedTopic;
        @Nullable
        Set<PubsubClient.OutgoingMessage> remainingExpectedOutgoingMessages;
        @Nullable
        Set<PubsubClient.OutgoingMessage> remainingFailingOutgoingMessages;
        @Nullable
        Clock clock;
        @Nullable
        PubsubClient.SubscriptionPath expectedSubscription;
        int ackTimeoutSec;
        @Nullable
        List<PubsubClient.IncomingMessage> remainingPendingIncomingMessages;
        @Nullable
        Map<String, PubsubClient.IncomingMessage> pendingAckIncomingMessages;
        @Nullable
        Map<String, Long> ackDeadline;

        private State() {
        }
    }
}

