package org.apache.flink.tests.util.hbase;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.tests.util.AutoClosableProcess;
import org.apache.flink.tests.util.CommandLineWrapper;
import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
import org.apache.flink.tests.util.cache.DownloadCache;
import org.apache.flink.util.OperatingSystem;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.class */
public class LocalStandaloneHBaseResource implements HBaseResource {
    private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneHBaseResource.class);
    private static final int MAX_RETRIES = 3;
    private static final int RETRY_INTERVAL_SECONDS = 30;
    private final TemporaryFolder tmp = new TemporaryFolder();
    private final DownloadCache downloadCache = DownloadCache.get();
    private final String hbaseVersion;
    private Path hbaseDir;

    /* JADX INFO: Access modifiers changed from: package-private */
    public LocalStandaloneHBaseResource(String str) {
        OperatingSystemRestriction.forbid(String.format("The %s relies on UNIX utils and shell scripts.", getClass().getSimpleName()), new OperatingSystem[]{OperatingSystem.WINDOWS});
        this.hbaseVersion = str;
    }

    private String getHBaseDownloadUrl() {
        return String.format("https://archive.apache.org/dist/hbase/%1$s/hbase-%1$s-bin.tar.gz", this.hbaseVersion);
    }

    public void before() throws Exception {
        this.tmp.create();
        this.downloadCache.before();
        this.hbaseDir = this.tmp.newFolder("hbase-" + this.hbaseVersion).toPath().toAbsolutePath();
        setupHBaseDist();
        setupHBaseCluster();
    }

    private void setupHBaseDist() throws IOException {
        Path orDownload = this.downloadCache.getOrDownload(getHBaseDownloadUrl(), this.tmp.newFolder("getOrDownload").toPath());
        LOG.info("HBase location: {}", this.hbaseDir.toAbsolutePath());
        AutoClosableProcess.runBlocking(CommandLineWrapper.tar(orDownload).extract().zipped().strip(1).targetDir(this.hbaseDir).build());
        LOG.info("Configure {} as hbase.tmp.dir", this.hbaseDir.toAbsolutePath());
        Files.write(this.hbaseDir.resolve(Paths.get("conf", "hbase-site.xml")), ("<configuration><property><name>hbase.tmp.dir</name><value>" + this.hbaseDir + "</value></property></configuration>").getBytes(), new OpenOption[0]);
    }

    private void setupHBaseCluster() throws IOException {
        LOG.info("Starting HBase cluster...");
        runHBaseProcessWithRetry("start-hbase.sh", () -> {
            return Boolean.valueOf(!isHMasterRunning());
        });
        LOG.info("Start HBase cluster success");
    }

    public void afterTestSuccess() {
        shutdownResource();
        this.downloadCache.afterTestSuccess();
        this.tmp.delete();
    }

    private void shutdownResource() {
        LOG.info("Stopping HBase Cluster...");
        try {
            runHBaseProcessWithRetry("stop-hbase.sh", () -> {
                return Boolean.valueOf(isHMasterAlive());
            });
        } catch (IOException e) {
            LOG.warn("Error when shutting down HBase Cluster.", e);
        }
        LOG.info("Stop HBase Cluster success");
    }

    private void runHBaseProcessWithRetry(String str, Supplier<Boolean> supplier) throws IOException {
        LOG.info("Execute {} for HBase Cluster", str);
        for (int i = 1; i <= MAX_RETRIES; i++) {
            try {
                AutoClosableProcess.runBlocking(new String[]{this.hbaseDir.resolve(Paths.get("bin", str)).toString()});
            } catch (IOException e) {
                LOG.warn("Get exception when execute {} ", str, e);
            }
            int i2 = 0;
            while (supplier.get().booleanValue()) {
                try {
                    LOG.info("Waiting for HBase {} works", str);
                    Thread.sleep(1000L);
                } catch (InterruptedException e2) {
                    LOG.warn("sleep interrupted", e2);
                }
                i2++;
                if (i2 > RETRY_INTERVAL_SECONDS) {
                    break;
                }
            }
            if (i2 < RETRY_INTERVAL_SECONDS) {
                return;
            }
            if (i == MAX_RETRIES) {
                LOG.error("Execute {} failed, retry times {}", str, Integer.valueOf(i));
                throw new IllegalArgumentException(String.format("Execute %s failed aftert retry %s times", str, Integer.valueOf(i)));
            }
            LOG.warn("Execute {} failed, retry times {}", str, Integer.valueOf(i));
        }
    }

    private boolean isHMasterRunning() {
        try {
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            queryHBaseStatus(str -> {
                atomicBoolean.compareAndSet(false, str.contains("hbase:namespace"));
            });
            return atomicBoolean.get();
        } catch (IOException e) {
            return false;
        }
    }

    private void queryHBaseStatus(Consumer<String> consumer) throws IOException {
        executeHBaseShell("scan 'hbase:meta'", consumer);
    }

    private boolean isHMasterAlive() {
        try {
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            queryHBaseProcess(str -> {
                atomicBoolean.compareAndSet(false, str.contains("HMaster"));
            });
            return atomicBoolean.get();
        } catch (IOException e) {
            return false;
        }
    }

    private void queryHBaseProcess(Consumer<String> consumer) throws IOException {
        AutoClosableProcess.create(new String[]{"jps"}).setStdoutProcessor(consumer).runBlocking();
    }

    @Override // org.apache.flink.tests.util.hbase.HBaseResource
    public void createTable(String str, String... strArr) throws IOException {
        executeHBaseShell(String.format("create '%s',", str) + ((String) Arrays.stream(strArr).map(str2 -> {
            return String.format("{NAME=>'%s'}", str2);
        }).collect(Collectors.joining(","))));
    }

    @Override // org.apache.flink.tests.util.hbase.HBaseResource
    public List<String> scanTable(String str) throws IOException {
        ArrayList arrayList = new ArrayList();
        executeHBaseShell(String.format("scan '%s'", str), str2 -> {
            if (str2.contains("value=")) {
                arrayList.add(str2);
            }
        });
        return arrayList;
    }

    @Override // org.apache.flink.tests.util.hbase.HBaseResource
    public void putData(String str, String str2, String str3, String str4, String str5) throws IOException {
        executeHBaseShell(String.format("put '%s','%s','%s:%s','%s'", str, str2, str3, str4, str5));
    }

    private void executeHBaseShell(String str) throws IOException {
        executeHBaseShell(str, str2 -> {
        });
    }

    private void executeHBaseShell(String str, Consumer<String> consumer) throws IOException {
        AutoClosableProcess.create(new String[]{this.hbaseDir.resolve(Paths.get("bin", "hbase")).toString(), "shell"}).setStdoutProcessor(consumer).setStdInputs(new String[]{str}).runBlocking();
    }
}
