/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.mapreduce.index.automation;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.phoenix.mapreduce.index.automation.PhoenixAsyncIndex;
import org.apache.phoenix.mapreduce.index.automation.PhoenixMRJobCallable;
import org.apache.phoenix.mapreduce.index.automation.YarnApplication;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.JacksonUtil;
import org.apache.phoenix.util.PhoenixMRJobUtil;
import org.apache.phoenix.util.UpgradeUtil;
import org.apache.phoenix.util.ZKBasedMasterElectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PhoenixMRJobSubmitter {
    private static final String PHOENIX_LOCKS_PARENT = "/phoenix/automated-mr-index-build-leader-election";
    public static final String PHOENIX_MR_CLIENT_SCANNER_TIMEOUT_PERIOD = "phoenix.mr.client.scanner.timeout.period";
    public static final String PHOENIX_MR_RPC_TIMEOUT = "phoenix.mr.rpc.timeout";
    public static final String PHOENIX_MR_TASK_TIMEOUT = "phoenix.mr.task.timeout";
    public static final String PHOENIX_MR_CLIENT_RETRIES_NUMBER = "phoenix.mr.client.retries.number";
    public static final String PHOENIX_MR_CLIENT_PAUSE = "phoenix.mr.client.retries.number";
    public static final String PHOENIX_MR_ZK_RECOVERY_RETRY = "phoenix.mr.zk.recovery.retry";
    private static final String AUTO_INDEX_BUILD_LOCK_NAME = "ActiveStandbyElectorLock";
    private static final int DEFAULT_TIMEOUT_IN_MILLIS = 600000;
    public static final int DEFAULT_MR_CLIENT_SCANNER_TIMEOUT_PERIOD = 600000;
    public static final int DEFAULT_MR_RPC_TIMEOUT = 600000;
    public static final int DEFAULT_MR_TASK_TIMEOUT = 600000;
    public static final int DEFAULT_MR_CLIENT_RETRIES_NUMBER = 10;
    public static final int DEFAULT_MR_CLIENT_PAUSE = 1000;
    public static final int DEFAULT_MR_ZK_RECOVERY_RETRY = 1;
    public static final String CANDIDATE_INDEX_INFO_QUERY = "SELECT INDEX_TYPE,DATA_TABLE_NAME, TABLE_SCHEM, TABLE_NAME, ASYNC_CREATED_DATE, ASYNC_REBUILD_TIMESTAMP FROM SYSTEM.\"CATALOG\" (ASYNC_CREATED_DATE " + PDate.INSTANCE.getSqlTypeName() + ", ASYNC_REBUILD_TIMESTAMP " + PLong.INSTANCE.getSqlTypeName() + ")  WHERE COLUMN_NAME IS NULL and COLUMN_FAMILY IS NULL  and (ASYNC_CREATED_DATE IS NOT NULL OR ASYNC_REBUILD_TIMESTAMP IS NOT NULL ) and TABLE_TYPE = '" + PTableType.INDEX.getSerializedValue() + "' and INDEX_STATE = '" + PIndexState.BUILDING.getSerializedValue() + "'";
    private static final int JOB_SUBMIT_POOL_TIMEOUT = 5;
    private Configuration conf;
    private String zkQuorum;
    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixMRJobSubmitter.class);

    public PhoenixMRJobSubmitter() throws IOException {
        this(null);
    }

    public PhoenixMRJobSubmitter(Configuration conf) throws IOException {
        if (conf == null) {
            conf = HBaseConfiguration.create();
        }
        this.conf = conf;
        conf.setLong("hbase.client.scanner.timeout.period", conf.getLong(PHOENIX_MR_CLIENT_SCANNER_TIMEOUT_PERIOD, 600000L));
        conf.setLong("hbase.rpc.timeout", conf.getLong(PHOENIX_MR_RPC_TIMEOUT, 600000L));
        conf.setLong("mapreduce.task.timeout", conf.getLong(PHOENIX_MR_TASK_TIMEOUT, 600000L));
        conf.setInt("hbase.client.retries.number", conf.getInt("phoenix.mr.client.retries.number", 10));
        conf.setInt("hbase.client.pause", conf.getInt("phoenix.mr.client.retries.number", 1000));
        conf.setInt("zookeeper.recovery.retry", conf.getInt(PHOENIX_MR_ZK_RECOVERY_RETRY, 1));
        String schedulerType = conf.get("phoenix.index.mr.scheduler.type", PhoenixMRJobUtil.MR_SCHEDULER_TYPE.NONE.toString());
        PhoenixMRJobUtil.MR_SCHEDULER_TYPE type = PhoenixMRJobUtil.MR_SCHEDULER_TYPE.valueOf(schedulerType);
        switch (type) {
            case CAPACITY: {
                LOGGER.info("Applying the Capacity Scheduler Queue Configurations");
                PhoenixMRJobUtil.updateCapacityQueueInfo(conf);
                break;
            }
            case FAIR: {
                LOGGER.warn("Fair Scheduler type is not yet supported");
                throw new IOException("Fair Scheduler is not yet supported");
            }
        }
        this.zkQuorum = conf.get("hbase.zookeeper.quorum");
        this.enableKeyTabSecurity();
    }

    private void enableKeyTabSecurity() throws IOException {
        String PRINCIPAL = "principal";
        String KEYTAB = "keyTab";
        String principal = null;
        String keyTabPath = null;
        AppConfigurationEntry[] entries = javax.security.auth.login.Configuration.getConfiguration().getAppConfigurationEntry("Client");
        LOGGER.info("Security - Fetched App Login Configuration Entries");
        if (entries != null) {
            for (AppConfigurationEntry entry : entries) {
                if (entry.getOptions().get("principal") != null) {
                    principal = (String)entry.getOptions().get("principal");
                }
                if (entry.getOptions().get("keyTab") == null) continue;
                keyTabPath = (String)entry.getOptions().get("keyTab");
            }
            LOGGER.info("Security - Got Principal = " + principal);
            if (principal != null && keyTabPath != null) {
                LOGGER.info("Security - Retreiving the TGT with principal:" + principal + " and keytab:" + keyTabPath);
                UserGroupInformation.loginUserFromKeytab((String)principal, keyTabPath);
                LOGGER.info("Security - Retrieved TGT with principal:" + principal + " and keytab:" + keyTabPath);
            }
        }
    }

    public Map<String, PhoenixAsyncIndex> getCandidateJobs() throws SQLException {
        Connection con = DriverManager.getConnection("jdbc:phoenix:" + this.zkQuorum);
        return this.getCandidateJobs(con);
    }

    public Map<String, PhoenixAsyncIndex> getCandidateJobs(Connection con) throws SQLException {
        Properties props = new Properties();
        UpgradeUtil.doNotUpgradeOnFirstConnection((Properties)props);
        HashMap<String, PhoenixAsyncIndex> candidateIndexes = new HashMap<String, PhoenixAsyncIndex>();
        try (Statement s = con.createStatement();
             ResultSet rs = s.executeQuery(CANDIDATE_INDEX_INFO_QUERY);){
            while (rs.next()) {
                PhoenixAsyncIndex indexInfo = new PhoenixAsyncIndex();
                indexInfo.setIndexType(PTable.IndexType.fromSerializedValue((byte)rs.getByte("INDEX_TYPE")));
                indexInfo.setDataTableName(rs.getString("DATA_TABLE_NAME"));
                indexInfo.setTableSchem(rs.getString("TABLE_SCHEM"));
                indexInfo.setTableName(rs.getString("TABLE_NAME"));
                candidateIndexes.put(String.format("PHOENIX_%s.%s_INDX_%s", indexInfo.getTableSchem(), indexInfo.getDataTableName(), indexInfo.getTableName()), indexInfo);
            }
        }
        return candidateIndexes;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int scheduleIndexBuilds() throws Exception {
        ZKWatcher zookeeperWatcher = new ZKWatcher(this.conf, "phoenixAutomatedMRIndexBuild", null);
        if (!ZKBasedMasterElectionUtil.acquireLock(zookeeperWatcher, PHOENIX_LOCKS_PARENT, AUTO_INDEX_BUILD_LOCK_NAME)) {
            LOGGER.info("Some other node is already running Automated Index Build. Skipping execution!");
            return -1;
        }
        Map<String, PhoenixAsyncIndex> candidateJobs = this.getCandidateJobs();
        LOGGER.info("Candidate Indexes to be built as seen from SYSTEM.CATALOG - " + candidateJobs);
        Set<String> submittedJobs = this.getSubmittedYarnApps();
        LOGGER.info("Already Submitted/Running MR index build jobs - " + submittedJobs);
        Set<PhoenixAsyncIndex> jobsToSchedule = this.getJobsToSubmit(candidateJobs, submittedJobs);
        LOGGER.info("Final indexes to be built - " + jobsToSchedule);
        ArrayList<Future<Boolean>> results = new ArrayList<Future<Boolean>>(jobsToSchedule.size());
        int failedJobSubmissionCount = 0;
        int timedoutJobSubmissionCount = 0;
        ExecutorService jobSubmitPool = Executors.newFixedThreadPool(10);
        LOGGER.info("Attempt to submit MR index build jobs for - " + jobsToSchedule);
        try {
            for (PhoenixAsyncIndex phoenixAsyncIndex : jobsToSchedule) {
                PhoenixMRJobCallable task = new PhoenixMRJobCallable(HBaseConfiguration.create((Configuration)this.conf), phoenixAsyncIndex, "/");
                results.add(jobSubmitPool.submit(task));
            }
            for (Future future : results) {
                try {
                    future.get(5L, TimeUnit.MINUTES);
                }
                catch (InterruptedException e) {
                    ++failedJobSubmissionCount;
                }
                catch (ExecutionException e) {
                    ++failedJobSubmissionCount;
                }
                catch (TimeoutException e) {
                    ++timedoutJobSubmissionCount;
                }
            }
        }
        finally {
            PhoenixMRJobUtil.shutdown(jobSubmitPool);
        }
        LOGGER.info("Result of Attempt to Submit MR index build Jobs - Jobs attempted = " + jobsToSchedule.size() + " ; Failed to Submit = " + failedJobSubmissionCount + " ; Timed out = " + timedoutJobSubmissionCount);
        return failedJobSubmissionCount;
    }

    public Set<PhoenixAsyncIndex> getJobsToSubmit(Map<String, PhoenixAsyncIndex> candidateJobs, Set<String> submittedJobs) {
        HashSet<PhoenixAsyncIndex> toScheduleJobs = new HashSet<PhoenixAsyncIndex>(candidateJobs.values());
        for (String jobId : submittedJobs) {
            if (!candidateJobs.containsKey(jobId)) continue;
            toScheduleJobs.remove(candidateJobs.get(jobId));
        }
        return toScheduleJobs;
    }

    public Set<String> getSubmittedYarnApps() throws Exception {
        String rmAddress = PhoenixMRJobUtil.getActiveResourceManagerAddress(this.conf, this.zkQuorum);
        HashMap<String, String> urlParams = new HashMap<String, String>();
        urlParams.put("states", YarnApplication.state.NEW.toString() + "," + YarnApplication.state.ACCEPTED + "," + YarnApplication.state.SUBMITTED + "," + YarnApplication.state.RUNNING);
        String response = PhoenixMRJobUtil.getJobsInformationFromRM(rmAddress, urlParams);
        LOGGER.debug("Already Submitted/Running Apps = " + response);
        JsonNode jsonNode = JacksonUtil.getObjectReader().readTree(response);
        JsonNode appsJson = jsonNode.get("apps");
        HashSet<String> yarnApplicationSet = new HashSet<String>();
        if (appsJson == null) {
            return yarnApplicationSet;
        }
        JsonNode appJson = appsJson.get("app");
        if (appJson == null) {
            return yarnApplicationSet;
        }
        for (JsonNode clientVersion : appJson) {
            yarnApplicationSet.add(clientVersion.get("name").textValue());
        }
        return yarnApplicationSet;
    }

    public static void main(String[] args) throws Exception {
        PhoenixMRJobSubmitter t = new PhoenixMRJobSubmitter();
        t.scheduleIndexBuilds();
    }
}

