package org.apache.hadoop.hbase.procedure2.store;

import java.io.IOException;
import java.io.PrintStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;

/* loaded from: input_file:org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation.class */
public abstract class ProcedureStorePerformanceEvaluation<T extends ProcedureStore> extends AbstractHBaseTool {
    public static String DEFAULT_OUTPUT_PATH;
    public static Option OUTPUT_PATH_OPTION;
    public static int DEFAULT_NUM_THREADS;
    public static Option NUM_THREADS_OPTION;
    public static int DEFAULT_NUM_PROCS;
    public static Option NUM_PROCS_OPTION;
    public static int DEFAULT_STATE_SIZE;
    public static Option STATE_SIZE_OPTION;
    public static Option SYNC_OPTION;
    public static String DEFAULT_SYNC_OPTION;
    protected String outputPath;
    protected int numThreads;
    protected long numProcs;
    protected String syncType;
    protected int stateSize;
    protected static byte[] SERIALIZED_STATE;
    protected T store;
    private AtomicLong procIds = new AtomicLong(0);
    private AtomicBoolean workersFailed = new AtomicBoolean(false);
    private static final int WORKER_THREADS_TIMEOUT_SEC = 600;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation$Worker.class */
    private final class Worker implements Callable<Integer> {
        private final long start;

