/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.tools.fedbalance.procedure;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceJournal;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceJournalInfoHDFS;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
import org.apache.hadoop.tools.fedbalance.procedure.MultiPhaseProcedure;
import org.apache.hadoop.tools.fedbalance.procedure.RecordProcedure;
import org.apache.hadoop.tools.fedbalance.procedure.RetryProcedure;
import org.apache.hadoop.tools.fedbalance.procedure.UnrecoverableProcedure;
import org.apache.hadoop.tools.fedbalance.procedure.WaitProcedure;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/**
 * Tests for {@link BalanceProcedureScheduler}.
 *
 * <p>All tests share one {@link MiniDFSCluster}; the scheduler journal is
 * pointed at the {@code /procedure} directory of that cluster in
 * {@link #setup()}.
 */
public class TestBalanceProcedureScheduler {
    /** Block size, in bytes, configured for the mini cluster. */
    private static final int DEFAULT_BLOCK_SIZE = 512;
    /** Configuration shared by the cluster and every scheduler under test. */
    private static final Configuration CONF = new Configuration();

    private static MiniDFSCluster cluster;
    private static DistributedFileSystem fs;

    /**
     * Starts a three-datanode mini cluster and creates the journal directory
     * the schedulers write to.
     *
     * @throws IOException if the cluster cannot be started or the journal
     *     directory cannot be created
     */
    @BeforeAll
    public static void setup() throws IOException {
        CONF.setBoolean("dfs.namenode.delegation.token.always-use", true);
        CONF.set("fs.defaultFS", "hdfs:///");
        CONF.setBoolean("dfs.namenode.acls.enabled", true);
        CONF.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
        CONF.setLong("dfs.namenode.fs-limits.min-block-size", 0L);
        CONF.setInt("hdfs.fedbalance.procedure.work.thread.num", 1);

        cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
        cluster.waitClusterUp();
        cluster.waitActive();
        fs = cluster.getFileSystem();

        String workPath = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/procedure";
        CONF.set("hdfs.fedbalance.procedure.scheduler.journal.uri", workPath);
        fs.mkdirs(new Path(workPath));
    }

    /** Shuts the mini cluster down, if it was started. */
    @AfterAll
    public static void close() {
        if (cluster != null) {
            cluster.shutdown();
        }
    }

    /**
     * Submits a waiting job, shuts the scheduler down while the job is
     * presumably still in flight, then clears the job's journal entry.
     */
    @Test
    @Timeout(value = 60)
    public void testShutdownScheduler() throws Exception {
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        BalanceJob.Builder builder = new BalanceJob.Builder();
        builder.nextProcedure(new WaitProcedure("wait", 1000L, 5000L));
        BalanceJob job = builder.build();
        try {
            scheduler.submit(job);
            // Give the scheduler some time to pick the job up.
            Thread.sleep(1000L);
        } finally {
            // Always shut down, even when submit fails, so no scheduler
            // outlives this test.
            scheduler.shutDownAndWait(30000);
        }
        BalanceJournal journal = ReflectionUtils.newInstance(BalanceJournalInfoHDFS.class, CONF);
        journal.clear(job);
    }

    /**
     * Runs a job of five {@link RecordProcedure}s and checks that the job
     * finishes without error and that the finish list holds the procedures
     * in submission order.
     */
    @Test
    @Timeout(value = 60)
    public void testSuccessfulJob() throws Exception {
        final int procedureCount = 5;
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        try {
            List<RecordProcedure> procedures = new ArrayList<>(procedureCount);
            BalanceJob.Builder builder = new BalanceJob.Builder();
            for (int i = 0; i < procedureCount; i++) {
                RecordProcedure record = new RecordProcedure("record-" + i, 1000L);
                builder.nextProcedure(record);
                procedures.add(record);
            }
            BalanceJob job = builder.build();

            scheduler.submit(job);
            scheduler.waitUntilDone(job);

            Assertions.assertNull(job.getError());
            Assertions.assertEquals(procedureCount, RecordProcedure.getFinishList().size());
            for (int i = 0; i < procedureCount; i++) {
                Assertions.assertEquals(procedures.get(i), RecordProcedure.getFinishList().get(i));
            }
        } finally {
            scheduler.shutDownAndWait(2);
        }
    }

