/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.AbstractLogicalOutput;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.api.Writer;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.common.resources.ScalingAllocator;
import org.apache.tez.runtime.task.TaskRunner2Callable;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class TestLogicalIOProcessorRuntimeTask {
    /**
     * Runs two tasks of the same vertex name ("vertex1") one after the other, sharing a single
     * {@code startedInputsMap}, and checks which components were started.
     *
     * <p>After the first task the processor has run once, {@link TestInput} has been started once
     * and {@link TestOutput} has not been started. After the second task (run with
     * {@code tez.local.mode=true}) the processor has run twice while the input start count is
     * still one. Each task's contexts must report that task's own vertex parallelism (30, then 10).
     *
     * @throws Exception if task construction, initialization, execution or close fails; the
     *         original exception is propagated so the failure cause stays visible
     */
    @Test(timeout=5000L)
    public void testAutoStart() throws Exception {
        // The assertions below check absolute values of these shared static counters, and the
        // other tests in this class run TestProcessor subclasses and TestInput too. Reset them
        // so the outcome does not depend on test execution order.
        TestProcessor.runCount = 0;
        TestInput.startCount = 0;
        TestInput.vertexParallelism = 0;
        TestOutput.startCount = 0;
        TestOutput.vertexParallelism = 0;

        TezDAGID dagId = this.createTezDagId();
        TezVertexID vertexId = this.createTezVertexId(dagId);
        Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
        Multimap<String, String> startedInputsMap = HashMultimap.create();
        TezUmbilical umbilical = Mockito.mock(TezUmbilical.class);
        TezConfiguration tezConf = new TezConfiguration();
        tezConf.set("tez.task.scale.memory.allocator.class", ScalingAllocator.class.getName());

        TezTaskAttemptID taId1 = this.createTaskAttemptID(vertexId, 1);
        TaskSpec task1 = this.createTaskSpec(taId1, "dag1", "vertex1", 30, TestProcessor.class.getName(), TestOutput.class.getName());
        TezTaskAttemptID taId2 = this.createTaskAttemptID(vertexId, 2);
        TaskSpec task2 = this.createTaskSpec(taId2, "dag2", "vertex1", 10, TestProcessor.class.getName(), TestOutput.class.getName());

        TezSharedExecutor sharedExecutor = new TezSharedExecutor(tezConf);
        try {
            LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null, umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null, "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim(), sharedExecutor);
            try {
                lio1.initialize();
                lio1.run();
                lio1.close();

                // Input started once, output never started.
                Assert.assertEquals(1L, TestProcessor.runCount);
                Assert.assertEquals(1L, TestInput.startCount);
                Assert.assertEquals(0L, TestOutput.startCount);
                // The progress flag is set by the run and cleared by the first read.
                Assert.assertTrue(lio1.getAndClearProgressNotification());
                Assert.assertFalse(lio1.getAndClearProgressNotification());
                Assert.assertEquals(30L, TestInput.vertexParallelism);
                Assert.assertEquals(0L, TestOutput.vertexParallelism);
                Assert.assertEquals(30L, lio1.getProcessorContext().getVertexParallelism());
                Assert.assertEquals(30L, lio1.getInputContexts().iterator().next().getVertexParallelism());
                Assert.assertEquals(30L, lio1.getOutputContexts().iterator().next().getVertexParallelism());
            } finally {
                this.cleanupAndTest(lio1);
            }

            // Second task: same vertex name and same startedInputsMap, now in local mode.
            tezConf.setBoolean("tez.local.mode", true);
            LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(task2, 0, tezConf, null, umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null, "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim(), sharedExecutor);
            try {
                lio2.initialize();
                lio2.run();
                lio2.close();

                // Processor ran again; input start count and recorded parallelism are unchanged.
                Assert.assertEquals(2L, TestProcessor.runCount);
                Assert.assertEquals(1L, TestInput.startCount);
                Assert.assertEquals(0L, TestOutput.startCount);
                Assert.assertEquals(30L, TestInput.vertexParallelism);
                Assert.assertEquals(0L, TestOutput.vertexParallelism);
                Assert.assertEquals(10L, lio2.getProcessorContext().getVertexParallelism());
                Assert.assertEquals(10L, lio2.getInputContexts().iterator().next().getVertexParallelism());
                Assert.assertEquals(10L, lio2.getOutputContexts().iterator().next().getVertexParallelism());
            } finally {
                this.cleanupAndTest(lio2);
            }
        } finally {
            // Always release the executor, whichever phase failed.
            sharedExecutor.shutdownNow();
        }
    }

    /**
     * Runs a task whose processor throws from {@code run()} and whose {@code cleanup()} tries to
     * send an event through every output context, then verifies that the only interaction with
     * the umbilical is a single {@code addEvents} call with an empty list.
     *
     * @throws Exception if task construction or the runner invocation fails unexpectedly
     */
    @Test
    public void testEventsCantBeSentInCleanup() throws Exception {
        TezDAGID dagId = this.createTezDagId();
        TezVertexID vertexId = this.createTezVertexId(dagId);
        Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
        Multimap<String, String> startedInputsMap = HashMultimap.create();
        TezUmbilical umbilical = Mockito.mock(TezUmbilical.class);
        TezConfiguration tezConf = new TezConfiguration();
        tezConf.set("tez.task.scale.memory.allocator.class", ScalingAllocator.class.getName());
        TezTaskAttemptID taId1 = this.createTaskAttemptID(vertexId, 1);
        TaskSpec task1 = this.createTaskSpec(taId1, "dag1", "vertex1", 30, RunExceptionProcessor.class.getName(), TestOutputWithEvents.class.getName());
        TezSharedExecutor sharedExecutor = new TezSharedExecutor(tezConf);
        try {
            CleanupLogicalIOProcessorRuntimeTask lio = new CleanupLogicalIOProcessorRuntimeTask(task1, 0, tezConf, null, umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null, "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim(), sharedExecutor);
            TaskRunner2Callable runner = new TaskRunner2Callable(lio, UserGroupInformation.getCurrentUser(), umbilical);
            runner.call();
            // The umbilical must only ever have been handed an empty event list.
            Mockito.verify(umbilical, Mockito.only()).addEvents(Collections.emptyList());
        } finally {
            // Previously the executor was never shut down by this test.
            sharedExecutor.shutdownNow();
        }
    }

    /**
     * Runs a task whose processor throws a {@link RuntimeException} from {@code close()} and
     * checks that closing the task surfaces that exception and that no events were handed to the
     * umbilical.
     *
     * @throws Exception if construction, initialization or run fails unexpectedly
     */
    @Test
    public void testExceptionHappensInClose() throws Exception {
        TezDAGID dagId = this.createTezDagId();
        TezVertexID vertexId = this.createTezVertexId(dagId);
        Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
        Multimap<String, String> startedInputsMap = HashMultimap.create();
        TezUmbilical umbilical = Mockito.mock(TezUmbilical.class);

        TezConfiguration tezConf = new TezConfiguration();
        tezConf.set("tez.task.scale.memory.allocator.class", ScalingAllocator.class.getName());

        TaskSpec task1 = this.createTaskSpec(this.createTaskAttemptID(vertexId, 1), "dag1", "vertex1", 30, CloseExceptionProcessor.class.getName(), TestOutputWithEvents.class.getName());
        TezSharedExecutor sharedExecutor = new TezSharedExecutor(tezConf);
        LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null, umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null, "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim(), sharedExecutor);
        try {
            lio1.initialize();
            lio1.run();

            RuntimeException closeFailure = null;
            try {
                lio1.close();
            } catch (RuntimeException e) {
                closeFailure = e;
            }
            if (closeFailure == null) {
                Assert.fail("RuntimeException should have been thrown");
            }
            // close() failed, so nothing may have been sent through the umbilical.
            Mockito.verify(umbilical, Mockito.never()).addEvents(Mockito.anyList());
        } finally {
            sharedExecutor.shutdownNow();
            this.cleanupAndTest(lio1);
        }
    }

    /**
     * Calls {@code cleanup()} on the given task and asserts that it released its state.
     *
     * <p>The processor, input and output contexts are captured before cleanup, because the task
     * is expected to drop its references to them; afterwards each captured context must report a
     * {@code null} user payload and a {@code null} object registry. The task's internal maps must
     * be empty and its processor, processor context, group-input map and event router thread
     * must be {@code null}. When {@code tez.local.mode} is set in the task's configuration
     * exactly one input spec and one output spec are expected to remain; otherwise none.
     *
     * @param lio the task to clean up and inspect
     * @throws InterruptedException if {@code lio.cleanup()} is interrupted
     */
    private void cleanupAndTest(LogicalIOProcessorRuntimeTask lio) throws InterruptedException {
        // Snapshot the contexts first: the task's own collections are emptied by cleanup().
        ProcessorContext procContext = lio.getProcessorContext();
        List<InputContext> inputContexts = new LinkedList<>(lio.getInputContexts());
        List<OutputContext> outputContexts = new LinkedList<>(lio.getOutputContexts());

        lio.cleanup();

        Assert.assertNull(procContext.getUserPayload());
        Assert.assertNull(procContext.getObjectRegistry());
        for (InputContext inputContext : inputContexts) {
            Assert.assertNull(inputContext.getUserPayload());
            Assert.assertNull(inputContext.getObjectRegistry());
        }
        for (OutputContext outputContext : outputContexts) {
            Assert.assertNull(outputContext.getUserPayload());
            Assert.assertNull(outputContext.getObjectRegistry());
        }

        // In local mode one input spec and one output spec are expected to survive cleanup.
        boolean localMode = lio.tezConf.getBoolean("tez.local.mode", false);
        long expectedSpecCount = localMode ? 1L : 0L;
        Assert.assertEquals(expectedSpecCount, lio.inputSpecs.size());
        Assert.assertEquals(expectedSpecCount, lio.outputSpecs.size());
        Assert.assertTrue(lio.groupInputSpecs == null || lio.groupInputSpecs.size() == 0);

        Assert.assertEquals(0L, lio.inputsMap.size());
        Assert.assertEquals(0L, lio.inputContextMap.size());
        Assert.assertEquals(0L, lio.outputsMap.size());
        Assert.assertEquals(0L, lio.outputContextMap.size());
        Assert.assertNull(lio.groupInputsMap);
        Assert.assertNull(lio.processor);
        Assert.assertNull(lio.processorContext);
        Assert.assertEquals(0L, lio.runInputMap.size());
        Assert.assertEquals(0L, lio.runOutputMap.size());
        Assert.assertEquals(0L, lio.eventsToBeProcessed.size());
        Assert.assertNull(lio.eventRouterThread);
    }

    /**
     * Builds a {@link TaskSpec} with one {@link TestInput}-backed input spec and one output spec
     * of the given output class; the two trailing constructor arguments are passed as
     * {@code null}.
     *
     * @param taskAttemptID attempt id of the task
     * @param dagName name of the DAG
     * @param vertexName name of the vertex
     * @param parallelism vertex parallelism recorded in the spec
     * @param processorClassname fully qualified class name of the processor
     * @param outputClassName fully qualified class name of the output
     * @return the assembled task spec
     */
    private TaskSpec createTaskSpec(TezTaskAttemptID taskAttemptID, String dagName, String vertexName, int parallelism, String processorClassname, String outputClassName) {
        ProcessorDescriptor processor = this.createProcessorDescriptor(processorClassname);
        List<InputSpec> inputs = this.createInputSpecList();
        List<OutputSpec> outputs = this.createOutputSpecList(outputClassName);
        return new TaskSpec(taskAttemptID, dagName, vertexName, parallelism, processor, inputs, outputs, null, null);
    }

    /**
     * Creates the input specs used by every task in this test: a single spec for the edge
     * {@code "inedge"} backed by {@link TestInput}, constructed with a count of 1.
     *
     * @return a mutable list holding that one input spec
     */
    private List<InputSpec> createInputSpecList() {
        InputDescriptor inputDesc = InputDescriptor.create(TestInput.class.getName());
        InputSpec inputSpec = new InputSpec("inedge", inputDesc, 1);
        // Pass the element directly: an Object[] argument would produce an ArrayList<Object>,
        // which is not assignable to List<InputSpec>.
        return Lists.newArrayList(inputSpec);
    }

    /**
     * Creates a single output spec for the edge {@code "outedge"} backed by the given output
     * class, constructed with a count of 1.
     *
     * @param outputClassName fully qualified class name of the output implementation
     * @return a mutable list holding that one output spec
     */
    private List<OutputSpec> createOutputSpecList(String outputClassName) {
        OutputDescriptor outputDesc = OutputDescriptor.create(outputClassName);
        OutputSpec outputSpec = new OutputSpec("outedge", outputDesc, 1);
        // Pass the element directly: an Object[] argument would produce an ArrayList<Object>,
        // which is not assignable to List<OutputSpec>.
        return Lists.newArrayList(outputSpec);
    }

    /**
     * Wraps a processor class name in a {@link ProcessorDescriptor}.
     *
     * @param className fully qualified class name of the processor
     * @return the descriptor created for {@code className}
     */
    private ProcessorDescriptor createProcessorDescriptor(String className) {
        return ProcessorDescriptor.create(className);
    }

    /**
     * Creates a task attempt id under the given vertex. {@code taskIndex} is used both as the
     * task id and as the attempt id.
     *
     * @param vertexId vertex the task belongs to
     * @param taskIndex value used for both the task and the attempt
     * @return the task attempt id
     */
    private TezTaskAttemptID createTaskAttemptID(TezVertexID vertexId, int taskIndex) {
        TezTaskID owningTask = TezTaskID.getInstance(vertexId, taskIndex);
        return TezTaskAttemptID.getInstance(owningTask, taskIndex);
    }

    /**
     * Creates the vertex id with value 1 under the given DAG.
     *
     * @param dagId DAG the vertex belongs to
     * @return the vertex id
     */
    private TezVertexID createTezVertexId(TezDAGID dagId) {
        final int vertexNumber = 1;
        return TezVertexID.getInstance(dagId, vertexNumber);
    }

    /**
     * Creates the fixed DAG id used by every test in this class.
     *
     * @return the DAG id built from the constants {@code "2000"}, {@code 100} and {@code 1}
     */
    private TezDAGID createTezDagId() {
        TezDAGID fixedDagId = TezDAGID.getInstance("2000", 100, 1);
        return fixedDagId;
    }

    /**
     * Processor stub: each {@code run} bumps the shared {@link #runCount} and reports progress
     * on its context. All other lifecycle callbacks are no-ops.
     */
    public static class TestProcessor
    extends AbstractLogicalIOProcessor {
        /** Number of {@code run} invocations across all instances; static, so shared by tests. */
        public static volatile int runCount = 0;

        public TestProcessor(ProcessorContext context) {
            super(context);
        }

        public void initialize() throws Exception {
            // Nothing to initialize.
        }

        public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
            runCount++;
            this.getContext().notifyProgress();
        }

        public void handleEvents(List<Event> processorEvents) {
            // Events are ignored.
        }

        public void close() throws Exception {
            // Nothing to release.
        }
    }

    /**
     * Output stub that records, in static fields, how often it was started and the vertex
     * parallelism its context reported at start time.
     */
    public static class TestOutput
    extends AbstractLogicalOutput {
        /** Number of {@code start} invocations across all instances. */
        public static volatile int startCount = 0;
        /** Vertex parallelism read from the context during the most recent {@code start}. */
        public static volatile int vertexParallelism;

        public TestOutput(OutputContext outputContext, int numPhysicalOutputs) {
            super(outputContext, numPhysicalOutputs);
        }

        /** Requests zero bytes of initial memory with no callback and returns no events. */
        public List<Event> initialize() throws Exception {
            this.getContext().requestInitialMemory(0L, null);
            return null;
        }

        /** Logs the start, updates the static bookkeeping and reports progress. */
        public void start() throws Exception {
            System.err.println("Out started");
            startCount++;
            int reportedParallelism = this.getContext().getVertexParallelism();
            vertexParallelism = reportedParallelism;
            this.getContext().notifyProgress();
        }

        public Writer getWriter() throws Exception {
            return null;
        }

        public void handleEvents(List<Event> outputEvents) {
            // Events are ignored.
        }

        public List<Event> close() throws Exception {
            return null;
        }
    }

    /**
     * Input stub that declares itself ready during initialization and records, in static
     * fields, how often it was started and the vertex parallelism seen at start time.
     */
    public static class TestInput
    extends AbstractLogicalInput {
        /** Number of {@code start} invocations across all instances. */
        public static volatile int startCount = 0;
        /** Vertex parallelism read from the context during the most recent {@code start}. */
        public static volatile int vertexParallelism;

        public TestInput(InputContext inputContext, int numPhysicalInputs) {
            super(inputContext, numPhysicalInputs);
        }

        /** Requests zero bytes of initial memory, marks the input ready and returns no events. */
        public List<Event> initialize() throws Exception {
            this.getContext().requestInitialMemory(0L, null);
            this.getContext().inputIsReady();
            return null;
        }

        /** Updates the static bookkeeping and reports progress. */
        public void start() throws Exception {
            startCount++;
            int reportedParallelism = this.getContext().getVertexParallelism();
            vertexParallelism = reportedParallelism;
            this.getContext().notifyProgress();
        }

        public Reader getReader() throws Exception {
            return null;
        }

        public void handleEvents(List<Event> inputEvents) throws Exception {
            // Events are ignored.
        }

        public List<Event> close() throws Exception {
            return null;
        }
    }

    /**
     * Processor whose {@code run} and {@code close} both fail with a bare
     * {@link RuntimeException}; unlike {@link TestProcessor}, {@code run} neither increments
     * the run counter nor reports progress.
     */
    public static class RunExceptionProcessor
    extends TestProcessor {
        public RunExceptionProcessor(ProcessorContext context) {
            super(context);
        }

        /** Always throws. */
        @Override
        public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
            throw new RuntimeException();
        }

        /** Always throws. */
        @Override
        public void close() throws Exception {
            throw new RuntimeException();
        }
    }

    /**
     * {@link TestOutput} variant whose {@code close} returns one
     * {@link CompositeDataMovementEvent} instead of {@code null}.
     *
     * <p>The static fields declared here hide the ones in {@link TestOutput}; the inherited
     * {@code start()} is compiled against {@link TestOutput}'s fields, so it does not update
     * these.
     */
    public static class TestOutputWithEvents
    extends TestOutput {
        public static volatile int startCount = 0;
        public static volatile int vertexParallelism;

        public TestOutputWithEvents(OutputContext outputContext, int numPhysicalOutputs) {
            super(outputContext, numPhysicalOutputs);
        }

        /** Returns a fixed-size list holding a single event created with (0, 0, null). */
        @Override
        public List<Event> close() throws Exception {
            Event closingEvent = CompositeDataMovementEvent.create(0, 0, null);
            return Arrays.asList(closingEvent);
        }
    }

    /**
     * Runtime task whose {@code cleanup()} does nothing except try to send one
     * {@link CompositeDataMovementEvent} through every output context. Used to check that
     * events sent during cleanup are not delivered.
     */
    private static class CleanupLogicalIOProcessorRuntimeTask
    extends LogicalIOProcessorRuntimeTask {
        /** Forwards every argument unchanged to the superclass constructor. */
        CleanupLogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber, Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap, Multimap<String, String> startedInputsMap, ObjectRegistry objectRegistry, String pid, ExecutionContext executionContext, long memAvailable, boolean updateSysCounters, HadoopShim hadoopShim, TezExecutors sharedExecutor) throws IOException {
            super(taskSpec, appAttemptNumber, tezConf, localDirs, tezUmbilical, serviceConsumerMetadata, envMap, startedInputsMap, objectRegistry, pid, executionContext, memAvailable, updateSysCounters, hadoopShim, sharedExecutor);
        }

        /** Sends a freshly created event through each output context; no other cleanup is done. */
        public void cleanup() throws InterruptedException {
            for (OutputContext context : this.getOutputContexts()) {
                context.sendEvents(Arrays.asList(CompositeDataMovementEvent.create(0, 0, null)));
            }
        }
    }

    /**
     * Processor that runs normally (inheriting {@link TestProcessor}'s {@code run}) but fails
     * with a bare {@link RuntimeException} when closed.
     */
    public static class CloseExceptionProcessor
    extends TestProcessor {
        public CloseExceptionProcessor(ProcessorContext context) {
            super(context);
        }

        /** Always throws. */
        @Override
        public void close() throws Exception {
            throw new RuntimeException();
        }
    }
}