        public Worker(long j) {
            this.start = j;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Integer call() throws IOException {
            while (!ProcedureStorePerformanceEvaluation.this.workersFailed.get()) {
                long andIncrement = ProcedureStorePerformanceEvaluation.this.procIds.getAndIncrement();
                if (andIncrement >= ProcedureStorePerformanceEvaluation.this.numProcs) {
                    return 0;
                }
                if (andIncrement != 0 && andIncrement % 10000 == 0) {
                    long nanoTime = System.nanoTime() - this.start;
                    PrintStream printStream = System.out;
                    StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(nanoTime));
                    printStream.println("Wrote " + andIncrement + " procedures in " + printStream);
                }
                try {
                    ProcedureStorePerformanceEvaluation.this.preWrite(andIncrement);
                    ProcedureTestingUtility.TestProcedure testProcedure = new ProcedureTestingUtility.TestProcedure(andIncrement);
                    testProcedure.setData(ProcedureStorePerformanceEvaluation.SERIALIZED_STATE);
                    ProcedureStorePerformanceEvaluation.this.store.insert(testProcedure, (Procedure[]) null);
                    ProcedureStorePerformanceEvaluation.this.store.update(testProcedure);
                } catch (IOException e) {
                    ProcedureStorePerformanceEvaluation.this.workersFailed.set(true);
                    System.err.println("Exception when rolling log file. Current procId = " + andIncrement);
                    e.printStackTrace();
                    return 1;
                }
            }
            return 1;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addOptions() {
        addOption(OUTPUT_PATH_OPTION);
        addOption(NUM_THREADS_OPTION);
        addOption(NUM_PROCS_OPTION);
        addOption(SYNC_OPTION);
        addOption(STATE_SIZE_OPTION);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void processOptions(CommandLine commandLine) {
        this.outputPath = commandLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt(), DEFAULT_OUTPUT_PATH);
        this.numThreads = getOptionAsInt(commandLine, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
        this.numProcs = getOptionAsInt(commandLine, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
        this.syncType = commandLine.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
        if (!$assertionsDisabled && !"hsync".equals(this.syncType) && !"hflush".equals(this.syncType) && !"nosync".equals(this.syncType)) {
            throw new AssertionError("sync argument can only accept one of these three values: hsync, hflush, nosync");
        }
        this.stateSize = getOptionAsInt(commandLine, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
        SERIALIZED_STATE = new byte[this.stateSize];
        Bytes.random(SERIALIZED_STATE);
    }

    private void setUpProcedureStore() throws IOException {
        FileSystem fileSystem = FileSystem.get(this.conf);
        Path makeQualified = fileSystem.makeQualified(new Path(this.outputPath));
        System.out.println("Procedure store directory : " + makeQualified.toString());
        fileSystem.delete(makeQualified, true);
        this.store = createProcedureStore(makeQualified);
        this.store.start(this.numThreads);
        this.store.recoverLease();
        this.store.load(new ProcedureTestingUtility.LoadCounter());
        System.out.println("Starting new procedure store: " + this.store.getClass().getSimpleName());
    }

    protected abstract T createProcedureStore(Path path) throws IOException;

    protected void postStop(T t) throws IOException {
    }

    private void tearDownProcedureStore() {
        Path path = null;
        try {
            if (this.store != null) {
                this.store.stop(false);
                postStop(this.store);
            }
            FileSystem fileSystem = FileSystem.get(this.conf);
            path = fileSystem.makeQualified(new Path(this.outputPath));
            fileSystem.delete(path, true);
        } catch (IOException e) {
            System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up disk space. Location: " + path);
            e.printStackTrace();
        }
    }

    protected abstract void printRawFormatResult(long j);

    protected int doWork() throws Exception {
        try {
            try {
                setUpProcedureStore();
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(this.numThreads);
                Future[] futureArr = new Future[this.numThreads];
                long nanoTime = System.nanoTime();
                for (int i = 0; i < this.numThreads; i++) {
                    futureArr[i] = newFixedThreadPool.submit(new Worker(nanoTime));
                }
                boolean z = false;
                try {
                    for (Future future : futureArr) {
                        z |= future.get((nanoTime + 600000) - EnvironmentEdgeManager.currentTime(), TimeUnit.MILLISECONDS).equals(1);
                    }
                    newFixedThreadPool.shutdown();
                    if (z) {
                        tearDownProcedureStore();
                        return 1;
                    }
                    long nanoTime2 = System.nanoTime() - nanoTime;
                    System.out.println("******************************************");
                    System.out.println("Num threads    : " + this.numThreads);
                    System.out.println("Num procedures : " + this.numProcs);
                    System.out.println("Sync type      : " + this.syncType);
                    System.out.println("Time taken     : " + TimeUnit.NANOSECONDS.toSeconds(nanoTime2) + "sec");
                    System.out.println("******************************************");
                    System.out.println("Raw format for scripts");
                    printRawFormatResult(nanoTime2);
                    tearDownProcedureStore();
                    return 0;
                } catch (Exception e) {
                    System.err.println("Exception in worker thread.");
                    e.printStackTrace();
                    tearDownProcedureStore();
                    return 1;
                }
            } catch (IOException e2) {
                e2.printStackTrace();
                tearDownProcedureStore();
                return 1;
            }
        } catch (Throwable th) {
            tearDownProcedureStore();
            throw th;
        }
    }

    protected abstract void preWrite(long j) throws IOException;

    static {
        $assertionsDisabled = !ProcedureStorePerformanceEvaluation.class.desiredAssertionStatus();
        DEFAULT_OUTPUT_PATH = "proc-store";
        OUTPUT_PATH_OPTION = new Option("output", true, "The output path. Default: " + DEFAULT_OUTPUT_PATH);
        DEFAULT_NUM_THREADS = 20;
        NUM_THREADS_OPTION = new Option("threads", true, "Number of parallel threads which will write insert/updates/deletes to store. Default: " + DEFAULT_NUM_THREADS);
        DEFAULT_NUM_PROCS = 1000000;
        NUM_PROCS_OPTION = new Option("procs", true, "Total number of procedures. Each procedure writes one insert and one update. Default: " + DEFAULT_NUM_PROCS);
        DEFAULT_STATE_SIZE = 1024;
        STATE_SIZE_OPTION = new Option("state_size", true, "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE + "bytes");
        SYNC_OPTION = new Option("sync", true, "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, hsync, nosync. Default: hflush");
        DEFAULT_SYNC_OPTION = "hflush";
    }
}
