/*
 * Decompiled with CFR 0.152.
 */
package id.onyx.obdp.server.state.scheduler;

import com.google.inject.Inject;
import com.google.inject.name.Named;
import id.onyx.obdp.server.OBDPException;
import id.onyx.obdp.server.actionmanager.HostRoleStatus;
import id.onyx.obdp.server.scheduler.AbstractLinearExecutionJob;
import id.onyx.obdp.server.scheduler.ExecutionScheduleManager;
import id.onyx.obdp.server.state.scheduler.BatchRequestResponse;
import java.util.HashMap;
import java.util.Map;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Job that executes a single batch request of a request execution schedule,
 * polls the resulting request until it reaches a completed state, and keeps
 * running task counters in the job properties.
 *
 * <p>The job reads its inputs (execution id, batch id, cluster name and the
 * counters accumulated so far) from the properties map handed to
 * {@link #doWork(Map)} and writes the updated counters back into that map.
 */
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class BatchRequestJob
extends AbstractLinearExecutionJob {
    private static final Logger LOG = LoggerFactory.getLogger(BatchRequestJob.class);

    /** Property key holding the execution id ({@code Long}). */
    public static final String BATCH_REQUEST_EXECUTION_ID_KEY = "BatchRequestJob.ExecutionId";
    /** Property key holding the batch id ({@code Long}). */
    public static final String BATCH_REQUEST_BATCH_ID_KEY = "BatchRequestJob.BatchId";
    /** Property key holding the cluster name ({@code String}). */
    public static final String BATCH_REQUEST_CLUSTER_NAME_KEY = "BatchRequestJob.ClusterName";
    /** Property key holding the running total of failed, aborted and timed-out tasks. */
    public static final String BATCH_REQUEST_FAILED_TASKS_KEY = "BatchRequestJob.FailedTaskCount";
    /** Property key holding the failed task count reported for the batch just processed. */
    public static final String BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY = "BatchRequestJob.FailedTaskInCurrentBatchCount";
    /** Property key holding the running total of tasks. */
    public static final String BATCH_REQUEST_TOTAL_TASKS_KEY = "BatchRequestJob.TotalTaskCount";

    /** Pause between two status polls, in milliseconds (passed to {@link Thread#sleep(long)}). */
    private final long statusCheckInterval;

    /**
     * Creates the job.
     *
     * @param executionScheduleManager manager used to execute, poll and finalize batches
     * @param statusCheckInterval pause between two status polls, in milliseconds
     */
    @Inject
    public BatchRequestJob(ExecutionScheduleManager executionScheduleManager, @Named(value="statusCheckInterval") long statusCheckInterval) {
        super(executionScheduleManager);
        this.statusCheckInterval = statusCheckInterval;
    }

    /**
     * Executes the batch identified by the properties, waits for the resulting
     * request to reach a completed state, then updates the task counters and
     * checks them against the failure tolerance.
     *
     * @param properties job properties; must contain the execution id and batch id
     * @throws OBDPException if the execution id or batch id is missing, if the
     *         polling thread is interrupted, or if the tolerance threshold is exceeded
     */
    @Override
    protected void doWork(Map<String, Object> properties) throws OBDPException {
        Long executionId = (Long) properties.get(BATCH_REQUEST_EXECUTION_ID_KEY);
        Long batchId = (Long) properties.get(BATCH_REQUEST_BATCH_ID_KEY);
        String clusterName = (String) properties.get(BATCH_REQUEST_CLUSTER_NAME_KEY);
        requireBatchIdentifiers(executionId, batchId);

        // Snapshot the counters accumulated so far before the batch is started.
        Map<String, Integer> taskCounts = this.getTaskCountProperties(properties);
        Long requestId = this.executionScheduleManager.executeBatchRequest(executionId, batchId, clusterName);
        if (requestId == null) {
            // No request was created for this batch, so there is nothing to poll.
            return;
        }

        BatchRequestResponse batchRequestResponse;
        while (true) {
            batchRequestResponse = this.executionScheduleManager.getBatchRequestResponse(requestId, clusterName);
            HostRoleStatus status = HostRoleStatus.valueOf(batchRequestResponse.getStatus());
            this.executionScheduleManager.updateBatchRequest(executionId, batchId, clusterName, batchRequestResponse, true);
            if (status.isCompletedState()) {
                // Stop right away; sleeping after the final status only delays the job.
                break;
            }
            this.sleepUntilNextStatusCheck();
        }

        Map<String, Integer> aggregateCounts = this.addTaskCountToProperties(properties, taskCounts, batchRequestResponse);
        if (this.executionScheduleManager.hasToleranceThresholdExceeded(executionId, clusterName, aggregateCounts)) {
            throw new OBDPException("Task failure tolerance limit exceeded, execution_id = " + executionId + ", processed batch_id = " + batchId + ", failed tasks in current batch = " + aggregateCounts.get(BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY) + ", failed tasks total = " + aggregateCounts.get(BATCH_REQUEST_FAILED_TASKS_KEY) + ", total tasks completed = " + aggregateCounts.get(BATCH_REQUEST_TOTAL_TASKS_KEY));
        }
    }

    /**
     * Finalizes the batch for the execution identified by the properties.
     *
     * @param properties job properties; must contain the execution id and batch id
     * @throws OBDPException if the execution id or batch id is missing
     */
    @Override
    protected void finalizeExecution(Map<String, Object> properties) throws OBDPException {
        Long executionId = (Long) properties.get(BATCH_REQUEST_EXECUTION_ID_KEY);
        Long batchId = (Long) properties.get(BATCH_REQUEST_BATCH_ID_KEY);
        String clusterName = (String) properties.get(BATCH_REQUEST_CLUSTER_NAME_KEY);
        requireBatchIdentifiers(executionId, batchId);
        this.executionScheduleManager.finalizeBatch(executionId, clusterName);
    }

    /**
     * Fails when either identifier needed to locate the persisted batch request is absent.
     */
    private static void requireBatchIdentifiers(Long executionId, Long batchId) throws OBDPException {
        if (executionId == null || batchId == null) {
            throw new OBDPException("Unable to retrieve persisted batch request, execution_id = " + executionId + ", batch_id = " + batchId);
        }
    }

    /**
     * Sleeps for one status check interval, converting an interruption into an
     * {@link OBDPException} while keeping the thread's interrupt flag set.
     */
    private void sleepUntilNextStatusCheck() throws OBDPException {
        try {
            Thread.sleep(this.statusCheckInterval);
        }
        catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            String message = "Job Thread interrupted";
            LOG.error(message, e);
            throw new OBDPException(message, e);
        }
    }

    /**
     * Adds the task counts of the given response to the previous counters,
     * stores the new totals in {@code properties} and returns them.
     *
     * @param properties job properties that receive the updated counters
     * @param oldCounts counters accumulated before this batch
     * @param batchRequestResponse response of the finished batch; may be {@code null}
     * @return the updated counters, or an empty map when the response is {@code null}
     */
    private Map<String, Integer> addTaskCountToProperties(Map<String, Object> properties, Map<String, Integer> oldCounts, BatchRequestResponse batchRequestResponse) {
        Map<String, Integer> taskCounts = new HashMap<>();
        if (batchRequestResponse == null) {
            return taskCounts;
        }

        // Aborted and timed-out tasks count as failures in the running total.
        int failedTasks = batchRequestResponse.getFailedTaskCount() + batchRequestResponse.getAbortedTaskCount() + batchRequestResponse.getTimedOutTaskCount();
        // A missing previous counter is treated as zero instead of failing on unboxing.
        Integer previousFailed = oldCounts.get(BATCH_REQUEST_FAILED_TASKS_KEY);
        Integer previousTotal = oldCounts.get(BATCH_REQUEST_TOTAL_TASKS_KEY);
        Integer failedCount = (previousFailed != null ? previousFailed : 0) + failedTasks;
        Integer totalCount = (previousTotal != null ? previousTotal : 0) + batchRequestResponse.getTotalTaskCount();

        // NOTE(review): the per-batch value uses getFailedTaskCount() only, without
        // aborted/timed-out tasks, unlike the running total - confirm this is intended.
        taskCounts.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
        taskCounts.put(BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY, batchRequestResponse.getFailedTaskCount());
        taskCounts.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);

        properties.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
        properties.put(BATCH_REQUEST_FAILED_TASKS_IN_CURRENT_BATCH_KEY, batchRequestResponse.getFailedTaskCount());
        properties.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
        return taskCounts;
    }

    /**
     * Reads the failed and total task counters from the job properties.
     *
     * @param properties job properties; may be {@code null}
     * @return a map that always holds both counters, each defaulting to zero when absent
     * @throws NumberFormatException if a stored counter is not a parsable integer
     */
    private Map<String, Integer> getTaskCountProperties(Map<String, Object> properties) {
        Map<String, Integer> taskCounts = new HashMap<>();
        taskCounts.put(BATCH_REQUEST_FAILED_TASKS_KEY, readCount(properties, BATCH_REQUEST_FAILED_TASKS_KEY));
        taskCounts.put(BATCH_REQUEST_TOTAL_TASKS_KEY, readCount(properties, BATCH_REQUEST_TOTAL_TASKS_KEY));
        return taskCounts;
    }

    /**
     * Parses the counter stored under {@code key}, returning zero when the map or the entry is missing.
     */
    private static int readCount(Map<String, Object> properties, String key) {
        Object countObj = properties != null ? properties.get(key) : null;
        return countObj != null ? Integer.parseInt(countObj.toString()) : 0;
    }
}