    /**
     * Runs a job whose only procedure throws on {@code execute()} and checks
     * that the exception message is reported as the job error.
     */
    @Test
    @Timeout(value = 60)
    public void testFailedJob() throws Exception {
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        try {
            BalanceProcedure badProcedure = Mockito.mock(BalanceProcedure.class);
            Mockito.doThrow(new IOException("Job failed exception.")).when(badProcedure).execute();
            Mockito.doReturn("bad-procedure").when(badProcedure).name();

            BalanceJob.Builder builder = new BalanceJob.Builder();
            builder.nextProcedure(badProcedure);
            BalanceJob job = builder.build();

            scheduler.submit(job);
            scheduler.waitUntilDone(job);
            GenericTestUtils.assertExceptionContains("Job failed exception", job.getError());
        } finally {
            scheduler.shutDownAndWait(2);
        }
    }

    /**
     * Interrupts a five-step job at a random point by shutting the scheduler
     * down, recovers it with a fresh scheduler, and checks which of the
     * recovered procedures report having been executed.
     */
    @Test
    @Timeout(value = 60)
    public void testGetJobAfterRecover() throws Exception {
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        try {
            BalanceJob.Builder builder = new BalanceJob.Builder();
            String firstProcedure = "wait0";
            WaitProcedure[] procedures = new WaitProcedure[5];
            for (int i = 0; i < procedures.length; i++) {
                WaitProcedure procedure = new WaitProcedure("wait" + i, 1000L, 1000L);
                builder.nextProcedure(procedure).removeAfterDone(false);
                procedures[i] = procedure;
            }
            BalanceJob job = builder.build();
            scheduler.submit(job);

            // Sleep 1000..5000 ms in whole-second steps. nextInt(bound) is used
            // instead of Math.abs(nextInt()) % 5, which goes negative for
            // Integer.MIN_VALUE and would make Thread.sleep throw.
            long randomSleepTime = new Random().nextInt(5) * 1000L + 1000L;
            Thread.sleep(randomSleepTime);
            scheduler.shutDownAndWait(2);

            // Locate the procedure the job was on when the scheduler stopped.
            WaitProcedure recoverProcedure = (WaitProcedure) job.getCurProcedure();
            Assertions.assertNotNull(recoverProcedure,
                "job has no current procedure after the scheduler was shut down");
            int recoverIndex = -1;
            for (int i = 0; i < procedures.length; i++) {
                if (procedures[i].name().equals(recoverProcedure.name())) {
                    recoverIndex = i;
                    break;
                }
            }
            Assertions.assertTrue(recoverIndex >= 0,
                "current procedure " + recoverProcedure.name() + " is not part of the job");

            // Recover with a brand-new scheduler reading the same journal.
            scheduler = new BalanceProcedureScheduler(CONF);
            scheduler.init(true);
            scheduler.waitUntilDone(job);
            BalanceJob recoverJob = scheduler.findJob(job);
            Assertions.assertNull(recoverJob.getError());
            Assertions.assertNotSame(job, recoverJob);
            Assertions.assertEquals(job, recoverJob);

            // recoverJob is declared without type arguments, so the table may
            // arrive as a raw Map; the wildcard keeps the rest type-checked.
            @SuppressWarnings("unchecked")
            Map<String, ? extends BalanceProcedure> pTable = recoverJob.getProcedureTable();
            List<? extends BalanceProcedure> recoveredProcedures =
                procedureTableToList(pTable, firstProcedure);

            // Procedures before the recovery point must not have been executed
            // by the recovered job ...
            for (int i = 0; i < recoverIndex; i++) {
                Assertions.assertFalse(((WaitProcedure) recoveredProcedures.get(i)).getExecuted());
            }
            // ... while everything from the recovery point onwards must have.
            for (int i = recoverIndex; i < procedures.length; i++) {
                Assertions.assertTrue(((WaitProcedure) recoveredProcedures.get(i)).getExecuted());
            }
        } finally {
            scheduler.shutDownAndWait(2);
        }
    }

