/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.txn.compactor;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
import org.apache.hadoop.hive.ql.txn.compactor.MetaStoreCompactorThread;
import org.apache.hadoop.hive.ql.txn.compactor.Worker;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.Ref;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Initiator
extends MetaStoreCompactorThread {
    /** Fully-qualified name of this class; used as the logger name. */
    private static final String CLASS_NAME = Initiator.class.getName();
    /** Logger shared by the initiator loop and the tasks it submits. */
    private static final Logger LOG = LoggerFactory.getLogger((String)CLASS_NAME);
    /** Prefix prepended to a compactor threshold setting to form the table-property key that overrides it for one table. */
    private static final String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";
    /** Minimum duration of one cycle in milliseconds; run() sleeps for the remainder when a cycle finishes sooner. Loaded in init(). */
    private long checkInterval;
    /** Wall-clock time (ms) at which the previous cycle started; negative until the first cycle has started. */
    private long prevStart = -1L;
    /** Executor that runs the per-candidate compaction checks; created in init() and shut down when run() exits. */
    private ExecutorService compactionExecutor;

    /**
     * Main loop of the compaction initiator thread.
     *
     * <p>Before looping, compactions assigned to workers on this host and to timed-out workers
     * are revoked. Each cycle then:
     * <ol>
     *   <li>acquires the {@code Initiator} mutex from the transaction store;</li>
     *   <li>fetches the current compactions and, when metrics are enabled, refreshes the gauges;</li>
     *   <li>collects the potential compactions that pass {@code isEligibleForCompaction};</li>
     *   <li>stores a snapshot of the valid transactions in the configuration under
     *       {@code hive.txn.valid.txns}, which the per-candidate checks read back;</li>
     *   <li>submits one {@code scheduleCompactionIfRequired} task per candidate to the executor
     *       and waits for all of them to finish;</li>
     *   <li>revokes compactions from timed-out workers and releases the mutex.</li>
     * </ol>
     *
     * <p>Any error inside a cycle is logged and the loop continues. The loop body always runs at
     * least once, even if {@code stop} is already set. An error outside a cycle (including an
     * interrupt while sleeping) ends the thread; the executor is shut down on exit.
     */
    @Override
    public void run() {
        LOG.info("Starting Initiator thread");
        try {
            this.recoverFailedCompactions(false);
            int abortedThreshold = HiveConf.getIntVar(this.conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
            long abortedTimeThreshold = HiveConf.getTimeVar(this.conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, TimeUnit.MILLISECONDS);
            boolean metricsEnabled = MetastoreConf.getBoolVar(this.conf, MetastoreConf.ConfVars.METRICS_ENABLED);
            do {
                long startedAt = -1L;
                TxnStore.MutexAPI.LockHandle handle = null;
                try {
                    handle = this.txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
                    startedAt = System.currentTimeMillis();
                    // Seconds since the previous cycle started; the negative sentinel is passed
                    // through unchanged on the very first cycle.
                    long compactionInterval = this.prevStart < 0L ? this.prevStart : (startedAt - this.prevStart) / 1000L;
                    this.prevStart = startedAt;
                    ShowCompactResponse currentCompactions = this.txnHandler.showCompact(new ShowCompactRequest());
                    if (metricsEnabled) {
                        Initiator.updateCompactionMetrics(currentCompactions);
                    }
                    Set<CompactionInfo> potentials = this.txnHandler.findPotentialCompactions(abortedThreshold, abortedTimeThreshold, compactionInterval)
                            .stream()
                            .filter(c -> this.isEligibleForCompaction(c, currentCompactions))
                            .collect(Collectors.toSet());
                    LOG.debug("Found " + potentials.size() + " potential compactions, checking to see if we should compact any of them");
                    // Per-cycle cache: table name -> user to run as.
                    Map<String, String> tblNameOwners = new HashMap<>();
                    List<CompletableFuture<Void>> compactionList = new ArrayList<>();
                    if (!potentials.isEmpty()) {
                        // One snapshot per cycle, shared by every task through the configuration.
                        ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(this.txnHandler.getOpenTxns(), 0L);
                        this.conf.set("hive.txn.valid.txns", validTxnList.writeToString());
                    }
                    for (CompactionInfo ci : potentials) {
                        try {
                            Table t = this.resolveTable(ci);
                            Partition p = this.resolvePartition(ci);
                            if (p == null && ci.partName != null) {
                                LOG.info("Can't find partition " + ci.getFullPartitionName() + ", assuming it has been dropped and moving on.");
                                continue;
                            }
                            String runAs = this.resolveUserToRunAs(tblNameOwners, t, p);
                            compactionList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() -> this.scheduleCompactionIfRequired(ci, t, p, runAs)), this.compactionExecutor));
                        }
                        catch (Throwable e) {
                            LOG.error("Caught exception while trying to determine if we should compact {}. Marking failed to avoid repeated failures, {}", ci, e);
                            ci.errorMessage = e.getMessage();
                            this.txnHandler.markFailed(ci);
                        }
                    }
                    CompletableFuture.allOf(compactionList.toArray(new CompletableFuture[0])).join();
                    this.recoverFailedCompactions(true);
                }
                catch (Throwable t) {
                    LOG.error("Initiator loop caught unexpected exception this time through the loop: " + StringUtils.stringifyException(t));
                }
                finally {
                    if (handle != null) {
                        handle.releaseLocks();
                    }
                }
                // startedAt is still negative when the mutex could not be acquired. Count that as
                // "no time spent" so a persistent failure waits a full interval instead of
                // retrying (and logging) in a tight loop.
                long elapsedTime = startedAt < 0L ? 0L : System.currentTimeMillis() - startedAt;
                if (elapsedTime < this.checkInterval && !this.stop.get()) {
                    Thread.sleep(this.checkInterval - elapsedTime);
                }
                LOG.info("Initiator thread finished one loop.");
            } while (!this.stop.get());
        }
        catch (Throwable t) {
            LOG.error("Caught an exception in the main loop of compactor initiator, exiting " + StringUtils.stringifyException(t));
            if (t instanceof InterruptedException) {
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        }
        finally {
            if (this.compactionExecutor != null) {
                this.compactionExecutor.shutdownNow();
            }
        }
    }

    /**
     * Decides whether the given candidate needs a compaction and, if so, requests one.
     *
     * <p>Any failure while deciding or requesting is logged with its stack trace, recorded in
     * {@code ci.errorMessage}, and the candidate is marked failed in the transaction store.
     *
     * @param ci    the compaction candidate
     * @param t     the table the candidate belongs to
     * @param p     the partition the candidate belongs to, as resolved by the caller
     * @param runAs the user the check and the compaction should run as
     * @throws MetaException if marking the candidate as failed itself fails
     */
    private void scheduleCompactionIfRequired(CompactionInfo ci, Table t, Partition p, String runAs) throws MetaException {
        StorageDescriptor sd = this.resolveStorageDescriptor(t, p);
        try {
            ValidWriteIdList validWriteIds = this.resolveValidWriteIds(t);
            CompactionType type = this.checkForCompaction(ci, validWriteIds, sd, t.getParameters(), runAs);
            if (type != null) {
                this.requestCompaction(ci, runAs, type);
            }
        }
        catch (Throwable ex) {
            String errorMessage = "Caught exception while trying to determine if we should compact " + ci + ". Marking failed to avoid repeated failures, " + ex;
            // Pass the throwable as well so the stack trace is not lost.
            LOG.error(errorMessage, ex);
            ci.errorMessage = errorMessage;
            this.txnHandler.markFailed(ci);
        }
    }

    /**
     * Builds the compaction write-id list for a table from the transaction snapshot stored in
     * the configuration under {@code hive.txn.valid.txns}.
     *
     * @param t the table to fetch write ids for
     * @return the valid write-id list for compacting {@code t}
     */
    private ValidWriteIdList resolveValidWriteIds(Table t) throws NoSuchTxnException, MetaException {
        String serializedSnapshot = this.conf.get("hive.txn.valid.txns");
        ValidReadTxnList snapshot = new ValidReadTxnList(serializedSnapshot);
        String qualifiedName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
        GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(Collections.singletonList(qualifiedName));
        request.setValidTxnList(snapshot.writeToString());
        // Only one table was requested, so only the first entry of the response is used.
        TableValidWriteIds tableWriteIds = (TableValidWriteIds)this.txnHandler.getValidWriteIds(request).getTblValidWriteIds().get(0);
        return TxnUtils.createValidCompactWriteIdList(tableWriteIds);
    }

    /**
     * Returns the user that operations on the given table should run as, consulting
     * {@code cache} first so the lookup is done at most once per table while a non-null
     * result is cached.
     *
     * @param cache map from full table name to user; updated on a miss
     * @param t     the table
     * @param p     the partition whose storage descriptor should be used, passed through to
     *              {@code resolveStorageDescriptor}
     * @return the user to run as, as returned by {@code findUserToRunAs}
     */
    @VisibleForTesting
    protected String resolveUserToRunAs(Map<String, String> cache, Table t, Partition p) throws IOException, InterruptedException {
        String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
        String runAs = cache.get(fullTableName);
        if (runAs == null) {
            // Only perform the lookup on a miss; evaluating it unconditionally as an argument
            // to putIfAbsent would defeat the cache.
            StorageDescriptor sd = this.resolveStorageDescriptor(t, p);
            runAs = this.findUserToRunAs(sd.getLocation(), t);
            cache.put(fullTableName, runAs);
        }
        return runAs;
    }

    /**
     * Initializes the thread: delegates to the superclass, then reads the check interval (in
     * milliseconds) and creates the executor used for the per-candidate checks.
     *
     * @param stop flag that ends the main loop once set
     */
    @Override
    public void init(AtomicBoolean stop) throws Exception {
        super.init(stop);
        long intervalMs = this.conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
        this.checkInterval = intervalMs;
        int requestQueueSetting = this.conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE);
        String threadNameFormat = "Initiator-executor-thread-%d";
        this.compactionExecutor = CompactorUtil.createExecutorWithThreadFactory(requestQueueSetting, threadNameFormat);
    }

    /**
     * Revokes compactions from workers that appear to be gone.
     *
     * @param remoteOnly when {@code false}, compactions held by workers on this host are revoked
     *                   as well; timed-out workers are always revoked
     */
    private void recoverFailedCompactions(boolean remoteOnly) throws MetaException {
        boolean includeLocalWorkers = !remoteOnly;
        if (includeLocalWorkers) {
            this.txnHandler.revokeFromLocalWorkers(Worker.hostname());
        }
        long workerTimeoutMs = HiveConf.getTimeVar(this.conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS);
        this.txnHandler.revokeTimedoutWorkers(workerTimeoutMs);
    }

    /**
     * Tells whether a compaction in state {@code working} or {@code initiated} already exists
     * for the same database, table and partition as the candidate.
     *
     * <p>Partition names are compared null-safely: two {@code null} names match, a {@code null}
     * name never matches a non-null one.
     *
     * @param compactions the current compactions; a {@code null} list means "none"
     * @param ci          the candidate to look for
     * @return {@code true} if a matching active compaction exists
     */
    private boolean lookForCurrentCompactions(ShowCompactResponse compactions, CompactionInfo ci) {
        if (compactions.getCompacts() == null) {
            return false;
        }
        for (ShowCompactResponseElement e : compactions.getCompacts()) {
            boolean active = e.getState().equals("working") || e.getState().equals("initiated");
            if (!active || !e.getDbname().equals(ci.dbname) || !e.getTablename().equals(ci.tableName)) {
                continue;
            }
            // The listed compaction may have no partition while the candidate has one (and vice
            // versa); calling equals() on the listed name unconditionally would throw an NPE.
            String listedPartition = e.getPartitionname();
            boolean samePartition = listedPartition == null ? ci.partName == null : listedPartition.equals(ci.partName);
            if (samePartition) {
                return true;
            }
        }
        return false;
    }

    /**
     * Determines which kind of compaction, if any, the candidate needs.
     *
     * <p>Candidates flagged with too many aborts get a major compaction and candidates flagged
     * with an old abort get a minor one, without inspecting the file system. Otherwise the
     * decision is delegated to {@code determineCompactionType}, executed either directly or as
     * a proxy user for {@code runAs}; file-system handles of the proxy user are closed afterwards.
     *
     * @return the compaction type to request, or {@code null} if none is needed
     */
    private CompactionType checkForCompaction(final CompactionInfo ci, final ValidWriteIdList writeIds, final StorageDescriptor sd, final Map<String, String> tblproperties, String runAs) throws IOException, InterruptedException {
        if (ci.tooManyAborts) {
            LOG.debug("Found too many aborted transactions for " + ci.getFullPartitionName() + ", initiating major compaction");
            return CompactionType.MAJOR;
        }
        if (ci.hasOldAbort) {
            HiveConf.ConfVars thresholdVar = HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
            LOG.debug("Found an aborted transaction for " + ci.getFullPartitionName() + " with age older than threshold " + thresholdVar + ": " + this.conf.getTimeVar(thresholdVar, TimeUnit.HOURS) + " hours. Initiating minor compaction.");
            return CompactionType.MINOR;
        }
        if (!this.runJobAsSelf(runAs)) {
            LOG.info("Going to initiate as user " + runAs + " for " + ci.getFullPartitionName());
            UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(runAs, UserGroupInformation.getLoginUser());
            try {
                PrivilegedExceptionAction<CompactionType> decide = () -> this.determineCompactionType(ci, writeIds, sd, tblproperties);
                return proxyUser.doAs(decide);
            }
            finally {
                // Cleanup failures are logged only; they must not mask the result or the
                // exception of the check itself.
                try {
                    FileSystem.closeAllForUGI(proxyUser);
                }
                catch (IOException cleanupFailure) {
                    LOG.error("Could not clean up file-system handles for UGI: " + proxyUser + " for " + ci.getFullPartitionName(), (Throwable)cleanupFailure);
                }
            }
        }
        return this.determineCompactionType(ci, writeIds, sd, tblproperties);
    }

    /**
     * Inspects the ACID directory layout at the storage location and decides between a major
     * compaction, a minor compaction, or none.
     *
     * <p>Rules, in order:
     * <ol>
     *   <li>Unless there is delta data but no base data, request a major compaction when the
     *       delta/base size ratio exceeds the percentage threshold, or when there is no delta
     *       data and an obsolete {@code base_} directory exists.</li>
     *   <li>Otherwise, if the number of deltas does not exceed the delta-count threshold,
     *       request nothing.</li>
     *   <li>Otherwise request a major compaction for insert-only tables or when there is no
     *       base data, and a minor compaction in all other cases.</li>
     * </ol>
     * Both thresholds can be overridden per table through table properties carrying the
     * {@code compactorthreshold.} prefix.
     *
     * @return the compaction type to request, or {@code null} if none is needed
     */
    private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdList writeIds, StorageDescriptor sd, Map<String, String> tblproperties) throws IOException {
        Path location = new Path(sd.getLocation());
        FileSystem fs = location.getFileSystem(this.conf);
        AcidDirectory acidState = AcidUtils.getAcidState(fs, location, this.conf, writeIds, Ref.from(false), false);

        // Size of the base: the base directory if present, else the original files.
        long baseBytes = 0L;
        if (acidState.getBase() == null) {
            for (HadoopShims.HdfsFileStatusWithId original : acidState.getOriginalFiles()) {
                baseBytes += original.getFileStatus().getLen();
            }
        } else {
            baseBytes = this.sumDirSize(fs, acidState.getBase());
        }

        List<AcidUtils.ParsedDelta> currentDeltas = acidState.getCurrentDirectories();
        long deltaBytes = 0L;
        for (AcidUtils.ParsedDelta currentDelta : currentDeltas) {
            deltaBytes += this.sumDirSize(fs, currentDelta);
        }

        boolean noBase = baseBytes == 0L && deltaBytes > 0L;
        if (!noBase) {
            String pctOverride = tblproperties.get(COMPACTORTHRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
            float pctThreshold;
            if (pctOverride != null) {
                pctThreshold = Float.parseFloat(pctOverride);
            } else {
                pctThreshold = HiveConf.getFloatVar(this.conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
            }
            boolean bigEnough = (float)deltaBytes / (float)baseBytes > pctThreshold;
            boolean multiBase = acidState.getObsolete().stream().anyMatch(path -> path.getName().startsWith("base_"));
            boolean initiateMajor = bigEnough || deltaBytes == 0L && multiBase;
            if (LOG.isDebugEnabled()) {
                StringBuilder report = new StringBuilder("delta size: ")
                        .append(deltaBytes)
                        .append(" base size: ")
                        .append(baseBytes)
                        .append(" multiBase ")
                        .append(multiBase)
                        .append(" deltaSize ")
                        .append(deltaBytes)
                        .append(" threshold: ")
                        .append(pctThreshold)
                        .append(" delta/base ratio > ")
                        .append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname)
                        .append(": ")
                        .append(bigEnough)
                        .append(".");
                if (!initiateMajor) {
                    report.append("not");
                }
                report.append(" initiating major compaction.");
                LOG.debug(report.toString());
            }
            if (initiateMajor) {
                return CompactionType.MAJOR;
            }
        }

        String numOverride = tblproperties.get(COMPACTORTHRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
        int deltaNumThreshold = numOverride != null
                ? Integer.parseInt(numOverride)
                : HiveConf.getIntVar(this.conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
        int deltaCount = currentDeltas.size();
        if (deltaCount <= deltaNumThreshold) {
            LOG.debug("Not enough deltas to initiate compaction for table=" + ci.tableName + "partition=" + ci.partName + ". Found: " + deltaCount + " deltas, threshold is " + deltaNumThreshold);
            return null;
        }
        if (AcidUtils.isInsertOnlyTable(tblproperties)) {
            LOG.debug("Requesting a major compaction for a MM table; found " + deltaCount + " deltas, threshold is " + deltaNumThreshold);
            return CompactionType.MAJOR;
        }
        LOG.debug("Found " + deltaCount + " delta files, and " + (noBase ? "no" : "has") + " base,requesting " + (noBase ? "major" : "minor") + " compaction");
        if (noBase) {
            return CompactionType.MAJOR;
        }
        return CompactionType.MINOR;
    }

    /**
     * Adds up the lengths of all files reported for the given parsed directory.
     *
     * @param fs  file system used to list the directory's files
     * @param dir the base or delta directory to measure
     * @return the total length in bytes
     */
    private long sumDirSize(FileSystem fs, AcidUtils.ParsedDirectory dir) throws IOException {
        long totalBytes = 0L;
        for (HadoopShims.HdfsFileStatusWithId file : dir.getFiles(fs, Ref.from(false))) {
            totalBytes += file.getFileStatus().getLen();
        }
        return totalBytes;
    }

    /**
     * Submits a compaction request for the candidate to the transaction store and, when the
     * request is accepted, copies the assigned id back into {@code ci.id}.
     *
     * @param ci    the candidate; its partition name is included when present
     * @param runAs the user the compaction should run as
     * @param type  the kind of compaction to request
     */
    private void requestCompaction(CompactionInfo ci, String runAs, CompactionType type) throws MetaException {
        CompactionRequest request = new CompactionRequest(ci.dbname, ci.tableName, type);
        String partition = ci.partName;
        if (partition != null) {
            request.setPartitionname(partition);
        }
        request.setRunas(runAs);
        LOG.info("Requesting compaction: " + request);
        CompactionResponse response = this.txnHandler.compact(request);
        if (!response.isAccepted()) {
            return;
        }
        ci.id = response.getId();
    }

    /**
     * Tells whether automatic compaction is switched off for the table through the
     * {@code no_auto_compaction} table property (lower- or upper-case key, value compared
     * case-insensitively against {@code true}).
     *
     * @param t the table to check
     * @return {@code true} if the property is present and set to true
     */
    private boolean noAutoCompactSet(Table t) {
        String noAutoCompact = t.getParameters().get("no_auto_compaction");
        if (noAutoCompact == null) {
            // Literal upper-case key: String.toUpperCase() without a locale depends on the
            // default locale and would produce a different key under e.g. a Turkish locale.
            noAutoCompact = t.getParameters().get("NO_AUTO_COMPACTION");
        }
        return "true".equalsIgnoreCase(noAutoCompact);
    }

    /**
     * Tells whether the candidate should be skipped because it names a partitioned table but
     * no partition, and is not flagged as having an old abort.
     *
     * @param t  the table the candidate refers to
     * @param ci the candidate
     * @return {@code true} if the candidate must be skipped
     */
    private static boolean isDynPartIngest(Table t, CompactionInfo ci) {
        boolean partitionedTable = t.getPartitionKeys() != null && !t.getPartitionKeys().isEmpty();
        if (!partitionedTable || ci.partName != null || ci.hasOldAbort) {
            return false;
        }
        LOG.info("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic partitioning");
        return true;
    }

    /**
     * Filters out candidates that must not be compacted in this cycle.
     *
     * <p>A candidate is rejected when a compaction for it is already initiated or working, its
     * table cannot be found, compaction is disabled by replication for the database or table,
     * the table is marked {@code no_auto_compaction}, the table is insert-only and compacting
     * such tables is disabled, the entry stems from dynamic partitioning, or the failed-attempt
     * check of the transaction store trips (in which case the candidate is also marked failed).
     *
     * <p>If any of these checks throws, the candidate is marked failed on a best-effort basis
     * and rejected.
     *
     * @param ci                 the candidate
     * @param currentCompactions the compactions currently known to the transaction store
     * @return {@code true} only if every check passed
     */
    private boolean isEligibleForCompaction(CompactionInfo ci, ShowCompactResponse currentCompactions) {
        LOG.info("Checking to see if we should compact " + ci.getFullPartitionName());
        if (this.lookForCurrentCompactions(currentCompactions, ci)) {
            LOG.info("Found currently initiated or working compaction for " + ci.getFullPartitionName() + " so we will not initiate another compaction");
            return false;
        }
        try {
            Table t = this.resolveTable(ci);
            if (t == null) {
                LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp table or has been dropped and moving on.");
                return false;
            }
            if (this.replIsCompactionDisabledForDatabase(ci.dbname) || this.replIsCompactionDisabledForTable(t)) {
                return false;
            }
            if (this.noAutoCompactSet(t)) {
                LOG.info("Table " + this.tableName(t) + " marked " + "no_auto_compaction" + "=true so we will not compact it.");
                return false;
            }
            if (AcidUtils.isInsertOnlyTable(t.getParameters()) && !HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
                LOG.info("Table " + this.tableName(t) + " is insert only and " + HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM.varname + "=false so we will not compact it.");
                return false;
            }
            if (Initiator.isDynPartIngest(t, ci)) {
                return false;
            }
            if (this.txnHandler.checkFailedCompactions(ci)) {
                LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last " + MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " attempts to compact it failed.");
                ci.errorMessage = "Compaction is not initiated since last " + MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " consecutive compaction attempts failed)";
                this.txnHandler.markFailed(ci);
                return false;
            }
        }
        catch (Throwable e) {
            LOG.error("Caught exception while checking compaction eligibility.", e);
            try {
                ci.errorMessage = e.getMessage();
                this.txnHandler.markFailed(ci);
            }
            catch (MetaException ex) {
                // Log the failure of markFailed itself, not the original exception again.
                LOG.error("Caught exception while marking compaction as failed.", ex);
            }
            // A candidate whose checks blew up (and that was just marked failed) must never be
            // handed on as eligible.
            return false;
        }
        return true;
    }

    /**
     * Refreshes the compaction gauges from a show-compactions response.
     *
     * <p>For every database/table/partition only the entry with the highest id is counted. One
     * gauge named {@code compaction_num_<state>} is set per known compaction state (0 when no
     * entry is in that state), and {@code compaction_oldest_enqueue_age_in_sec} is set to the
     * age in seconds of the oldest entry in state {@code initiated} (0 when there is none).
     *
     * @param showCompactResponse the response to summarize; a {@code null} compaction list is
     *                            treated as empty
     */
    @VisibleForTesting
    protected static void updateCompactionMetrics(ShowCompactResponse showCompactResponse) {
        Map<String, ShowCompactResponseElement> lastElements = new HashMap<>();
        long oldestEnqueueTime = Long.MAX_VALUE;
        List<ShowCompactResponseElement> compacts = showCompactResponse.getCompacts();
        // The list may be unset; every gauge is then simply reset to 0 below.
        if (compacts != null) {
            for (ShowCompactResponseElement element : compacts) {
                String key = element.getDbname() + "/" + element.getTablename() + (element.getPartitionname() != null ? "/" + element.getPartitionname() : "");
                // Keep the entry with the highest id per key.
                lastElements.merge(key, element, (old, candidate) -> candidate.getId() > old.getId() ? candidate : old);
                if ("initiated".equals(element.getState()) && element.getEnqueueTime() < oldestEnqueueTime) {
                    oldestEnqueueTime = element.getEnqueueTime();
                }
            }
        }
        Map<String, Long> counts = lastElements.values().stream().collect(Collectors.groupingBy(e -> e.getState(), Collectors.counting()));
        for (int i = 0; i < TxnStore.COMPACTION_STATES.length; ++i) {
            Long count = counts.get(TxnStore.COMPACTION_STATES[i]);
            Metrics.getOrCreateGauge("compaction_num_" + TxnStore.COMPACTION_STATES[i]).set(count == null ? 0 : count.intValue());
        }
        int oldestAgeSeconds = oldestEnqueueTime == Long.MAX_VALUE
                ? 0
                : (int)((System.currentTimeMillis() - oldestEnqueueTime) / 1000L);
        Metrics.getOrCreateGauge("compaction_oldest_enqueue_age_in_sec").set(oldestAgeSeconds);
    }
}

