package org.apache.hadoop.hbase.procedure2;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({MasterTests.class, SmallTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.class */
public class TestProcedureCleanup {

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestProcedureCleanup.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestProcedureCleanup.class);
    private static final int PROCEDURE_EXECUTOR_SLOTS = 2;
    private static WALProcedureStore procStore;
    private static ProcedureExecutor<Void> procExecutor;
    private static HBaseCommonTestingUtility htu;
    private static FileSystem fs;
    private static Path testDir;
    private static Path logDir;

    @Rule
    public final TestName name = new TestName();

    /* loaded from: input_file:org/apache/hadoop/hbase/procedure2/TestProcedureCleanup$ExchangeProcedure.class */
    public static final class ExchangeProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
        private final Exchanger<Boolean> exchanger = new Exchanger<>();

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure
        public Procedure<Void>[] execute(Void r6) throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
            if (this.exchanger.exchange(Boolean.TRUE).booleanValue()) {
                return new Procedure[]{this};
            }
            return null;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/procedure2/TestProcedureCleanup$RootProcedure.class */
    public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
        private boolean childSpwaned = false;

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure
        public Procedure<Void>[] execute(Void r7) throws ProcedureSuspendedException {
            if (this.childSpwaned) {
                return null;
            }
            this.childSpwaned = true;
            return new Procedure[]{new SuspendProcedure()};
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/procedure2/TestProcedureCleanup$SuspendProcedure.class */
    public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
        private CountDownLatch latch = new CountDownLatch(1);

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure
        public Procedure<Void>[] execute(Void r4) throws ProcedureSuspendedException {
            TestProcedureCleanup.LOG.info("suspend here");
            this.latch.countDown();
            throw new ProcedureSuspendedException();
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/procedure2/TestProcedureCleanup$WaitProcedure.class */
    public static class WaitProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
        private CountDownLatch latch = new CountDownLatch(1);

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure
        public Procedure<Void>[] execute(Void r4) throws ProcedureSuspendedException {
            TestProcedureCleanup.LOG.info("wait here");
            try {
                this.latch.await();
            } catch (Throwable th) {
            }
            TestProcedureCleanup.LOG.info("finished");
            return null;
        }
    }

    private void createProcExecutor() throws Exception {
        logDir = new Path(testDir, this.name.getMethodName());
        procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
        procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), (Object) null, procStore);
        procStore.start(PROCEDURE_EXECUTOR_SLOTS);
        ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true, true);
    }

    @BeforeClass
    public static void setUp() throws Exception {
        htu = new HBaseCommonTestingUtility();
        htu.getConfiguration().setBoolean("hbase.procedure.store.wal.exec.cleanup.on.load", true);
        testDir = htu.getDataTestDir();
        fs = testDir.getFileSystem(htu.getConfiguration());
        Assert.assertTrue(testDir.depth() > 1);
    }

    @Test
    public void testProcedureShouldNotCleanOnLoad() throws Exception {
        createProcExecutor();
        long submitProcedure = procExecutor.submitProcedure(new RootProcedure());
        LOG.info("Begin to execute " + submitProcedure);
        htu.waitFor(10000L, () -> {
            return procExecutor.getProcedures().size() >= PROCEDURE_EXECUTOR_SLOTS;
        });
        ((SuspendProcedure) procExecutor.getProcedures().get(1)).latch.countDown();
        Thread.sleep(100L);
        LOG.info("Begin to roll log ");
        procStore.rollWriterForTesting();
        LOG.info("finish to roll log ");
        Thread.sleep(500L);
        LOG.info("begin to restart1 ");
        ProcedureTestingUtility.restart(procExecutor, true);
        LOG.info("finish to restart1 ");
        Assert.assertTrue(procExecutor.getProcedure(submitProcedure) != null);
        Thread.sleep(500L);
        LOG.info("begin to restart2 ");
        ProcedureTestingUtility.restart(procExecutor, true);
        LOG.info("finish to restart2 ");
        Assert.assertTrue(procExecutor.getProcedure(submitProcedure) != null);
    }

    @Test
    public void testProcedureUpdatedShouldClean() throws Exception {
        createProcExecutor();
        SuspendProcedure suspendProcedure = new SuspendProcedure();
        long submitProcedure = procExecutor.submitProcedure(suspendProcedure);
        LOG.info("Begin to execute " + submitProcedure);
        suspendProcedure.latch.countDown();
        Thread.sleep(500L);
        LOG.info("begin to restart1 ");
        ProcedureTestingUtility.restart(procExecutor, true);
        LOG.info("finish to restart1 ");
        htu.waitFor(10000L, () -> {
            return procExecutor.getProcedure(submitProcedure) != null;
        });
        ((SuspendProcedure) procExecutor.getProcedure(submitProcedure)).latch.countDown();
        Thread.sleep(500L);
        Assert.assertTrue(procStore.getActiveLogs().size() == 1);
        LOG.info("begin to restart2");
        ProcedureTestingUtility.restart(procExecutor, true, false);
        LOG.info("finish to restart2");
        Assert.assertTrue(procStore.getActiveLogs().size() == PROCEDURE_EXECUTOR_SLOTS);
        procExecutor.startWorkers();
    }

    @Test
    public void testProcedureDeletedShouldClean() throws Exception {
        createProcExecutor();
        long submitProcedure = procExecutor.submitProcedure(new WaitProcedure());
        LOG.info("Begin to execute " + submitProcedure);
        Thread.sleep(500L);
        LOG.info("begin to restart1 ");
        ProcedureTestingUtility.restart(procExecutor, true);
        LOG.info("finish to restart1 ");
        htu.waitFor(10000L, () -> {
            return procExecutor.getProcedure(submitProcedure) != null;
        });
        ((WaitProcedure) procExecutor.getProcedure(submitProcedure)).latch.countDown();
        Thread.sleep(500L);
        Assert.assertTrue(procStore.getActiveLogs().size() == 1);
        LOG.info("begin to restart2");
        ProcedureTestingUtility.restart(procExecutor, true, false);
        LOG.info("finish to restart2");
        Assert.assertTrue(procStore.getActiveLogs().size() == PROCEDURE_EXECUTOR_SLOTS);
        procExecutor.startWorkers();
    }

    private void corrupt(FileStatus fileStatus) throws IOException {
        LOG.info("Corrupt " + fileStatus);
        Path suffix = fileStatus.getPath().suffix(".tmp");
        FSDataInputStream open = fs.open(fileStatus.getPath());
        try {
            FSDataOutputStream create = fs.create(suffix);
            try {
                ByteStreams.copy(ByteStreams.limit(open, fileStatus.getLen() - 1), create);
                if (create != null) {
                    create.close();
                }
                if (open != null) {
                    open.close();
                }
                fs.delete(fileStatus.getPath(), false);
                fs.rename(suffix, fileStatus.getPath());
            } finally {
            }
        } catch (Throwable th) {
            if (open != null) {
                try {
                    open.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testResetDeleteWhenBuildingHoldingCleanupTracker() throws Exception {
        createProcExecutor();
        ExchangeProcedure exchangeProcedure = new ExchangeProcedure();
        ExchangeProcedure exchangeProcedure2 = new ExchangeProcedure();
        procExecutor.submitProcedure(exchangeProcedure);
        long submitProcedure = procExecutor.submitProcedure(exchangeProcedure2);
        Thread.sleep(500L);
        procStore.rollWriterForTesting();
        exchangeProcedure.exchanger.exchange(Boolean.TRUE);
        Thread.sleep(500L);
        FileStatus[] listStatus = fs.listStatus(logDir);
        Arrays.sort(listStatus, (fileStatus, fileStatus2) -> {
            return fileStatus.getPath().getName().compareTo(fileStatus2.getPath().getName());
        });
        corrupt(listStatus[0]);
        ProcedureTestingUtility.restart(procExecutor, false, true);
        ((ExchangeProcedure) procExecutor.getProcedure(submitProcedure)).exchanger.exchange(Boolean.TRUE);
        htu.waitFor(10000L, () -> {
            return !fs.exists(listStatus[0].getPath());
        });
    }
}
