package org.apache.tez.dag.app;

import com.google.common.base.Joiner;
import java.io.IOException;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.app.MockDAGAppMaster;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TaskImpl;
import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator;
import org.apache.tez.dag.app.dag.speculation.legacy.LegacyTaskRuntimeEstimator;
import org.apache.tez.dag.app.dag.speculation.legacy.SimpleExponentialTaskRuntimeEstimator;
import org.apache.tez.dag.app.dag.speculation.legacy.TaskRuntimeEstimator;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/tez/dag/app/TestSpeculation.class */
public class TestSpeculation {
    private static final Logger LOG = LoggerFactory.getLogger(TezConfiguration.class);
    private static final String ASSERT_SPECULATIONS_COUNT_MSG = "Number of attempts after Speculation should be two";
    private static final String UNIT_EXCEPTION_MESSAGE = "test timed out after";
    private static final int NUM_UPDATES_FOR_TEST_TASK = 1200;
    private static final int ASSERT_SPECULATIONS_COUNT_RETRIES = 3;
    private Configuration defaultConf;
    private FileSystem localFs;
    MockDAGAppMaster mockApp;
    MockDAGAppMaster.MockContainerLauncher mockLauncher;

    @Rule
    public RetryRule rule = new RetryRule(ASSERT_SPECULATIONS_COUNT_RETRIES);
    private Class<? extends TaskRuntimeEstimator> estimatorClass;

    @Retention(RetentionPolicy.RUNTIME)
    /* loaded from: input_file:org/apache/tez/dag/app/TestSpeculation$Retry.class */
    public @interface Retry {
    }

    /* loaded from: input_file:org/apache/tez/dag/app/TestSpeculation$RetryRule.class */
    class RetryRule implements TestRule {
        private AtomicInteger retryCount;

        RetryRule(int i) {
            this.retryCount = new AtomicInteger(i);
        }

        public Statement apply(final Statement statement, final Description description) {
            return new Statement() { // from class: org.apache.tez.dag.app.TestSpeculation.RetryRule.1
                public void evaluate() throws Throwable {
                    while (RetryRule.this.retryCount.getAndDecrement() > 0) {
                        try {
                            statement.evaluate();
                            return;
                        } catch (Throwable th) {
                            if (RetryRule.this.retryCount.get() <= 0 || description.getAnnotation(Retry.class) == null) {
                                throw th;
                            }
                            if ((th instanceof AssertionError) && th.getMessage().contains(TestSpeculation.ASSERT_SPECULATIONS_COUNT_MSG)) {
                                continue;
                            } else if (!(th instanceof Exception) || !th.getMessage().contains(TestSpeculation.UNIT_EXCEPTION_MESSAGE)) {
                                throw th;
                            }
                            TestSpeculation.LOG.warn("{} : Failed. Retries remaining: ", description.getDisplayName(), RetryRule.this.retryCount.toString());
                        }
                    }
                }
            };
        }
    }

    @Before
    public void setDefaultConf() {
        try {
            this.defaultConf = new Configuration(false);
            this.defaultConf.set("fs.defaultFS", "file:///");
            this.defaultConf.setBoolean("tez.local.mode", true);
            this.defaultConf.setBoolean("tez.am.speculation.enabled", true);
            this.defaultConf.setFloat("tez.shuffle-vertex-manager.min-src-fraction", 1.0f);
            this.defaultConf.setFloat("tez.shuffle-vertex-manager.max-src-fraction", 1.0f);
            this.localFs = FileSystem.getLocal(this.defaultConf);
            this.defaultConf.set("tez.staging-dir", "target/" + TestSpeculation.class.getName() + "-tmpDir");
            this.defaultConf.setClass("tez.am.task.estimator.class", this.estimatorClass, TaskRuntimeEstimator.class);
            this.defaultConf.setInt("tez.am.minimum.allowed.speculative.tasks", 20);
            this.defaultConf.setDouble("tez.am.proportion.total.tasks.speculatable", 0.2d);
            this.defaultConf.setDouble("tez.am.proportion.running.tasks.speculatable", 0.25d);
            this.defaultConf.setLong("tez.am.soonest.retry.after.no.speculate", 25L);
            this.defaultConf.setLong("tez.am.soonest.retry.after.speculate", 50L);
            this.defaultConf.setInt("tez.am.task.estimator.exponential.skip.initials", 2);
        } catch (IOException e) {
            throw new RuntimeException("init failure", e);
        }
    }

