/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.api.event.VertexStateUpdateParallelismUpdated;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestStateChangeNotifier {
    /**
     * Checks what a listener is told at registration time.
     *
     * <p>Expectations encoded below: a listener registering for a vertex that has already
     * reported a state gets that state once if its filter admits it (a {@code null} filter
     * admits everything), a listener whose filter excludes the state gets nothing, and a
     * listener on a vertex with no reported state gets nothing.
     */
    @Test(timeout = 5000L)
    public void testEventsOnRegistration() {
        TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
        Vertex v1 = createMockVertex(dagId, 1);
        Vertex v2 = createMockVertex(dagId, 2);
        Vertex v3 = createMockVertex(dagId, 3);
        DAG dag = createMockDag(dagId, v1, v2, v3);
        StateChangeNotifierForTest tracker = new StateChangeNotifierForTest(dag);

        // v1 reports RUNNING before any listener exists.
        notifyTracker(tracker, v1, VertexState.RUNNING);

        VertexStateUpdateListener noFilterListener = Mockito.mock(VertexStateUpdateListener.class);
        VertexStateUpdateListener allStatesListener = Mockito.mock(VertexStateUpdateListener.class);
        VertexStateUpdateListener runningOnlyListener = Mockito.mock(VertexStateUpdateListener.class);
        VertexStateUpdateListener succeededOnlyListener = Mockito.mock(VertexStateUpdateListener.class);
        tracker.registerForVertexUpdates(v1.getName(), null, noFilterListener);
        tracker.registerForVertexUpdates(v1.getName(), EnumSet.allOf(VertexState.class), allStatesListener);
        tracker.registerForVertexUpdates(v1.getName(), EnumSet.of(VertexState.RUNNING), runningOnlyListener);
        tracker.registerForVertexUpdates(v1.getName(), EnumSet.of(VertexState.SUCCEEDED), succeededOnlyListener);

        // One captor is shared; getValue() always yields the most recently captured update.
        ArgumentCaptor<VertexStateUpdate> captor = ArgumentCaptor.forClass(VertexStateUpdate.class);

        Mockito.verify(noFilterListener, Mockito.times(1)).onStateUpdated(captor.capture());
        Assert.assertEquals(VertexState.RUNNING, captor.getValue().getVertexState());

        Mockito.verify(allStatesListener, Mockito.times(1)).onStateUpdated(captor.capture());
        Assert.assertEquals(VertexState.RUNNING, captor.getValue().getVertexState());

        Mockito.verify(runningOnlyListener, Mockito.times(1)).onStateUpdated(captor.capture());
        Assert.assertEquals(VertexState.RUNNING, captor.getValue().getVertexState());

        // RUNNING is outside this listener's filter, so it must not have been called.
        Mockito.verify(succeededOnlyListener, Mockito.never())
                .onStateUpdated(Mockito.<VertexStateUpdate>any());

        // v2 never reported a state: registering must not enqueue anything.
        tracker.reset();
        VertexStateUpdateListener v2Listener = Mockito.mock(VertexStateUpdateListener.class);
        tracker.registerForVertexUpdates(v2.getName(), null, v2Listener);
        Assert.assertEquals(0L, tracker.totalCount.get());
        Mockito.verify(v2Listener, Mockito.never())
                .onStateUpdated(Mockito.<VertexStateUpdate>any());

        // v3 reports a parallelism update first; a later registration should see it.
        tracker.stateChanged(v3.getVertexId(),
                new VertexStateUpdateParallelismUpdated(v3.getName(), 23, -1));
        VertexStateUpdateListener v3Listener = Mockito.mock(VertexStateUpdateListener.class);
        tracker.registerForVertexUpdates(v3.getName(), null, v3Listener);
        Mockito.verify(v3Listener, Mockito.times(1)).onStateUpdated(captor.capture());
        Assert.assertEquals(VertexState.PARALLELISM_UPDATED, captor.getValue().getVertexState());
    }

    /**
     * Registers an unfiltered listener ({@code null} state set) and checks that every state
     * reported for the vertex is delivered to it, in the order it was reported.
     */
    @Test(timeout = 5000L)
    public void testSimpleStateUpdates() {
        TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
        Vertex v1 = createMockVertex(dagId, 1);
        DAG dag = createMockDag(dagId, v1);
        StateChangeNotifierForTest tracker = new StateChangeNotifierForTest(dag);

        VertexStateUpdateListener mockListener = Mockito.mock(VertexStateUpdateListener.class);
        tracker.registerForVertexUpdates(v1.getName(), null, mockListener);

        // Typed list: iterating a raw list as VertexState does not compile.
        List<VertexState> expectedStates = Lists.newArrayList(
                VertexState.RUNNING,
                VertexState.SUCCEEDED,
                VertexState.FAILED,
                VertexState.KILLED,
                VertexState.RUNNING,
                VertexState.SUCCEEDED);
        for (VertexState state : expectedStates) {
            notifyTracker(tracker, v1, state);
        }

        ArgumentCaptor<VertexStateUpdate> captor = ArgumentCaptor.forClass(VertexStateUpdate.class);
        Mockito.verify(mockListener, Mockito.times(expectedStates.size()))
                .onStateUpdated(captor.capture());

        // Compare position by position so ordering is checked as well as content.
        List<VertexStateUpdate> stateUpdatesSent = captor.getAllValues();
        for (int i = 0; i < expectedStates.size(); ++i) {
            Assert.assertEquals(expectedStates.get(i), stateUpdatesSent.get(i).getVertexState());
        }
    }

    /**
     * Registering the same listener twice for the same vertex is expected to be rejected
     * with a {@link TezUncheckedException}.
     */
    @Test(timeout = 5000L)
    public void testDuplicateRegistration() {
        TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
        Vertex v1 = createMockVertex(dagId, 1);
        DAG dag = createMockDag(dagId, v1);
        StateChangeNotifierForTest tracker = new StateChangeNotifierForTest(dag);

        VertexStateUpdateListener mockListener = Mockito.mock(VertexStateUpdateListener.class);
        tracker.registerForVertexUpdates(v1.getName(), null, mockListener);

        boolean duplicateRejected = false;
        try {
            tracker.registerForVertexUpdates(v1.getName(), null, mockListener);
        } catch (TezUncheckedException expected) {
            // This is the outcome the test is looking for.
            duplicateRejected = true;
        }
        Assert.assertTrue("Expecting an error from duplicate registrations of the same listener",
                duplicateRejected);
    }

    /**
     * Registers a listener filtered to {@code RUNNING} and {@code SUCCEEDED} and checks that
     * only those states are delivered, in the order they were reported; {@code FAILED} and
     * {@code KILLED} must be dropped.
     */
    @Test(timeout = 5000L)
    public void testSpecificStateUpdates() {
        TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
        Vertex v1 = createMockVertex(dagId, 1);
        DAG dag = createMockDag(dagId, v1);
        StateChangeNotifierForTest tracker = new StateChangeNotifierForTest(dag);

        VertexStateUpdateListener mockListener = Mockito.mock(VertexStateUpdateListener.class);
        tracker.registerForVertexUpdates(
                v1.getName(), EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED), mockListener);

        // Typed lists: iterating a raw list as VertexState does not compile.
        List<VertexState> states = Lists.newArrayList(
                VertexState.RUNNING,
                VertexState.SUCCEEDED,
                VertexState.FAILED,
                VertexState.KILLED,
                VertexState.RUNNING,
                VertexState.SUCCEEDED);
        // The reported sequence with the filtered-out states removed.
        List<VertexState> expectedStates = Lists.newArrayList(
                VertexState.RUNNING,
                VertexState.SUCCEEDED,
                VertexState.RUNNING,
                VertexState.SUCCEEDED);
        for (VertexState state : states) {
            notifyTracker(tracker, v1, state);
        }

        ArgumentCaptor<VertexStateUpdate> captor = ArgumentCaptor.forClass(VertexStateUpdate.class);
        Mockito.verify(mockListener, Mockito.times(expectedStates.size()))
                .onStateUpdated(captor.capture());

        List<VertexStateUpdate> stateUpdatesSent = captor.getAllValues();
        for (int i = 0; i < expectedStates.size(); ++i) {
            Assert.assertEquals(expectedStates.get(i), stateUpdatesSent.get(i).getVertexState());
        }
    }

    /**
     * Unregisters the listener part-way through a sequence of state reports and checks that
     * it received exactly the updates reported before it was removed, in order.
     */
    @Test(timeout = 5000L)
    public void testUnregister() {
        TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
        Vertex v1 = createMockVertex(dagId, 1);
        DAG dag = createMockDag(dagId, v1);
        StateChangeNotifierForTest tracker = new StateChangeNotifierForTest(dag);

        VertexStateUpdateListener mockListener = Mockito.mock(VertexStateUpdateListener.class);
        tracker.registerForVertexUpdates(v1.getName(), null, mockListener);

        // Typed list: iterating a raw list as VertexState does not compile.
        List<VertexState> expectedStates = Lists.newArrayList(
                VertexState.RUNNING,
                VertexState.SUCCEEDED,
                VertexState.FAILED,
                VertexState.KILLED,
                VertexState.RUNNING,
                VertexState.SUCCEEDED);
        int numExpectedEvents = 3;
        for (int i = 0; i < expectedStates.size(); ++i) {
            // Drop the listener right before the (numExpectedEvents + 1)-th report.
            if (i == numExpectedEvents) {
                tracker.unregisterForVertexUpdates(v1.getName(), mockListener);
            }
            notifyTracker(tracker, v1, expectedStates.get(i));
        }

        ArgumentCaptor<VertexStateUpdate> captor = ArgumentCaptor.forClass(VertexStateUpdate.class);
        Mockito.verify(mockListener, Mockito.times(numExpectedEvents))
                .onStateUpdated(captor.capture());

        // Only the leading numExpectedEvents states should have reached the listener.
        List<VertexStateUpdate> stateUpdatesSent = captor.getAllValues();
        for (int i = 0; i < numExpectedEvents; ++i) {
            Assert.assertEquals(expectedStates.get(i), stateUpdatesSent.get(i).getVertexState());
        }
    }

    /**
     * Builds a mock {@link DAG} that reports {@code dagId} from {@code getID()} and resolves
     * each supplied vertex both by its name and by its vertex id.
     *
     * @param dagId id the mock DAG returns
     * @param vertices vertices the mock DAG should be able to look up
     * @return the stubbed mock DAG
     */
    private DAG createMockDag(TezDAGID dagId, Vertex... vertices) {
        DAG mockDag = Mockito.mock(DAG.class);
        Mockito.doReturn(dagId).when(mockDag).getID();
        for (int i = 0; i < vertices.length; i++) {
            Vertex vertex = vertices[i];
            // Read both keys from the vertex before stubbing the DAG lookups.
            String name = vertex.getName();
            TezVertexID id = vertex.getVertexId();
            Mockito.doReturn(vertex).when(mockDag).getVertex(name);
            Mockito.doReturn(vertex).when(mockDag).getVertex(id);
        }
        return mockDag;
    }

    /**
     * Builds a mock {@link Vertex} whose vertex id is derived from {@code dagId} and
     * {@code id}, and whose name is {@code "vertex" + id}.
     *
     * @param dagId DAG the vertex id belongs to
     * @param id numeric vertex index, also used as the name suffix
     * @return the stubbed mock vertex
     */
    private Vertex createMockVertex(TezDAGID dagId, int id) {
        TezVertexID mockedId = TezVertexID.getInstance(dagId, id);
        String mockedName = "vertex" + id;

        Vertex mockVertex = Mockito.mock(Vertex.class);
        Mockito.doReturn(mockedId).when(mockVertex).getVertexId();
        Mockito.doReturn(mockedName).when(mockVertex).getName();
        return mockVertex;
    }

    /**
     * Reports {@code state} for vertex {@code v} to the notifier as a plain
     * {@link VertexStateUpdate}.
     *
     * @param notifier notifier under test
     * @param v vertex whose id and name identify the update
     * @param state state to report
     */
    private void notifyTracker(StateChangeNotifier notifier, Vertex v, VertexState state) {
        TezVertexID vertexId = v.getVertexId();
        VertexStateUpdate update = new VertexStateUpdate(v.getName(), state);
        notifier.stateChanged(vertexId, update);
    }

    public static class StateChangeNotifierForTest
    extends StateChangeNotifier {
        private static final Logger LOG = LoggerFactory.getLogger(StateChangeNotifierForTest.class);
        AtomicInteger count = new AtomicInteger(0);
        AtomicInteger totalCount = new AtomicInteger(0);

        public StateChangeNotifierForTest(DAG dag) {
            super(dag);
        }

        public void reset() {
            this.count.set(0);
            this.totalCount.set(0);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void processedEventFromQueue() {
            while (this.count.get() <= 0) {
                try {
                    Thread.sleep(10L);
                    LOG.info("sleep to wait for available events");
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            AtomicInteger atomicInteger = this.count;
            synchronized (atomicInteger) {
                if (this.count.decrementAndGet() == 0) {
                    this.count.notifyAll();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void addedEventToQueue() {
            this.totalCount.incrementAndGet();
            AtomicInteger atomicInteger = this.count;
            synchronized (atomicInteger) {
                if (this.count.incrementAndGet() > 0) {
                    try {
                        this.count.wait();
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

