/*
 * Decompiled with CFR 0.152.
 */
package id.onyx.obdp.server.upgrade;

import com.google.inject.Inject;
import com.google.inject.Injector;
import id.onyx.obdp.server.OBDPException;
import id.onyx.obdp.server.controller.OBDPManagementController;
import id.onyx.obdp.server.orm.DBAccessor;
import id.onyx.obdp.server.state.Cluster;
import id.onyx.obdp.server.state.Clusters;
import id.onyx.obdp.server.state.Config;
import id.onyx.obdp.server.state.SecurityType;
import id.onyx.obdp.server.upgrade.AbstractUpgradeCatalog;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Upgrade catalog that takes an installation from version 2.5.0 to 2.5.1.
 *
 * <p>DDL step: adds the {@code is_background} column to {@code host_role_command}
 * and moves {@code cluster_host_info} from the {@code stage} table to the
 * {@code request} table.
 *
 * <p>DML step: adds new configurations from XML, rewrites the Kafka
 * {@code listeners} protocol on Kerberized clusters, and appends the
 * storm-autocreds snippet to the {@code storm-env} content.
 */
public class UpgradeCatalog251 extends AbstractUpgradeCatalog {
    static final String HOST_ROLE_COMMAND_TABLE = "host_role_command";
    static final String HRC_IS_BACKGROUND_COLUMN = "is_background";
    protected static final String KAFKA_BROKER_CONFIG = "kafka-broker";
    private static final String STAGE_TABLE = "stage";
    private static final String REQUEST_TABLE = "request";
    private static final String CLUSTER_HOST_INFO_COLUMN = "cluster_host_info";
    private static final String REQUEST_ID_COLUMN = "request_id";
    protected static final String STORM_ENV_CONFIG = "storm-env";
    private static final Logger LOG = LoggerFactory.getLogger(UpgradeCatalog251.class);

    /** Property of {@code kafka-broker} whose protocol is rewritten. */
    private static final String KAFKA_LISTENERS_PROPERTY = "listeners";

    /** Property of {@code storm-env} that holds the environment script. */
    private static final String STORM_ENV_CONTENT_PROPERTY = "content";

    /** Marker whose presence means the autocreds snippet is already in place. */
    private static final String STORM_AUTOCREDS_MARKER = "STORM_AUTOCREDS_LIB_DIR";

    /** Shell snippet appended to the {@code storm-env} content. */
    private static final String STORM_AUTOCREDS_SNIPPET = "\n"
            + "# set storm-auto creds\n"
            + "# check if storm_jaas.conf in config, only enable storm_auto_creds in secure mode.\n"
            + "STORM_JAAS_CONF=$STORM_HOME/conf/storm_jaas.conf\n"
            + "STORM_AUTOCREDS_LIB_DIR=$STORM_HOME/external/storm-autocreds\n"
            + "if [ -f $STORM_JAAS_CONF ] && [ -d $STORM_AUTOCREDS_LIB_DIR ]; then\n"
            + "  export STORM_EXT_CLASSPATH=$STORM_AUTOCREDS_LIB_DIR\n"
            + "fi\n";

    /**
     * Creates the catalog.
     *
     * @param injector injector handed to the base catalog and later used to
     *                 look up the management controller
     */
    @Inject
    public UpgradeCatalog251(Injector injector) {
        super(injector);
    }

    /** @return the version this catalog upgrades from, {@code "2.5.0"} */
    @Override
    public String getSourceVersion() {
        return "2.5.0";
    }

    /** @return the version this catalog upgrades to, {@code "2.5.1"} */
    @Override
    public String getTargetVersion() {
        return "2.5.1";
    }

    /**
     * Applies the schema changes of this catalog.
     *
     * @throws OBDPException declared by the overridden method
     * @throws SQLException  if a schema change fails
     */
    @Override
    protected void executeDDLUpdates() throws OBDPException, SQLException {
        this.addBackgroundColumnToHostRoleCommand();
        this.moveClusterHostColumnFromStageToRequest();
    }

    /** This catalog has no work to do before the DML updates. */
    @Override
    protected void executePreDMLUpdates() throws OBDPException, SQLException {
    }

    /**
     * Applies the configuration changes of this catalog.
     *
     * @throws OBDPException if a configuration update fails
     * @throws SQLException  declared by the overridden method
     */
    @Override
    protected void executeDMLUpdates() throws OBDPException, SQLException {
        this.addNewConfigurationsFromXml();
        this.updateKAFKAConfigs();
        this.updateSTORMConfigs();
    }