    @After
    public void tearDown() {
        this.defaultConf = null;
        try {
            this.localFs.close();
            this.mockLauncher.shutdown();
            this.mockApp.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Parameterized.Parameters(name = "{index}: TaskEstimator(EstimatorClass {0})")
    public static Collection<Object[]> getTestParameters() {
        return Arrays.asList(new Object[]{SimpleExponentialTaskRuntimeEstimator.class}, new Object[]{LegacyTaskRuntimeEstimator.class});
    }

    public TestSpeculation(Class<? extends TaskRuntimeEstimator> cls) {
        this.estimatorClass = cls;
    }

    MockTezClient createTezSession() throws Exception {
        TezConfiguration tezConfiguration = new TezConfiguration(this.defaultConf);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        MockTezClient mockTezClient = new MockTezClient("testspeculation", tezConfiguration, true, null, null, new MockClock(), atomicBoolean, false, false, 1, 2);
        mockTezClient.start();
        syncWithMockAppLauncher(false, atomicBoolean, mockTezClient);
        return mockTezClient;
    }

    void syncWithMockAppLauncher(boolean z, AtomicBoolean atomicBoolean, MockTezClient mockTezClient) throws Exception {
        synchronized (atomicBoolean) {
            while (!atomicBoolean.get()) {
                atomicBoolean.wait();
            }
            this.mockApp = mockTezClient.getLocalClient().getMockApp();
            this.mockLauncher = this.mockApp.getContainerLauncher();
            this.mockLauncher.startScheduling(z);
            atomicBoolean.notify();
        }
    }

    @Test(timeout = 30000)
    @Retry
    public void testSingleTaskSpeculation() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(4611686018427387903L, 1);
        hashMap.put(100L, 2);
        hashMap.put(-1L, 1);
        this.defaultConf.setLong("tez.am.soonest.retry.after.no.speculate", 50L);
        for (Map.Entry entry : hashMap.entrySet()) {
            this.defaultConf.setLong("tez.am.legacy.speculative.single.task.vertex.timeout", ((Long) entry.getKey()).longValue());
            DAG create = DAG.create("test");
            create.addVertex(Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1));
            MockTezClient createTezSession = createTezSession();
            DAGClient submitDAG = createTezSession.submitDAG(create);
            DAGImpl currentDAG = this.mockApp.getContext().getCurrentDAG();
            TezVertexID tezVertexID = TezVertexID.getInstance(currentDAG.getID(), 0);
            TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 0), 0);
            TezTaskAttemptID tezTaskAttemptID2 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 0), 1);
            Thread.sleep(200L);
            this.mockLauncher.setStatusUpdatesForTask(tezTaskAttemptID, NUM_UPDATES_FOR_TEST_TASK);
            this.mockLauncher.startScheduling(true);
            submitDAG.waitForCompletion();
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, submitDAG.getDAGStatus((Set) null).getState());
            TaskImpl task = currentDAG.getTask(tezTaskAttemptID.getTaskID());
            Assert.assertEquals(((Integer) entry.getValue()).intValue(), task.getAttempts().size());
            if (((Integer) entry.getValue()).intValue() > 1) {
                Assert.assertEquals(tezTaskAttemptID2, task.getSuccessfulAttempt().getID());
                TaskAttempt attempt = task.getAttempt(tezTaskAttemptID);
                Joiner.on(",").join(attempt.getDiagnostics()).contains("Killed as speculative attempt");
                Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION, attempt.getTerminationCause());
            }
            createTezSession.stop();
        }
    }

    public void testBasicSpeculation(boolean z) throws Exception {
        DAG create = DAG.create("test");
        Vertex create2 = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
        create.addVertex(create2);
        MockTezClient createTezSession = createTezSession();
        DAGClient submitDAG = createTezSession.submitDAG(create);
        DAGImpl currentDAG = this.mockApp.getContext().getCurrentDAG();
        TezVertexID tezVertexID = TezVertexID.getInstance(currentDAG.getID(), 0);
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 0), 0);
        TezTaskAttemptID tezTaskAttemptID2 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 0), 1);
        this.mockLauncher.updateProgress(z);
        this.mockLauncher.setStatusUpdatesForTask(tezTaskAttemptID, NUM_UPDATES_FOR_TEST_TASK);
        this.mockLauncher.startScheduling(true);
        submitDAG.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, submitDAG.getDAGStatus((Set) null).getState());
        TaskImpl task = currentDAG.getTask(tezTaskAttemptID.getTaskID());
        Assert.assertEquals(ASSERT_SPECULATIONS_COUNT_MSG, 2L, task.getAttempts().size());
        Assert.assertEquals(tezTaskAttemptID2, task.getSuccessfulAttempt().getID());
        TaskAttempt attempt = task.getAttempt(tezTaskAttemptID);
        Joiner.on(",").join(attempt.getDiagnostics()).contains("Killed as speculative attempt");
        Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION, attempt.getTerminationCause());
        if (z) {
            Assert.assertEquals(1L, task.getCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
            Assert.assertEquals(1L, currentDAG.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
            Assert.assertEquals(1L, currentDAG.getVertex(tezTaskAttemptID.getTaskID().getVertexID()).getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
        }
        LegacySpeculator speculator = currentDAG.getVertex(create2.getName()).getSpeculator();
        Assert.assertEquals(20L, speculator.getMinimumAllowedSpeculativeTasks());
        Assert.assertEquals(0.2d, speculator.getProportionTotalTasksSpeculatable(), 0.0d);
        Assert.assertEquals(0.25d, speculator.getProportionRunningTasksSpeculatable(), 0.0d);
        Assert.assertEquals(25L, speculator.getSoonestRetryAfterNoSpeculate());
        Assert.assertEquals(50L, speculator.getSoonestRetryAfterSpeculate());
        createTezSession.stop();
    }

    @Test(timeout = 30000)
    @Retry
    public void testBasicSpeculationWithProgress() throws Exception {
        testBasicSpeculation(true);
    }

    @Test(timeout = 30000)
    @Retry
    public void testBasicSpeculationWithoutProgress() throws Exception {
        testBasicSpeculation(false);
    }

    @Test(timeout = 30000)
    @Retry
    public void testBasicSpeculationPerVertexConf() throws Exception {
        DAG create = DAG.create("test");
        Vertex create2 = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
        Vertex create3 = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 5);
        create2.setConf("tez.am.speculation.enabled", "false");
        create.addVertex(create2);
        create.addVertex(create3);
        create.addEdge(Edge.create(create2, create3, EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("O"), InputDescriptor.create("I"))));
        MockTezClient createTezSession = createTezSession();
        DAGClient submitDAG = createTezSession.submitDAG(create);
        DAGImpl currentDAG = this.mockApp.getContext().getCurrentDAG();
        TezVertexID vertexId = currentDAG.getVertex("B").getVertexId();
        TezVertexID vertexId2 = currentDAG.getVertex("A").getVertexId();
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
        TezTaskAttemptID tezTaskAttemptID2 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId2, 0), 0);
        this.mockLauncher.setStatusUpdatesForTask(tezTaskAttemptID, NUM_UPDATES_FOR_TEST_TASK);
        this.mockLauncher.setStatusUpdatesForTask(tezTaskAttemptID2, NUM_UPDATES_FOR_TEST_TASK);
        this.mockLauncher.startScheduling(true);
        org.apache.tez.dag.app.dag.Vertex vertex = currentDAG.getVertex(vertexId);
        org.apache.tez.dag.app.dag.Vertex vertex2 = currentDAG.getVertex(vertexId2);
        do {
            Thread.sleep(100L);
        } while (vertex.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue() <= 0);
        submitDAG.waitForCompletion();
        Assert.assertTrue("Num Speculations is not higher than 0", vertex.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue() > 0);
        Assert.assertEquals(0L, vertex2.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
        createTezSession.stop();
    }

    @Test(timeout = 30000)
    @Retry
    public void testBasicSpeculationNotUseful() throws Exception {
        DAG create = DAG.create("test");
        create.addVertex(Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5));
        MockTezClient createTezSession = createTezSession();
        DAGClient submitDAG = createTezSession.submitDAG(create);
        DAGImpl currentDAG = this.mockApp.getContext().getCurrentDAG();
        TezVertexID tezVertexID = TezVertexID.getInstance(currentDAG.getID(), 0);
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 0), 0);
        TezTaskAttemptID tezTaskAttemptID2 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 0), 1);
        this.mockLauncher.setStatusUpdatesForTask(tezTaskAttemptID, NUM_UPDATES_FOR_TEST_TASK);
        this.mockLauncher.setStatusUpdatesForTask(tezTaskAttemptID2, NUM_UPDATES_FOR_TEST_TASK);
        this.mockLauncher.startScheduling(true);
        submitDAG.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, submitDAG.getDAGStatus((Set) null).getState());
        TaskImpl task = currentDAG.getTask(tezTaskAttemptID2.getTaskID());
        Assert.assertEquals(2L, task.getAttempts().size());
        Assert.assertEquals(tezTaskAttemptID, task.getSuccessfulAttempt().getID());
        TaskAttempt attempt = task.getAttempt(tezTaskAttemptID2);
        Joiner.on(",").join(attempt.getDiagnostics()).contains("Killed speculative attempt as");
        Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_INEFFECTIVE_SPECULATION, attempt.getTerminationCause());
        Assert.assertEquals(1L, task.getCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
        Assert.assertEquals(1L, currentDAG.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
        Assert.assertEquals(1L, currentDAG.getVertex(tezTaskAttemptID2.getTaskID().getVertexID()).getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
        createTezSession.stop();
    }
}
