/*
 * Decompiled with CFR 0.152.
 */
package id.onyx.obdp.server.topology;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Singleton;
import com.google.inject.persist.Transactional;
import id.onyx.obdp.server.OBDPException;
import id.onyx.obdp.server.actionmanager.HostRoleCommand;
import id.onyx.obdp.server.actionmanager.HostRoleStatus;
import id.onyx.obdp.server.api.services.stackadvisor.StackAdvisorBlueprintProcessor;
import id.onyx.obdp.server.controller.OBDPServer;
import id.onyx.obdp.server.controller.RequestStatusResponse;
import id.onyx.obdp.server.controller.internal.ArtifactResourceProvider;
import id.onyx.obdp.server.controller.internal.BaseClusterRequest;
import id.onyx.obdp.server.controller.internal.CalculatedStatus;
import id.onyx.obdp.server.controller.internal.ProvisionClusterRequest;
import id.onyx.obdp.server.controller.internal.RequestImpl;
import id.onyx.obdp.server.controller.internal.ScaleClusterRequest;
import id.onyx.obdp.server.controller.internal.Stack;
import id.onyx.obdp.server.controller.spi.NoSuchParentResourceException;
import id.onyx.obdp.server.controller.spi.RequestStatus;
import id.onyx.obdp.server.controller.spi.Resource;
import id.onyx.obdp.server.controller.spi.ResourceAlreadyExistsException;
import id.onyx.obdp.server.controller.spi.ResourceProvider;
import id.onyx.obdp.server.controller.spi.SystemException;
import id.onyx.obdp.server.controller.spi.UnsupportedPropertyException;
import id.onyx.obdp.server.events.ClusterConfigFinishedEvent;
import id.onyx.obdp.server.events.ClusterProvisionStartedEvent;
import id.onyx.obdp.server.events.ClusterProvisionedEvent;
import id.onyx.obdp.server.events.HostsRemovedEvent;
import id.onyx.obdp.server.events.OBDPEvent;
import id.onyx.obdp.server.events.RequestFinishedEvent;
import id.onyx.obdp.server.events.publishers.OBDPEventPublisher;
import id.onyx.obdp.server.orm.dao.HostRoleCommandStatusSummaryDTO;
import id.onyx.obdp.server.orm.dao.SettingDAO;
import id.onyx.obdp.server.orm.entities.SettingEntity;
import id.onyx.obdp.server.orm.entities.StageEntity;
import id.onyx.obdp.server.security.authorization.AuthorizationHelper;
import id.onyx.obdp.server.serveraction.kerberos.KDCType;
import id.onyx.obdp.server.state.Host;
import id.onyx.obdp.server.state.SecurityType;
import id.onyx.obdp.server.state.host.HostImpl;
import id.onyx.obdp.server.topology.AmbariContext;
import id.onyx.obdp.server.topology.AsyncCallableService;
import id.onyx.obdp.server.topology.ClusterConfigurationRequest;
import id.onyx.obdp.server.topology.ClusterTopology;
import id.onyx.obdp.server.topology.ClusterTopologyImpl;
import id.onyx.obdp.server.topology.Configuration;
import id.onyx.obdp.server.topology.Credential;
import id.onyx.obdp.server.topology.HostGroup;
import id.onyx.obdp.server.topology.HostGroupInfo;
import id.onyx.obdp.server.topology.HostOfferResponse;
import id.onyx.obdp.server.topology.HostRequest;
import id.onyx.obdp.server.topology.InvalidTopologyException;
import id.onyx.obdp.server.topology.LogicalRequest;
import id.onyx.obdp.server.topology.LogicalRequestFactory;
import id.onyx.obdp.server.topology.NoSuchHostGroupException;
import id.onyx.obdp.server.topology.PersistedState;
import id.onyx.obdp.server.topology.PersistedTopologyRequest;
import id.onyx.obdp.server.topology.SecurityConfiguration;
import id.onyx.obdp.server.topology.SecurityConfigurationFactory;
import id.onyx.obdp.server.topology.TopologyRequest;
import id.onyx.obdp.server.topology.addservice.ResourceProviderAdapter;
import id.onyx.obdp.server.topology.tasks.ConfigureClusterTask;
import id.onyx.obdp.server.topology.tasks.ConfigureClusterTaskFactory;
import id.onyx.obdp.server.topology.validators.TopologyValidatorService;
import id.onyx.obdp.server.utils.ManagedThreadPoolExecutor;
import id.onyx.obdp.server.utils.RetryHelper;
import jakarta.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class TopologyManager {
    public static final String INTERNAL_AUTH_TOKEN = "internal_topology_token";
    public static final String INITIAL_CONFIG_TAG = "INITIAL";
    public static final String TOPOLOGY_RESOLVED_TAG = "TOPOLOGY_RESOLVED";
    public static final String KDC_ADMIN_CREDENTIAL = "kdc.admin.credential";
    private PersistedState persistedState;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private int topologyTaskExecutorThreadPoolSize;
    private final Map<Long, ManagedThreadPoolExecutor> topologyTaskExecutorServiceMap = new HashMap<Long, ManagedThreadPoolExecutor>();
    private Collection<String> hostsToIgnore = new HashSet<String>();
    private final List<HostImpl> availableHosts = new LinkedList<HostImpl>();
    private final Map<String, LogicalRequest> reservedHosts = new HashMap<String, LogicalRequest>();
    private final Map<Long, LogicalRequest> allRequests = new HashMap<Long, LogicalRequest>();
    private final Collection<LogicalRequest> outstandingRequests = new ArrayList<LogicalRequest>();
    private Map<Long, ClusterTopology> clusterTopologyMap = new HashMap<Long, ClusterTopology>();
    @Inject
    private StackAdvisorBlueprintProcessor stackAdvisorBlueprintProcessor;
    @Inject
    private LogicalRequestFactory logicalRequestFactory;
    @Inject
    private AmbariContext ambariContext;
    private final Object initializationLock = new Object();
    @Inject
    private SecurityConfigurationFactory securityConfigurationFactory;
    @Inject
    private ConfigureClusterTaskFactory configureClusterTaskFactory;
    @Inject
    private OBDPEventPublisher ambariEventPublisher;
    @Inject
    private SettingDAO settingDAO;
    @Inject
    private TopologyValidatorService topologyValidatorService;
    private volatile boolean isInitialized;
    private static final Logger LOG = LoggerFactory.getLogger(TopologyManager.class);
    private Map<Long, LogicalRequest> clusterProvisionWithBlueprintCreateRequests = new HashMap<Long, LogicalRequest>();
    private Map<Long, Boolean> clusterProvisionWithBlueprintCreationFinished = new HashMap<Long, Boolean>();

    public TopologyManager() {
        this.topologyTaskExecutorThreadPoolSize = 1;
    }

    @Inject
    public TopologyManager(id.onyx.obdp.server.configuration.Configuration configuration) {
        this.topologyTaskExecutorThreadPoolSize = configuration.getParallelTopologyTaskCreationThreadPoolSize();
        if (!configuration.isParallelTopologyTaskCreationEnabled()) {
            this.topologyTaskExecutorThreadPoolSize = 1;
        }
    }

    @Inject
    private void register() {
        this.ambariEventPublisher.register(this);
    }

    @Inject
    private void setPersistedState() {
        this.persistedState = this.ambariContext.getPersistedTopologyState();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void ensureInitialized() {
        if (!this.isInitialized) {
            Object object = this.initializationLock;
            synchronized (object) {
                if (!this.isInitialized) {
                    this.replayRequests(this.persistedState.getAllRequests());
                    for (ClusterTopology clusterTopology : this.clusterTopologyMap.values()) {
                        if (!clusterTopology.isClusterKerberosEnabled() || !this.isKerberosClientInstallAllowed(clusterTopology)) continue;
                        this.addKerberosClient(clusterTopology);
                    }
                    this.isInitialized = true;
                }
            }
        }
    }

    @Subscribe
    public void onRequestFinished(RequestFinishedEvent event) {
        if (event.getType() != OBDPEvent.OBDPEventType.REQUEST_FINISHED || this.clusterProvisionWithBlueprintCreateRequests.isEmpty() || Boolean.TRUE.equals(this.clusterProvisionWithBlueprintCreationFinished.get(event.getClusterId()))) {
            return;
        }
        if (this.isClusterProvisionWithBlueprintFinished(event.getClusterId())) {
            this.clusterProvisionWithBlueprintCreationFinished.put(event.getClusterId(), Boolean.TRUE);
            LogicalRequest provisionRequest = this.clusterProvisionWithBlueprintCreateRequests.get(event.getClusterId());
            if (this.isLogicalRequestSuccessful(provisionRequest)) {
                LOG.info("Cluster creation request id={} using Blueprint {} successfully completed for cluster id={}", new Object[]{this.clusterProvisionWithBlueprintCreateRequests.get(event.getClusterId()).getRequestId(), this.clusterTopologyMap.get(event.getClusterId()).getBlueprint().getName(), event.getClusterId()});
                this.ambariEventPublisher.publish(new ClusterProvisionedEvent(event.getClusterId()));
            } else {
                LOG.info("Cluster creation request id={} using Blueprint {} failed for cluster id={}", new Object[]{this.clusterProvisionWithBlueprintCreateRequests.get(event.getClusterId()).getRequestId(), this.clusterTopologyMap.get(event.getClusterId()).getBlueprint().getName(), event.getClusterId()});
            }
        }
    }

    public boolean isClusterProvisionWithBlueprintTracked(long clusterId) {
        return this.clusterProvisionWithBlueprintCreateRequests.containsKey(clusterId);
    }

    public boolean isClusterProvisionWithBlueprintFinished(long clusterId) {
        if (!this.isClusterProvisionWithBlueprintTracked(clusterId)) {
            return false;
        }
        if (this.clusterProvisionWithBlueprintCreationFinished.containsKey(clusterId) && this.clusterProvisionWithBlueprintCreationFinished.get(clusterId).booleanValue()) {
            return true;
        }
        return this.isLogicalRequestFinished(this.clusterProvisionWithBlueprintCreateRequests.get(clusterId));
    }

    public RequestStatusResponse provisionCluster(final ProvisionClusterRequest request) throws InvalidTopologyException, OBDPException {
        this.ensureInitialized();
        final ClusterTopologyImpl topology = new ClusterTopologyImpl(this.ambariContext, request);
        String clusterName = request.getClusterName();
        Stack stack = topology.getBlueprint().getStack();
        String repoVersion = request.getRepositoryVersion();
        Long repoVersionID = request.getRepositoryVersionId();
        final Long provisionId = this.ambariContext.getNextRequestId();
        SecurityType securityType = null;
        Credential credential = null;
        SecurityConfiguration securityConfiguration = this.processSecurityConfiguration(request);
        if (securityConfiguration != null && securityConfiguration.getType() == SecurityType.KERBEROS) {
            securityType = SecurityType.KERBEROS;
            if (this.isKerberosClientInstallAllowed(topology)) {
                this.addKerberosClient(topology);
            }
            topology.getBlueprint().getConfiguration().setParentConfiguration(stack.getConfiguration(topology.getBlueprint().getServices()));
            credential = request.getCredentialsMap().get(KDC_ADMIN_CREDENTIAL);
            if (credential == null) {
                throw new InvalidTopologyException("kdc.admin.credential is missing from request.");
            }
        }
        this.topologyValidatorService.validateTopologyConfiguration(topology);
        this.ambariContext.createAmbariResources(topology, clusterName, securityType, repoVersion, repoVersionID);
        if (securityConfiguration != null) {
            securityConfiguration.getDescriptor().ifPresent(descriptor -> this.submitKerberosDescriptorAsArtifact(clusterName, (Map<?, ?>)descriptor));
        }
        if (credential != null) {
            TopologyManager.submitCredential(clusterName, credential);
        }
        long clusterId = this.ambariContext.getClusterId(clusterName);
        topology.setClusterId(clusterId);
        request.setClusterId(clusterId);
        topology.setConfigRecommendationStrategy(request.getConfigRecommendationStrategy());
        topology.setProvisionAction(request.getProvisionAction());
        this.getOrCreateTopologyTaskExecutor(clusterId);
        LogicalRequest logicalRequest = RetryHelper.executeWithRetry(new Callable<LogicalRequest>(){

            @Override
            public LogicalRequest call() throws Exception {
                LogicalRequest logicalRequest = TopologyManager.this.processAndPersistProvisionClusterTopologyRequest(request, topology, provisionId);
                return logicalRequest;
            }
        });
        this.clusterTopologyMap.put(clusterId, topology);
        this.addClusterConfigRequest(logicalRequest, topology, new ClusterConfigurationRequest(this.ambariContext, topology, true, this.stackAdvisorBlueprintProcessor, securityType == SecurityType.KERBEROS));
        this.processRequest(request, topology, logicalRequest);
        this.ambariContext.persistInstallStateForUI(clusterName, stack.getName(), stack.getVersion());
        this.clusterProvisionWithBlueprintCreateRequests.put(clusterId, logicalRequest);
        this.ambariEventPublisher.publish(new ClusterProvisionStartedEvent(clusterId));
        return this.getRequestStatus(logicalRequest.getRequestId());
    }

    private boolean isKerberosClientInstallAllowed(ClusterTopology topology) {
        Configuration topologyConfig = topology.getBlueprint().getConfiguration();
        String kdc_type = topologyConfig.getPropertyValue("kerberos-env", "kdc_type");
        String manage_identities = topologyConfig.getPropertyValue("kerberos-env", "manage_identities");
        return KDCType.NONE != KDCType.translate(kdc_type) || Boolean.parseBoolean(manage_identities);
    }

    @Subscribe
    public void onClusterConfigFinishedEvent(ClusterConfigFinishedEvent event) {
        ManagedThreadPoolExecutor taskExecutor = this.topologyTaskExecutorServiceMap.get(event.getClusterId());
        if (taskExecutor == null) {
            LOG.error("Can't find executor service taskQueue not found for cluster: {} ", (Object)event.getClusterName());
        } else {
            LOG.info("Starting topology task ExecutorService for cluster: {}", (Object)event.getClusterName());
            taskExecutor.start();
        }
    }

    void saveOrUpdateQuickLinksProfile(String quickLinksProfileJson) {
        SettingEntity settingEntity = this.settingDAO.findByName("QuickLinksProfile");
        if (null == settingEntity) {
            settingEntity = new SettingEntity();
            settingEntity.setName("QuickLinksProfile");
            settingEntity.setSettingType("obdp-server");
            settingEntity.setContent(quickLinksProfileJson);
            settingEntity.setUpdatedBy(AuthorizationHelper.getAuthenticatedName());
            settingEntity.setUpdateTimestamp(System.currentTimeMillis());
            this.settingDAO.create(settingEntity);
        } else {
            settingEntity.setContent(quickLinksProfileJson);
            settingEntity.setUpdatedBy(AuthorizationHelper.getAuthenticatedName());
            settingEntity.setUpdateTimestamp(System.currentTimeMillis());
            this.settingDAO.merge(settingEntity);
        }
    }

    private static void submitCredential(String clusterName, Credential credential) {
        ResourceProvider provider = AmbariContext.getClusterController().ensureResourceProvider(Resource.Type.Credential);
        Map<String, Object> credentialProperties = ResourceProviderAdapter.createCredentialRequestProperties(clusterName, credential);
        RequestImpl request = new RequestImpl((Set<String>)ImmutableSet.of(), (Set<Map<String, Object>>)ImmutableSet.of(credentialProperties), (Map<String, String>)ImmutableMap.of(), null);
        String baseMessage = String.format("Failed to add credential %s to cluster %s", credential.getAlias(), clusterName);
        try {
            RequestStatus status = provider.createResources(request);
            if (status.getStatus() != RequestStatus.Status.Complete) {
                String msg = String.format("%s, received status: %s", new Object[]{baseMessage, status.getStatus()});
                LOG.error(msg);
                throw new RuntimeException(msg);
            }
        }
        catch (NoSuchParentResourceException | ResourceAlreadyExistsException | SystemException | UnsupportedPropertyException e) {
            String msg = String.format("%s, %s", baseMessage, e);
            LOG.error(msg);
            throw new RuntimeException(msg, e);
        }
    }

    private SecurityConfiguration processSecurityConfiguration(ProvisionClusterRequest request) {
        LOG.debug("Getting security configuration from the request ...");
        SecurityConfiguration securityConfiguration = request.getSecurityConfiguration();
        if (securityConfiguration == null) {
            LOG.debug("There's no security configuration in the request, retrieving it from the associated blueprint");
            securityConfiguration = request.getBlueprint().getSecurity();
            if (securityConfiguration != null && securityConfiguration.getType() == SecurityType.KERBEROS && securityConfiguration.getDescriptorReference() != null) {
                securityConfiguration = this.securityConfigurationFactory.loadSecurityConfigurationByReference(securityConfiguration.getDescriptorReference());
            }
        }
        return securityConfiguration;
    }

    private void submitKerberosDescriptorAsArtifact(String clusterName, Map<?, ?> descriptor) {
        TopologyManager topologyManager = this;
        ResourceProvider artifactProvider = topologyManager.ambariContext.getClusterController().ensureResourceProvider(Resource.Type.Artifact);
        Map<String, Object> properties = ResourceProviderAdapter.createKerberosDescriptorRequestProperties(clusterName);
        ImmutableMap requestInfoProps = ImmutableMap.of((Object)"RAW_REQUEST_BODY", (Object)ArtifactResourceProvider.toArtifactDataJson(descriptor));
        RequestImpl request = new RequestImpl(Collections.emptySet(), Collections.singleton(properties), (Map<String, String>)requestInfoProps, null);
        try {
            RequestStatus status = artifactProvider.createResources(request);
            try {
                while (status.getStatus() != RequestStatus.Status.Complete) {
                    LOG.info("Waiting for kerberos_descriptor artifact creation.");
                    Thread.sleep(100L);
                }
            }
            catch (InterruptedException e) {
                LOG.info("Wait for resource creation interrupted!");
            }
            if (status.getStatus() != RequestStatus.Status.Complete) {
                throw new RuntimeException("Failed to attach kerberos_descriptor artifact to cluster!");
            }
        }
        catch (NoSuchParentResourceException | SystemException | UnsupportedPropertyException e) {
            throw new RuntimeException("Failed to attach kerberos_descriptor artifact to cluster: " + e);
        }
        catch (ResourceAlreadyExistsException e) {
            throw new RuntimeException("Failed to attach kerberos_descriptor artifact to cluster as resource already exists.");
        }
    }

    public RequestStatusResponse scaleHosts(final ScaleClusterRequest request) throws InvalidTopologyException, OBDPException {
        this.ensureInitialized();
        LOG.info("TopologyManager.scaleHosts: Entering");
        String clusterName = request.getClusterName();
        long clusterId = this.ambariContext.getClusterId(clusterName);
        final ClusterTopology topology = this.clusterTopologyMap.get(clusterId);
        if (topology == null) {
            throw new InvalidTopologyException("Unable to retrieve cluster topology for cluster. This is most likely a result of trying to scale a cluster via the API which was created using the Ambari UI. At this time only clusters created via the API using a blueprint can be scaled with this API.  If the cluster was originally created via the API as described above, please file a Jira for this matter.");
        }
        this.hostNameCheck(request, topology);
        request.setClusterId(clusterId);
        if (this.ambariContext.isTopologyResolved(clusterId)) {
            this.getOrCreateTopologyTaskExecutor(clusterId).start();
        }
        topology.update(request);
        final Long requestId = this.ambariContext.getNextRequestId();
        LogicalRequest logicalRequest = RetryHelper.executeWithRetry(new Callable<LogicalRequest>(){

            @Override
            public LogicalRequest call() throws Exception {
                LogicalRequest logicalRequest = TopologyManager.this.processAndPersistTopologyRequest(request, topology, requestId);
                return logicalRequest;
            }
        });
        this.processRequest(request, topology, logicalRequest);
        return this.getRequestStatus(logicalRequest.getRequestId());
    }

    public void removePendingHostRequests(String clusterName, long requestId) {
        this.ensureInitialized();
        LOG.info("TopologyManager.removePendingHostRequests: Entering");
        long clusterId = 0L;
        try {
            clusterId = this.ambariContext.getClusterId(clusterName);
        }
        catch (OBDPException e) {
            LOG.error("Unable to retrieve clusterId", (Throwable)e);
            throw new IllegalArgumentException("Unable to retrieve clusterId");
        }
        ClusterTopology topology = this.clusterTopologyMap.get(clusterId);
        if (topology == null) {
            throw new IllegalArgumentException("Unable to retrieve cluster topology for cluster");
        }
        LogicalRequest logicalRequest = this.allRequests.get(requestId);
        if (logicalRequest == null) {
            throw new IllegalArgumentException("No Logical Request found for requestId: " + requestId);
        }
        Collection<HostRequest> pendingHostRequests = logicalRequest.removePendingHostRequests(null);
        if (!logicalRequest.hasPendingHostRequests()) {
            this.outstandingRequests.remove(logicalRequest);
        }
        if (logicalRequest.getHostRequests().isEmpty()) {
            this.allRequests.remove(requestId);
        }
        this.persistedState.removeHostRequests(requestId, pendingHostRequests);
        for (HostGroupInfo currentHostGroupInfo : topology.getHostGroupInfo().values()) {
            currentHostGroupInfo.setRequestedCount(currentHostGroupInfo.getHostNames().size());
        }
        LOG.info("TopologyManager.removePendingHostRequests: Exit");
    }

    public void removeHostRequests(String hostName) {
        this.ensureInitialized();
        Iterator<LogicalRequest> iter = this.allRequests.values().iterator();
        while (iter.hasNext()) {
            LogicalRequest logicalRequest = iter.next();
            Set<HostRequest> removed = logicalRequest.removeHostRequestByHostName(hostName);
            if (!logicalRequest.hasPendingHostRequests()) {
                this.outstandingRequests.remove(logicalRequest);
            }
            if (logicalRequest.getHostRequests().isEmpty()) {
                iter.remove();
            }
            if (removed.isEmpty()) continue;
            this.persistedState.removeHostRequests(logicalRequest.getRequestId(), removed);
        }
    }

    @Transactional
    protected LogicalRequest processAndPersistProvisionClusterTopologyRequest(ProvisionClusterRequest request, ClusterTopology topology, Long logicalRequestId) throws InvalidTopologyException, OBDPException {
        if (null != request.getQuickLinksProfileJson()) {
            this.saveOrUpdateQuickLinksProfile(request.getQuickLinksProfileJson());
        }
        LogicalRequest logicalRequest = this.processAndPersistTopologyRequest(request, topology, logicalRequestId);
        return logicalRequest;
    }

    @Transactional
    protected LogicalRequest processAndPersistTopologyRequest(BaseClusterRequest request, ClusterTopology topology, Long logicalRequestId) throws InvalidTopologyException, OBDPException {
        PersistedTopologyRequest persistedRequest = this.persistedState.persistTopologyRequest(request);
        LogicalRequest logicalRequest = this.createLogicalRequest(persistedRequest, topology, logicalRequestId);
        return logicalRequest;
    }

    private void hostNameCheck(ScaleClusterRequest request, ClusterTopology topology) throws InvalidTopologyException {
        HashSet<String> hostNames = new HashSet<String>();
        for (Map.Entry<String, HostGroupInfo> entry : request.getHostGroupInfo().entrySet()) {
            hostNames.addAll(entry.getValue().getHostNames());
        }
        for (String hostName : hostNames) {
            if (topology.getHostGroupForHost(hostName) == null) continue;
            throw new InvalidTopologyException("Host " + hostName + " cannot be added, because it is already in the cluster");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onHostRegistered(HostImpl host, boolean associatedWithCluster) {
        this.ensureInitialized();
        LOG.info("TopologyManager.onHostRegistered: Entering");
        if (associatedWithCluster || this.isHostIgnored(host.getHostName())) {
            LOG.info("TopologyManager.onHostRegistered: host = {} is already associated with the cluster or is currently being processed", (Object)host.getHostName());
            return;
        }
        boolean matchedToRequest = false;
        String hostName = host.getHostName();
        List<HostImpl> list = this.availableHosts;
        synchronized (list) {
            Object object = this.reservedHosts;
            synchronized (object) {
                if (this.reservedHosts.containsKey(hostName)) {
                    LogicalRequest request = this.reservedHosts.remove(hostName);
                    HostOfferResponse response = request.offer(host);
                    if (response.getAnswer() != HostOfferResponse.Answer.ACCEPTED) {
                        throw new RuntimeException("LogicalRequest declined host offer of explicitly requested host: " + hostName);
                    }
                    LOG.info("TopologyManager.onHostRegistered: processing accepted host offer for reserved host = {}", (Object)hostName);
                    this.processAcceptedHostOffer(this.getClusterTopology(request.getClusterId()), response, host);
                    matchedToRequest = true;
                }
            }
            if (!matchedToRequest) {
                object = this.outstandingRequests;
                synchronized (object) {
                    Iterator<LogicalRequest> outstandingRequestIterator = this.outstandingRequests.iterator();
                    while (!matchedToRequest && outstandingRequestIterator.hasNext()) {
                        LogicalRequest request = outstandingRequestIterator.next();
                        HostOfferResponse hostOfferResponse = request.offer(host);
                        switch (hostOfferResponse.getAnswer()) {
                            case ACCEPTED: {
                                matchedToRequest = true;
                                LOG.info("TopologyManager.onHostRegistered: processing accepted host offer for matched host = {}", (Object)hostName);
                                this.processAcceptedHostOffer(this.getClusterTopology(request.getClusterId()), hostOfferResponse, host);
                                break;
                            }
                            case DECLINED_DONE: {
                                LOG.info("TopologyManager.onHostRegistered: DECLINED_DONE received for host = {}", (Object)hostName);
                                outstandingRequestIterator.remove();
                                break;
                            }
                            case DECLINED_PREDICATE: {
                                LOG.info("TopologyManager.onHostRegistered: DECLINED_PREDICATE received for host = {}", (Object)hostName);
                            }
                        }
                    }
                }
            }
            if (!matchedToRequest) {
                boolean addToAvailableList = true;
                for (HostImpl registered : this.availableHosts) {
                    if (!Objects.equals(registered.getHostId(), host.getHostId())) continue;
                    LOG.info("Host {} re-registered, will not be added to the available hosts list", (Object)hostName);
                    addToAvailableList = false;
                    break;
                }
                if (addToAvailableList) {
                    LOG.info("TopologyManager: Queueing available host {}", (Object)hostName);
                    this.availableHosts.add(host);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onHostHeartBeatLost(Host host) {
        if (OBDPServer.getController() == null) {
            return;
        }
        this.ensureInitialized();
        List<HostImpl> list = this.availableHosts;
        synchronized (list) {
            LOG.info("Hearbeat for host {} lost thus removing it from available hosts.", (Object)host.getHostName());
            this.availableHosts.remove(host);
        }
    }

    public LogicalRequest getRequest(long requestId) {
        this.ensureInitialized();
        return this.allRequests.get(requestId);
    }

    public Collection<LogicalRequest> getRequests(Collection<Long> requestIds) {
        this.ensureInitialized();
        if (requestIds.isEmpty()) {
            return this.allRequests.values();
        }
        ArrayList<LogicalRequest> matchingRequests = new ArrayList<LogicalRequest>();
        for (long id : requestIds) {
            LogicalRequest request = this.allRequests.get(id);
            if (request == null) continue;
            matchingRequests.add(request);
        }
        return matchingRequests;
    }

    public Collection<StageEntity> getStages() {
        this.ensureInitialized();
        ArrayList<StageEntity> stages = new ArrayList<StageEntity>();
        for (LogicalRequest logicalRequest : this.allRequests.values()) {
            stages.addAll(logicalRequest.getStageEntities());
        }
        return stages;
    }

    public Collection<HostRoleCommand> getTasks(long requestId) {
        this.ensureInitialized();
        LogicalRequest request = this.allRequests.get(requestId);
        return request == null ? Collections.emptyList() : request.getCommands();
    }

    public Collection<HostRoleCommand> getTasks(Collection<Long> requestIds) {
        this.ensureInitialized();
        ArrayList<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
        for (long id : requestIds) {
            tasks.addAll(this.getTasks(id));
        }
        return tasks;
    }

    public Map<Long, HostRoleCommandStatusSummaryDTO> getStageSummaries(Long requestId) {
        this.ensureInitialized();
        LogicalRequest request = this.allRequests.get(requestId);
        return request == null ? Collections.emptyMap() : request.getStageSummaries();
    }

    public RequestStatusResponse getRequestStatus(long requestId) {
        this.ensureInitialized();
        LogicalRequest request = this.allRequests.get(requestId);
        return request == null ? null : request.getRequestStatus();
    }

    public Collection<RequestStatusResponse> getRequestStatus(Collection<Long> ids) {
        this.ensureInitialized();
        ArrayList<RequestStatusResponse> requestStatusResponses = new ArrayList<RequestStatusResponse>();
        for (long id : ids) {
            RequestStatusResponse response = this.getRequestStatus(id);
            if (response == null) continue;
            requestStatusResponses.add(response);
        }
        return requestStatusResponses;
    }

    public ClusterTopology getClusterTopology(Long clusterId) {
        this.ensureInitialized();
        return this.clusterTopologyMap.get(clusterId);
    }

    public Map<String, Collection<String>> getPendingHostComponents() {
        this.ensureInitialized();
        HashMap<String, Collection<String>> hostComponentMap = new HashMap<String, Collection<String>>();
        for (LogicalRequest logicalRequest : this.allRequests.values()) {
            Map<Long, HostRoleCommandStatusSummaryDTO> summary = logicalRequest.getStageSummaries();
            CalculatedStatus status = CalculatedStatus.statusFromStageSummary(summary, summary.keySet());
            boolean logicalRequestInProgress = false;
            if (status.getStatus().isInProgress() || summary.isEmpty() && logicalRequest.getEndTime() <= 0L) {
                logicalRequestInProgress = true;
            }
            if (!logicalRequestInProgress) continue;
            Map<String, Collection<String>> requestTopology = logicalRequest.getProjectedTopology();
            for (Map.Entry<String, Collection<String>> entry : requestTopology.entrySet()) {
                String host = entry.getKey();
                HashSet<String> hostComponents = (HashSet<String>)hostComponentMap.get(host);
                if (hostComponents == null) {
                    hostComponents = new HashSet<String>();
                    hostComponentMap.put(host, hostComponents);
                }
                hostComponents.addAll(entry.getValue());
            }
        }
        return hostComponentMap;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processRequest(TopologyRequest request, ClusterTopology topology, LogicalRequest logicalRequest) throws OBDPException {
        LOG.info("TopologyManager.processRequest: Entering");
        this.finalizeTopology(request, topology);
        boolean requestHostComplete = false;
        List<HostImpl> list = this.availableHosts;
        synchronized (list) {
            Iterator<HostImpl> hostIterator = this.availableHosts.iterator();
            while (!requestHostComplete && hostIterator.hasNext()) {
                HostImpl host = hostIterator.next();
                Map<String, LogicalRequest> map = this.reservedHosts;
                synchronized (map) {
                    String hostname = host.getHostName();
                    if (this.reservedHosts.containsKey(hostname)) {
                        if (logicalRequest.equals(this.reservedHosts.get(hostname))) {
                            LOG.info("TopologyManager.processRequest: host name = {} is mapped to LogicalRequest ID = {} and will be removed from the reserved hosts.", (Object)hostname, (Object)logicalRequest.getRequestId());
                            this.reservedHosts.remove(hostname);
                        } else {
                            LOG.info("TopologyManager.processRequest: host name = {} is registered with another request, and will not be offered to LogicalRequest ID = {}", (Object)hostname, (Object)logicalRequest.getRequestId());
                            continue;
                        }
                    }
                }
                LOG.info("TopologyManager.processRequest: offering host name = {} to LogicalRequest ID = {}", (Object)host.getHostName(), (Object)logicalRequest.getRequestId());
                HostOfferResponse response = logicalRequest.offer(host);
                switch (response.getAnswer()) {
                    case ACCEPTED: {
                        hostIterator.remove();
                        LOG.info("TopologyManager.processRequest: host name = {} was ACCEPTED by LogicalRequest ID = {} , host has been removed from available hosts.", (Object)host.getHostName(), (Object)logicalRequest.getRequestId());
                        this.processAcceptedHostOffer(this.getClusterTopology(logicalRequest.getClusterId()), response, host);
                        break;
                    }
                    case DECLINED_DONE: {
                        requestHostComplete = true;
                        LOG.info("TopologyManager.processRequest: host name = {} was DECLINED_DONE by LogicalRequest ID = {}", (Object)host.getHostName(), (Object)logicalRequest.getRequestId());
                        break;
                    }
                    case DECLINED_PREDICATE: {
                        LOG.info("TopologyManager.processRequest: host name = {} was DECLINED_PREDICATE by LogicalRequest ID = {}", (Object)host.getHostName(), (Object)logicalRequest.getRequestId());
                    }
                }
            }
            if (!requestHostComplete) {
                LOG.info("TopologyManager.processRequest: not all required hosts have been matched, so adding LogicalRequest ID = {} to outstanding requests", (Object)logicalRequest.getRequestId());
                Collection<LogicalRequest> collection = this.outstandingRequests;
                synchronized (collection) {
                    this.outstandingRequests.add(logicalRequest);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Transactional
    protected LogicalRequest createLogicalRequest(PersistedTopologyRequest request, ClusterTopology topology, Long requestId) throws OBDPException {
        LogicalRequest logicalRequest = this.logicalRequestFactory.createRequest(requestId, request.getRequest(), topology);
        this.persistedState.persistLogicalRequest(logicalRequest, request.getId());
        this.allRequests.put(logicalRequest.getRequestId(), logicalRequest);
        LOG.info("TopologyManager.createLogicalRequest: created LogicalRequest with ID = {} and completed persistence of this request.", (Object)logicalRequest.getRequestId());
        Map<String, LogicalRequest> map = this.reservedHosts;
        synchronized (map) {
            for (String host : logicalRequest.getReservedHosts()) {
                this.reservedHosts.put(host, logicalRequest);
            }
        }
        return logicalRequest;
    }

    private void processAcceptedHostOffer(ClusterTopology topology, final HostOfferResponse response, final HostImpl host) {
        String hostName = host.getHostName();
        try {
            topology.addHostToTopology(response.getHostGroupName(), hostName);
            this.updateHostWithRackInfo(topology, response, host);
        }
        catch (InvalidTopologyException | NoSuchHostGroupException e) {
            throw new RuntimeException("An internal error occurred while performing request host registration: " + e, e);
        }
        try {
            RetryHelper.executeWithRetry(new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    TopologyManager.this.persistTopologyHostRegistration(response.getHostRequestId(), host);
                    return null;
                }
            });
        }
        catch (OBDPException e) {
            LOG.error("Exception ocurred while registering host name", (Throwable)e);
            throw new RuntimeException(e);
        }
        LOG.info("TopologyManager.processAcceptedHostOffer: queue tasks for host = {} which responded {}", (Object)hostName, (Object)response.getAnswer());
        this.queueHostTasks(topology, response, hostName);
    }

    @Transactional
    protected void persistTopologyHostRegistration(long hostRequestId, HostImpl host) {
        this.persistedState.registerHostName(hostRequestId, host.getHostName());
        this.persistedState.registerInTopologyHostInfo(host);
    }

    private ManagedThreadPoolExecutor getOrCreateTopologyTaskExecutor(Long clusterId) {
        ManagedThreadPoolExecutor topologyTaskExecutor = this.topologyTaskExecutorServiceMap.get(clusterId);
        if (topologyTaskExecutor == null) {
            LOG.info("Creating TopologyTaskExecutorService for clusterId: {}", (Object)clusterId);
            topologyTaskExecutor = new ManagedThreadPoolExecutor(this.topologyTaskExecutorThreadPoolSize, this.topologyTaskExecutorThreadPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
            this.topologyTaskExecutorServiceMap.put(clusterId, topologyTaskExecutor);
        }
        return topologyTaskExecutor;
    }

    private void queueHostTasks(ClusterTopology topology, HostOfferResponse response, String hostName) {
        LOG.info("TopologyManager.processAcceptedHostOffer: queueing tasks for host = {}", (Object)hostName);
        ManagedThreadPoolExecutor executorService = this.getOrCreateTopologyTaskExecutor(topology.getClusterId());
        response.executeTasks(executorService, hostName, topology, this.ambariContext);
    }

    private void updateHostWithRackInfo(ClusterTopology topology, HostOfferResponse response, HostImpl host) {
        String rackInfoFromTemplate = topology.getHostGroupInfo().get(response.getHostGroupName()).getHostRackInfo().get(host.getHostName());
        if (null != rackInfoFromTemplate) {
            host.setRackInfo(rackInfoFromTemplate);
            try {
                TopologyManager topologyManager = this;
                topologyManager.ambariContext.getController().registerRackChange(this.ambariContext.getClusterName(topology.getClusterId()));
            }
            catch (OBDPException e) {
                LOG.error("Could not register rack change for cluster id {}", (Object)topology.getClusterId());
                LOG.error("Exception during rack change: ", (Throwable)e);
            }
        }
    }

    private void replayRequests(Map<ClusterTopology, List<LogicalRequest>> persistedRequests) {
        LOG.info("TopologyManager.replayRequests: Entering");
        boolean configChecked = false;
        for (Map.Entry<ClusterTopology, List<LogicalRequest>> requestEntry : persistedRequests.entrySet()) {
            ClusterTopology topology = requestEntry.getKey();
            this.clusterTopologyMap.put(topology.getClusterId(), topology);
            LogicalRequest provisionRequest = this.persistedState.getProvisionRequest(topology.getClusterId());
            if (provisionRequest != null) {
                this.clusterProvisionWithBlueprintCreateRequests.put(topology.getClusterId(), provisionRequest);
                this.clusterProvisionWithBlueprintCreationFinished.put(topology.getClusterId(), this.isLogicalRequestFinished(this.clusterProvisionWithBlueprintCreateRequests.get(topology.getClusterId())));
            }
            for (LogicalRequest logicalRequest : requestEntry.getValue()) {
                this.allRequests.put(logicalRequest.getRequestId(), logicalRequest);
                if (!logicalRequest.hasPendingHostRequests()) continue;
                this.outstandingRequests.add(logicalRequest);
                for (String reservedHost : logicalRequest.getReservedHosts()) {
                    this.reservedHosts.put(reservedHost, logicalRequest);
                }
                for (HostRequest hostRequest : logicalRequest.getCompletedHostRequests()) {
                    try {
                        String hostName = hostRequest.getHostName();
                        topology.addHostToTopology(hostRequest.getHostgroupName(), hostName);
                        this.hostsToIgnore.add(hostName);
                        LOG.info("TopologyManager.replayRequests: host name = {} has been added to cluster and to ignore list.", (Object)hostName);
                    }
                    catch (InvalidTopologyException e) {
                        LOG.warn("Attempted to add host to multiple host groups while replaying requests: " + e, (Throwable)e);
                    }
                    catch (NoSuchHostGroupException e) {
                        LOG.warn("Failed to add host to topology while replaying requests: " + e, (Throwable)e);
                    }
                }
            }
            if (configChecked) continue;
            configChecked = true;
            if (!this.ambariContext.isTopologyResolved(topology.getClusterId())) {
                if (provisionRequest == null) {
                    LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, but provision request missing, skipping cluster config request");
                    continue;
                }
                if (provisionRequest.isFinished()) {
                    LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, but provision request is finished, skipping cluster config request");
                    continue;
                }
                LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, adding cluster config request");
                ClusterConfigurationRequest configRequest = new ClusterConfigurationRequest(this.ambariContext, topology, false, this.stackAdvisorBlueprintProcessor);
                this.addClusterConfigRequest(provisionRequest, topology, configRequest);
                continue;
            }
            this.getOrCreateTopologyTaskExecutor(topology.getClusterId()).start();
        }
        LOG.info("TopologyManager.replayRequests: Exit");
    }

    private boolean isLogicalRequestFinished(LogicalRequest logicalRequest) {
        return logicalRequest != null && logicalRequest.isFinished();
    }

    private boolean isLogicalRequestSuccessful(LogicalRequest logicalRequest) {
        return logicalRequest != null && logicalRequest.isSuccessful();
    }

    private void finalizeTopology(TopologyRequest request, ClusterTopology topology) {
    }

    private boolean isHostIgnored(String host) {
        return this.hostsToIgnore.remove(host);
    }

    private void addKerberosClient(ClusterTopology topology) {
        for (HostGroup group : topology.getBlueprint().getHostGroups().values()) {
            group.addComponent("KERBEROS_CLIENT");
        }
    }

    private void addClusterConfigRequest(LogicalRequest logicalRequest, ClusterTopology topology, ClusterConfigurationRequest configurationRequest) {
        ConfigureClusterTask task = this.configureClusterTaskFactory.createConfigureClusterTask(topology, configurationRequest, this.ambariEventPublisher);
        this.executor.submit(new AsyncCallableService<Boolean>(task, task.getTimeout(), task.getRepeatDelay(), "ConfigureClusterTask", throwable -> {
            HostRoleStatus status = throwable instanceof TimeoutException ? HostRoleStatus.TIMEDOUT : HostRoleStatus.FAILED;
            LOG.info("ConfigureClusterTask failed, marking host requests {}", (Object)status);
            for (HostRequest hostRequest : logicalRequest.getHostRequests()) {
                hostRequest.markHostRequestFailed(status, (Throwable)throwable, this.persistedState);
            }
        }));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Subscribe
    public void processHostRemovedEvent(HostsRemovedEvent hostsRemovedEvent) {
        if (hostsRemovedEvent.getHostNames().isEmpty()) {
            LOG.warn("Missing host name from host removed event [{}] !", (Object)hostsRemovedEvent);
            return;
        }
        LOG.info("Removing hosts [{}] from available hosts on hosts removed event.", hostsRemovedEvent.getHostNames());
        HashSet<HostImpl> toBeRemoved = new HashSet<HostImpl>();
        List<HostImpl> list = this.availableHosts;
        synchronized (list) {
            block3: for (HostImpl hostImpl : this.availableHosts) {
                for (String hostName : hostsRemovedEvent.getHostNames()) {
                    if (!hostName.equals(hostImpl.getHostName())) continue;
                    toBeRemoved.add(hostImpl);
                    continue block3;
                }
            }
            if (!toBeRemoved.isEmpty()) {
                for (HostImpl host : toBeRemoved) {
                    this.availableHosts.remove(host);
                    LOG.info("Removed host: [{}] from available hosts", (Object)host.getHostName());
                }
            } else {
                LOG.debug("No any host [{}] found in available hosts", hostsRemovedEvent.getHostNames());
            }
        }
    }
}