    /**
     * Runs a {@link RetryProcedure} configured with 3 retries and checks that
     * the job succeeds, takes longer than 3 seconds and reports 3 retries.
     */
    @Test
    @Timeout(value = 60)
    public void testRetry() throws Exception {
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        try {
            BalanceJob.Builder builder = new BalanceJob.Builder();
            RetryProcedure retryProcedure = new RetryProcedure("retry", 1000L, 3);
            builder.nextProcedure(retryProcedure);
            BalanceJob job = builder.build();

            long start = Time.monotonicNow();
            scheduler.submit(job);
            scheduler.waitUntilDone(job);
            Assertions.assertNull(job.getError());

            long duration = Time.monotonicNow() - start;
            // Compare as a boolean: comparing Boolean.TRUE with an Integer via
            // assertEquals can never succeed.
            Assertions.assertTrue(duration > 3000L,
                "expected more than 3000 ms of retries but took " + duration + " ms");
            Assertions.assertEquals(3, retryProcedure.getTotalRetry());
        } finally {
            scheduler.shutDownAndWait(2);
        }
    }

    /** Submits a job without any procedure and waits for it to finish. */
    @Test
    @Timeout(value = 60)
    public void testEmptyJob() throws Exception {
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        try {
            BalanceJob job = new BalanceJob.Builder().build();
            scheduler.submit(job);
            scheduler.waitUntilDone(job);
        } finally {
            scheduler.shutDownAndWait(2);
        }
    }