    /**
     * For every cluster that has KAFKA installed and uses Kerberos, replaces the
     * whole word {@code PLAINTEXT} in the {@code kafka-broker/listeners} property
     * with {@code PLAINTEXTSASL}. Clusters whose value is missing, empty or
     * already free of {@code PLAINTEXT} are left untouched.
     *
     * @throws OBDPException if a configuration update fails
     */
    protected void updateKAFKAConfigs() throws OBDPException {
        for (Cluster cluster : this.getClustersToUpdate().values()) {
            if (!cluster.getServices().containsKey("KAFKA")
                    || cluster.getSecurityType() != SecurityType.KERBEROS) {
                continue;
            }
            Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER_CONFIG);
            if (kafkaBroker == null) {
                continue;
            }
            String currentListeners = kafkaBroker.getProperties().get(KAFKA_LISTENERS_PROPERTY);
            if (StringUtils.isEmpty(currentListeners)) {
                continue;
            }
            // \b keeps values that already say PLAINTEXTSASL from being rewritten again.
            String updatedListeners = currentListeners.replaceAll("\\bPLAINTEXT\\b", "PLAINTEXTSASL");
            if (updatedListeners.equals(currentListeners)) {
                continue;
            }
            // The new value was derived from this cluster's own config, so write it
            // back to this cluster only. The overload without a cluster argument
            // cannot be limited to the cluster that passed the checks above.
            this.updateConfigurationPropertiesForCluster(cluster, KAFKA_BROKER_CONFIG,
                    Collections.singletonMap(KAFKA_LISTENERS_PROPERTY, updatedListeners), true, false);
        }
    }

    /**
     * Adds the {@code is_background} column to the {@code host_role_command} table
     * as a {@code Short} column with a default of 0.
     *
     * @throws SQLException if the column cannot be added
     */
    private void addBackgroundColumnToHostRoleCommand() throws SQLException {
        // NOTE(review): the trailing 'false' presumably marks the column as
        // non-nullable - confirm against DBColumnInfo.
        this.dbAccessor.addColumn(HOST_ROLE_COMMAND_TABLE,
                new DBAccessor.DBColumnInfo(HRC_IS_BACKGROUND_COLUMN, Short.class, null, (Object) 0, false));
    }

    /**
     * Moves the {@code cluster_host_info} column from the {@code stage} table to
     * the {@code request} table, using {@code request_id} as the key column on
     * both sides.
     *
     * @throws SQLException if the column cannot be moved
     */
    private void moveClusterHostColumnFromStageToRequest() throws SQLException {
        DBAccessor.DBColumnInfo sourceColumn =
                new DBAccessor.DBColumnInfo(CLUSTER_HOST_INFO_COLUMN, byte[].class, null, null, false);
        DBAccessor.DBColumnInfo targetColumn =
                new DBAccessor.DBColumnInfo(CLUSTER_HOST_INFO_COLUMN, byte[].class, null, null, false);
        // Encode with an explicit charset so the stored bytes do not depend on the
        // JVM's platform default.
        // NOTE(review): the last argument is presumably the value used for target
        // rows that have no source data - confirm against moveColumnToAnotherTable.
        byte[] emptyJsonObject = "{}".getBytes(StandardCharsets.UTF_8);
        this.dbAccessor.moveColumnToAnotherTable(STAGE_TABLE, sourceColumn, REQUEST_ID_COLUMN,
                REQUEST_TABLE, targetColumn, REQUEST_ID_COLUMN, emptyJsonObject);
    }

    /**
     * For every cluster that has STORM installed, appends the storm-autocreds
     * shell snippet to the {@code storm-env/content} property unless the content
     * already mentions {@code STORM_AUTOCREDS_LIB_DIR}. Clusters without a
     * {@code storm-env} config or without a {@code content} property are skipped.
     *
     * @throws OBDPException if a configuration update fails
     */
    protected void updateSTORMConfigs() throws OBDPException {
        for (Cluster cluster : this.getClustersToUpdate().values()) {
            if (!cluster.getServices().containsKey("STORM")) {
                continue;
            }
            Config stormEnv = cluster.getDesiredConfigByType(STORM_ENV_CONFIG);
            if (stormEnv == null) {
                // Mirrors the kafka-broker handling: nothing to amend when the
                // config type is absent, and dereferencing it would fail the upgrade.
                continue;
            }
            String content = stormEnv.getProperties().get(STORM_ENV_CONTENT_PROPERTY);
            if (content == null || content.contains(STORM_AUTOCREDS_MARKER)) {
                continue;
            }
            Map<String, String> newProperties = new HashMap<String, String>();
            newProperties.put(STORM_ENV_CONTENT_PROPERTY, content + STORM_AUTOCREDS_SNIPPET);
            this.updateConfigurationPropertiesForCluster(cluster, STORM_ENV_CONFIG, newProperties, true, false);
        }
    }

    /**
     * Looks up the clusters known to the management controller.
     *
     * @return the checked cluster map, or an empty map when the controller has
     *         no clusters object or the checked map is {@code null}
     */
    private Map<String, Cluster> getClustersToUpdate() {
        OBDPManagementController controller = this.injector.getInstance(OBDPManagementController.class);
        Clusters clusters = controller.getClusters();
        if (clusters == null) {
            return Collections.emptyMap();
        }
        Map<String, Cluster> clusterMap = this.getCheckedClusterMap(clusters);
        if (clusterMap == null) {
            return Collections.emptyMap();
        }
        return clusterMap;
    }
}

