/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.MoreObjects;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Optional;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.cache.CacheBuilder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.cache.CacheLoader;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.cache.LoadingCache;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.runners.inprocess.AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate;
import com.google.cloud.dataflow.sdk.runners.inprocess.CommittedResult;
import com.google.cloud.dataflow.sdk.runners.inprocess.CompletionCallback;
import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutor;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessTransformResult;
import com.google.cloud.dataflow.sdk.runners.inprocess.ModelEnforcementFactory;
import com.google.cloud.dataflow.sdk.runners.inprocess.StepAndKey;
import com.google.cloud.dataflow.sdk.runners.inprocess.StructuralKey;
import com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluatorRegistry;
import com.google.cloud.dataflow.sdk.runners.inprocess.TransformExecutor;
import com.google.cloud.dataflow.sdk.runners.inprocess.TransformExecutorService;
import com.google.cloud.dataflow.sdk.runners.inprocess.TransformExecutorServices;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
import com.google.cloud.dataflow.sdk.util.TimeDomain;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PValue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ExecutorServiceParallelExecutor
implements InProcessExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);
    private final ExecutorService executorService;
    private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
    private final Set<PValue> keyedPValues;
    private final TransformEvaluatorRegistry registry;
    private final Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements;
    private final InProcessEvaluationContext evaluationContext;
    private final LoadingCache<StepAndKey, TransformExecutorService> executorServices;
    private final Queue<ExecutorUpdate> allUpdates;
    private final BlockingQueue<VisibleExecutorUpdate> visibleUpdates;
    private final TransformExecutorService parallelExecutorService;
    private final CompletionCallback defaultCompletionCallback;
    private Collection<AppliedPTransform<?, ?, ?>> rootNodes;
    private final AtomicReference<ExecutorState> state = new AtomicReference<ExecutorState>(ExecutorState.QUIESCENT);
    private final AtomicLong outstandingWork = new AtomicLong();

    public static ExecutorServiceParallelExecutor create(ExecutorService executorService, Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers, Set<PValue> keyedPValues, TransformEvaluatorRegistry registry, Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements, InProcessEvaluationContext context) {
        return new ExecutorServiceParallelExecutor(executorService, valueToConsumers, keyedPValues, registry, transformEnforcements, context);
    }

    private ExecutorServiceParallelExecutor(ExecutorService executorService, Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers, Set<PValue> keyedPValues, TransformEvaluatorRegistry registry, Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements, InProcessEvaluationContext context) {
        this.executorService = executorService;
        this.valueToConsumers = valueToConsumers;
        this.keyedPValues = keyedPValues;
        this.registry = registry;
        this.transformEnforcements = transformEnforcements;
        this.evaluationContext = context;
        this.executorServices = CacheBuilder.newBuilder().weakValues().build(this.serialTransformExecutorServiceCacheLoader());
        this.allUpdates = new ConcurrentLinkedQueue<ExecutorUpdate>();
        this.visibleUpdates = new ArrayBlockingQueue<VisibleExecutorUpdate>(20);
        this.parallelExecutorService = TransformExecutorServices.parallel(executorService);
        this.defaultCompletionCallback = new TimerIterableCompletionCallback(Collections.emptyList());
    }

    private CacheLoader<StepAndKey, TransformExecutorService> serialTransformExecutorServiceCacheLoader() {
        return new CacheLoader<StepAndKey, TransformExecutorService>(){

            @Override
            public TransformExecutorService load(StepAndKey stepAndKey) throws Exception {
                return TransformExecutorServices.serial(ExecutorServiceParallelExecutor.this.executorService);
            }
        };
    }

    @Override
    public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
        this.rootNodes = ImmutableList.copyOf(roots);
        MonitorRunnable monitorRunnable = new MonitorRunnable();
        this.executorService.submit(monitorRunnable);
    }

    public void scheduleConsumption(AppliedPTransform<?, ?, ?> consumer, @Nullable InProcessPipelineRunner.CommittedBundle<?> bundle, CompletionCallback onComplete) {
        this.evaluateBundle(consumer, bundle, onComplete);
    }

    private <T> void evaluateBundle(AppliedPTransform<?, ?, ?> transform, @Nullable InProcessPipelineRunner.CommittedBundle<T> bundle, CompletionCallback onComplete) {
        TransformExecutorService transformExecutor;
        if (bundle != null && this.isKeyed(bundle.getPCollection())) {
            StepAndKey stepAndKey = StepAndKey.of(transform, bundle == null ? null : bundle.getKey());
            transformExecutor = this.executorServices.getUnchecked(stepAndKey);
        } else {
            transformExecutor = this.parallelExecutorService;
        }
        Collection enforcements = MoreObjects.firstNonNull(this.transformEnforcements.get(transform.getTransform().getClass()), Collections.emptyList());
        TransformExecutor<T> callable = TransformExecutor.create(this.registry, enforcements, this.evaluationContext, bundle, transform, onComplete, transformExecutor);
        this.outstandingWork.incrementAndGet();
        transformExecutor.schedule(callable);
    }

    private boolean isKeyed(PValue pvalue) {
        return this.keyedPValues.contains(pvalue);
    }

    private void scheduleConsumers(ExecutorUpdate update) {
        InProcessPipelineRunner.CommittedBundle<?> bundle = update.getBundle().get();
        for (AppliedPTransform<?, ?, ?> consumer : update.getConsumers()) {
            this.scheduleConsumption(consumer, bundle, this.defaultCompletionCallback);
        }
    }

    @Override
    public void awaitCompletion() throws Throwable {
        VisibleExecutorUpdate update;
        do {
            if ((update = this.visibleUpdates.poll(2L, TimeUnit.SECONDS)) == null && this.executorService.isShutdown()) {
                return;
            }
            if (update == null || !update.throwable.isPresent()) continue;
            throw (Throwable)update.throwable.get();
        } while (update == null || !update.isDone());
        this.executorService.shutdown();
    }

    private static enum ExecutorState {
        ACTIVE,
        PROCESSING,
        QUIESCING,
        QUIESCENT;

    }

    private class MonitorRunnable
    implements Runnable {
        private final long maxTimeProcessingUpdatesNanos = TimeUnit.MILLISECONDS.toNanos(5L);
        private final String runnableName = String.format("%s$%s-monitor", ExecutorServiceParallelExecutor.access$300(ExecutorServiceParallelExecutor.this).getPipelineOptions().getAppName(), ExecutorServiceParallelExecutor.class.getSimpleName());
        private boolean exceptionThrown = false;

        private MonitorRunnable() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            String oldName = Thread.currentThread().getName();
            Thread.currentThread().setName(this.runnableName);
            try {
                boolean noWorkOutstanding = ExecutorServiceParallelExecutor.this.outstandingWork.get() == 0L;
                ExecutorState startingState = (ExecutorState)((Object)ExecutorServiceParallelExecutor.this.state.get());
                if (startingState == ExecutorState.ACTIVE) {
                    ExecutorServiceParallelExecutor.this.state.compareAndSet(ExecutorState.ACTIVE, ExecutorState.PROCESSING);
                } else if (startingState == ExecutorState.PROCESSING && noWorkOutstanding) {
                    ExecutorServiceParallelExecutor.this.state.compareAndSet(ExecutorState.PROCESSING, ExecutorState.QUIESCING);
                } else if (startingState == ExecutorState.QUIESCING && noWorkOutstanding) {
                    ExecutorServiceParallelExecutor.this.state.compareAndSet(ExecutorState.QUIESCING, ExecutorState.QUIESCENT);
                }
                this.fireTimers();
                ArrayList<ExecutorUpdate> updates = new ArrayList<ExecutorUpdate>();
                ExecutorUpdate pendingUpdate = (ExecutorUpdate)ExecutorServiceParallelExecutor.this.allUpdates.poll();
                while (pendingUpdate != null) {
                    updates.add(pendingUpdate);
                    pendingUpdate = (ExecutorUpdate)ExecutorServiceParallelExecutor.this.allUpdates.poll();
                }
                for (ExecutorUpdate update : updates) {
                    LOG.debug("Executor Update: {}", (Object)update);
                    if (update.getBundle().isPresent()) {
                        if (ExecutorState.ACTIVE == startingState || ExecutorState.PROCESSING == startingState && noWorkOutstanding) {
                            ExecutorServiceParallelExecutor.this.scheduleConsumers(update);
                            continue;
                        }
                        ExecutorServiceParallelExecutor.this.allUpdates.offer(update);
                        continue;
                    }
                    if (!update.getException().isPresent()) continue;
                    ExecutorServiceParallelExecutor.this.visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
                    this.exceptionThrown = true;
                }
                this.addWorkIfNecessary();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.error("Monitor died due to being interrupted");
                while (!ExecutorServiceParallelExecutor.this.visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(e))) {
                    ExecutorServiceParallelExecutor.this.visibleUpdates.poll();
                }
            }
            catch (Throwable t) {
                LOG.error("Monitor thread died due to throwable", t);
                while (!ExecutorServiceParallelExecutor.this.visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(t))) {
                    ExecutorServiceParallelExecutor.this.visibleUpdates.poll();
                }
            }
            finally {
                if (!this.shouldShutdown()) {
                    ExecutorServiceParallelExecutor.this.executorService.submit(this);
                }
                Thread.currentThread().setName(oldName);
            }
        }

        private void fireTimers() throws Exception {
            try {
                for (Map.Entry<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, InMemoryWatermarkManager.FiredTimers>> transformTimers : ExecutorServiceParallelExecutor.this.evaluationContext.extractFiredTimers().entrySet()) {
                    AppliedPTransform<?, ?, ?> transform = transformTimers.getKey();
                    for (Map.Entry<StructuralKey<?>, InMemoryWatermarkManager.FiredTimers> keyTimers : transformTimers.getValue().entrySet()) {
                        for (TimeDomain domain : TimeDomain.values()) {
                            Collection<TimerInternals.TimerData> delivery = keyTimers.getValue().getTimers(domain);
                            if (delivery.isEmpty()) continue;
                            KeyedWorkItem work = KeyedWorkItems.timersWorkItem(keyTimers.getKey().getKey(), delivery);
                            InProcessPipelineRunner.CommittedBundle bundle = ExecutorServiceParallelExecutor.this.evaluationContext.createKeyedBundle(null, keyTimers.getKey(), (PCollection)transform.getInput()).add(WindowedValue.valueInEmptyWindows(work)).commit(ExecutorServiceParallelExecutor.this.evaluationContext.now());
                            ExecutorServiceParallelExecutor.this.state.set(ExecutorState.ACTIVE);
                            ExecutorServiceParallelExecutor.this.scheduleConsumption(transform, bundle, new TimerIterableCompletionCallback(delivery));
                        }
                    }
                }
            }
            catch (Exception e) {
                LOG.error("Internal Error while delivering timers", (Throwable)e);
                throw e;
            }
        }

        private boolean shouldShutdown() {
            boolean shouldShutdown;
            boolean bl = shouldShutdown = this.exceptionThrown || ExecutorServiceParallelExecutor.this.evaluationContext.isDone();
            if (shouldShutdown && ExecutorServiceParallelExecutor.this.evaluationContext.isDone()) {
                LOG.debug("Pipeline is finished. Shutting down. {}");
                while (!ExecutorServiceParallelExecutor.this.visibleUpdates.offer(VisibleExecutorUpdate.finished())) {
                    ExecutorServiceParallelExecutor.this.visibleUpdates.poll();
                }
                ExecutorServiceParallelExecutor.this.executorService.shutdown();
            }
            return shouldShutdown;
        }

        private void addWorkIfNecessary() {
            if (ExecutorServiceParallelExecutor.this.state.get() == ExecutorState.QUIESCENT) {
                for (AppliedPTransform root : ExecutorServiceParallelExecutor.this.rootNodes) {
                    if (ExecutorServiceParallelExecutor.this.evaluationContext.isDone(root)) continue;
                    ExecutorServiceParallelExecutor.this.scheduleConsumption(root, null, ExecutorServiceParallelExecutor.this.defaultCompletionCallback);
                    ExecutorServiceParallelExecutor.this.state.set(ExecutorState.ACTIVE);
                }
            }
        }
    }

    private static class VisibleExecutorUpdate {
        private final Optional<? extends Throwable> throwable;
        private final boolean done;

        public static VisibleExecutorUpdate fromThrowable(Throwable e) {
            return new VisibleExecutorUpdate(false, e);
        }

        public static VisibleExecutorUpdate finished() {
            return new VisibleExecutorUpdate(true, null);
        }

        private VisibleExecutorUpdate(boolean done, @Nullable Throwable exception) {
            this.throwable = Optional.fromNullable(exception);
            this.done = done;
        }

        public boolean isDone() {
            return this.done;
        }
    }

    static abstract class ExecutorUpdate {
        ExecutorUpdate() {
        }

        public static ExecutorUpdate fromBundle(InProcessPipelineRunner.CommittedBundle<?> bundle, Collection<AppliedPTransform<?, ?, ?>> consumers) {
            return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(Optional.of(bundle), consumers, Optional.absent());
        }

        public static ExecutorUpdate fromThrowable(Throwable t) {
            return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(Optional.absent(), Collections.emptyList(), Optional.of(t));
        }

        public abstract Optional<? extends InProcessPipelineRunner.CommittedBundle<?>> getBundle();

        public abstract Collection<AppliedPTransform<?, ?, ?>> getConsumers();

        public abstract Optional<? extends Throwable> getException();
    }

    private class TimerIterableCompletionCallback
    implements CompletionCallback {
        private final Iterable<TimerInternals.TimerData> timers;

        protected TimerIterableCompletionCallback(Iterable<TimerInternals.TimerData> timers) {
            this.timers = timers;
        }

        @Override
        public final CommittedResult handleResult(InProcessPipelineRunner.CommittedBundle<?> inputBundle, InProcessTransformResult result) {
            CommittedResult committedResult = ExecutorServiceParallelExecutor.this.evaluationContext.handleResult(inputBundle, this.timers, result);
            for (InProcessPipelineRunner.CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
                ExecutorServiceParallelExecutor.this.allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle, (Collection)ExecutorServiceParallelExecutor.this.valueToConsumers.get(outputBundle.getPCollection())));
            }
            InProcessPipelineRunner.CommittedBundle<?> unprocessedInputs = committedResult.getUnprocessedInputs();
            if (unprocessedInputs != null && !Iterables.isEmpty(unprocessedInputs.getElements())) {
                ExecutorServiceParallelExecutor.this.allUpdates.offer(ExecutorUpdate.fromBundle(unprocessedInputs, Collections.singleton(committedResult.getTransform())));
            }
            if (!committedResult.getProducedOutputTypes().isEmpty()) {
                ExecutorServiceParallelExecutor.this.state.set(ExecutorState.ACTIVE);
            }
            ExecutorServiceParallelExecutor.this.outstandingWork.decrementAndGet();
            return committedResult;
        }

        @Override
        public void handleEmpty(AppliedPTransform<?, ?, ?> transform) {
            ExecutorServiceParallelExecutor.this.outstandingWork.decrementAndGet();
        }

        @Override
        public final void handleThrowable(InProcessPipelineRunner.CommittedBundle<?> inputBundle, Throwable t) {
            ExecutorServiceParallelExecutor.this.allUpdates.offer(ExecutorUpdate.fromThrowable(t));
            ExecutorServiceParallelExecutor.this.outstandingWork.decrementAndGet();
        }
    }
}