    /**
     * Writes a job to a byte buffer, reads it back into a fresh job and
     * checks that both jobs are equal.
     */
    @Test
    @Timeout(value = 60)
    public void testJobSerializeAndDeserialize() throws Exception {
        BalanceJob.Builder builder = new BalanceJob.Builder();
        for (int i = 0; i < 5; i++) {
            builder.nextProcedure(new RecordProcedure("record-" + i, 1000L));
        }
        builder.nextProcedure(new RetryProcedure("retry", 1000L, 3));
        BalanceJob job = builder.build();
        job.setId(BalanceProcedureScheduler.allocateJobId());

        // Serialize.
        ByteArrayOutputStream bao = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bao)) {
            job.write(out);
            out.flush();
        }

        // Deserialize into an initially empty job.
        BalanceJob newJob = new BalanceJob.Builder().build();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bao.toByteArray()))) {
            newJob.readFields(in);
        }
        Assertions.assertEquals(job, newJob);
    }

    /**
     * Stops the scheduler in the middle of a ten-phase procedure, then
     * recovers the job with a new scheduler and checks that all ten phase
     * directories end up under the test directory.
     */
    @Test
    @Timeout(value = 180)
    public void testSchedulerDownAndRecoverJob() throws Exception {
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        Path parent = new Path("/testSchedulerDownAndRecoverJob");
        try {
            BalanceJob.Builder builder = new BalanceJob.Builder();
            MultiPhaseProcedure multiPhaseProcedure =
                new MultiPhaseProcedure("retry", 1000L, 10, CONF, parent.toString());
            builder.nextProcedure(multiPhaseProcedure);
            BalanceJob job = builder.build();

            scheduler.submit(job);
            Thread.sleep(500L);
            scheduler.shutDownAndWait(2);

            // The job must have started but not finished yet.
            Assertions.assertFalse(job.isJobDone());
            int len = fs.listStatus(parent).length;
            Assertions.assertTrue(len > 0 && len < 10,
                "expected between 1 and 9 phase entries but found " + len);

            // Restart the scheduler; the job should be recovered and completed.
            scheduler = new BalanceProcedureScheduler(CONF);
            scheduler.init(true);
            scheduler.waitUntilDone(job);
            Assertions.assertEquals(10, fs.listStatus(parent).length);
            for (int i = 0; i < 10; i++) {
                Assertions.assertTrue(fs.exists(new Path(parent, "phase-" + i)));
            }

            BalanceJob recoverJob = scheduler.findJob(job);
            Assertions.assertNull(recoverJob.getError());
            Assertions.assertNotSame(job, recoverJob);
            Assertions.assertEquals(job, recoverJob);
        } finally {
            try {
                if (fs.exists(parent)) {
                    fs.delete(parent, true);
                }
            } finally {
                // Shut down even if the cleanup above throws.
                scheduler.shutDownAndWait(2);
            }
        }
    }

    /**
     * Saves a job directly into the journal with its current procedure set to
     * the second step, then starts a scheduler and checks that the time until
     * the job is done lies in {@code [1000, 5000)} ms.
     */
    @Test
    @Timeout(value = 60)
    public void testRecoverJobFromJournal() throws Exception {
        BalanceJournal journal = ReflectionUtils.newInstance(BalanceJournalInfoHDFS.class, CONF);
        BalanceJob.Builder builder = new BalanceJob.Builder();
        WaitProcedure wait0 = new WaitProcedure("wait0", 1000L, 5000L);
        WaitProcedure wait1 = new WaitProcedure("wait1", 1000L, 1000L);
        builder.nextProcedure(wait0).nextProcedure(wait1);

        BalanceJob job = builder.build();
        job.setId(BalanceProcedureScheduler.allocateJobId());
        job.setCurrentProcedure(wait1);
        job.setLastProcedure(null);
        journal.saveJob(job);

        long start = Time.monotonicNow();
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        try {
            scheduler.waitUntilDone(job);
            long duration = Time.monotonicNow() - start;
            // NOTE(review): the bounds suggest only wait1 is expected to run
            // after recovery, not wait0 - confirm against WaitProcedure.
            Assertions.assertTrue(duration >= 1000L && duration < 5000L,
                "expected a duration in [1000, 5000) ms but took " + duration + " ms");
        } finally {
            scheduler.shutDownAndWait(2);
        }
    }

    /**
     * Installs a mocked journal whose first {@code clear} call throws and
     * checks that {@code clear} has been invoked exactly twice once the job
     * is done.
     */
    @Test
    @Timeout(value = 60)
    public void testClearJournalFail() throws Exception {
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        try {
            BalanceJournal journal = Mockito.mock(BalanceJournal.class);
            AtomicInteger count = new AtomicInteger(0);
            Mockito.doAnswer(invocation -> {
                // Fail the first attempt only.
                if (count.incrementAndGet() == 1) {
                    throw new IOException("Mock clear failure");
                }
                return null;
            }).when(journal).clear(ArgumentMatchers.any(BalanceJob.class));
            scheduler.setJournal(journal);

            BalanceJob.Builder builder = new BalanceJob.Builder();
            builder.nextProcedure(new WaitProcedure("wait", 1000L, 1000L));
            BalanceJob job = builder.build();

            scheduler.submit(job);
            scheduler.waitUntilDone(job);
            Assertions.assertEquals(2, count.get());
        } finally {
            scheduler.shutDownAndWait(2);
        }
    }

    /**
     * Runs a job in which one step restarts the NameNode and checks that the
     * job still completes without error and that {@code recoverFlag} is still
     * {@code true} afterwards.
     */
    @Test
    @Timeout(value = 60)
    public void testJobRecoveryWhenWriteJournalFail() throws Exception {
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        try {
            AtomicBoolean recoverFlag = new AtomicBoolean(true);
            BalanceJob.Builder builder = new BalanceJob.Builder();
            builder.nextProcedure(new WaitProcedure("wait", 1000L, 1000L))
                .nextProcedure(new UnrecoverableProcedure("shutdown", 1000L, () -> {
                    cluster.restartNameNode(false);
                    return true;
                }))
                .nextProcedure(new UnrecoverableProcedure("recoverFlag", 1000L, () -> {
                    recoverFlag.set(false);
                    return true;
                }))
                .nextProcedure(new WaitProcedure("wait", 1000L, 1000L));
            BalanceJob job = builder.build();

            scheduler.submit(job);
            scheduler.waitUntilDone(job);

            Assertions.assertTrue(job.isJobDone());
            Assertions.assertNull(job.getError());
            // NOTE(review): this only holds if the "recoverFlag" callback is
            // never invoked - confirm against UnrecoverableProcedure.
            Assertions.assertTrue(recoverFlag.get());
        } finally {
            scheduler.shutDownAndWait(2);
        }
    }

    /**
     * Flattens a procedure table into the order in which the procedures are
     * chained together.
     *
     * @param pTable procedures keyed by the value other procedures return
     *     from {@code nextProcedure()}
     * @param first key of the procedure the chain starts with
     * @param <T> the procedure type stored in the table
     * @return the procedures from {@code first} onwards, following each
     *     procedure's {@code nextProcedure()} until a key has no entry
     */
    <T extends BalanceProcedure> List<T> procedureTableToList(Map<String, T> pTable, String first) {
        List<T> procedures = new ArrayList<>();
        T cur = pTable.get(first);
        while (cur != null) {
            procedures.add(cur);
            cur = pTable.get(cur.nextProcedure());
        }
        return procedures;
    }
}

