/*
 * Decompiled with CFR 0.152.
 */
package io.druid.indexing.kafka.supervisor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.indexer.TaskLocation;
import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.kafka.KafkaDataSourceMetadata;
import io.druid.indexing.kafka.KafkaIOConfig;
import io.druid.indexing.kafka.KafkaIndexTask;
import io.druid.indexing.kafka.KafkaIndexTaskClient;
import io.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import io.druid.indexing.kafka.KafkaPartitions;
import io.druid.indexing.kafka.KafkaTuningConfig;
import io.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import io.druid.indexing.kafka.supervisor.KafkaSupervisorReport;
import io.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import io.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import io.druid.indexing.kafka.supervisor.TaskReportData;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskRunnerListener;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.supervisor.Supervisor;
import io.druid.indexing.overlord.supervisor.SupervisorReport;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.emitter.service.ServiceMetricEvent;
import io.druid.metadata.EntryExistsException;
import io.druid.server.metrics.DruidMonitorSchedulerConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.joda.time.DateTime;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

public class KafkaSupervisor
implements Supervisor {
    private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class);
    private static final Random RANDOM = new Random();
    private static final long MAX_RUN_FREQUENCY_MILLIS = 1000L;
    private static final long NOT_SET = -1L;
    private static final long MINIMUM_FUTURE_TIMEOUT_IN_SECONDS = 120L;
    private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000L;
    private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000L;
    private static final long INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS = 25000L;
    private static final CopyOnWriteArrayList EMPTY_LIST = Lists.newCopyOnWriteArrayList();
    public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";
    private final ConcurrentHashMap<Integer, TaskGroup> taskGroups = new ConcurrentHashMap();
    private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<TaskGroup>> pendingCompletionTaskGroups = new ConcurrentHashMap();
    private final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, Long>> partitionGroups = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, TaskGroup> sequenceTaskGroup = new ConcurrentHashMap();
    private final TaskStorage taskStorage;
    private final TaskMaster taskMaster;
    private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
    private final KafkaIndexTaskClient taskClient;
    private final ObjectMapper sortingMapper;
    private final KafkaSupervisorSpec spec;
    private final ServiceEmitter emitter;
    private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
    private final String dataSource;
    private final KafkaSupervisorIOConfig ioConfig;
    private final KafkaSupervisorTuningConfig tuningConfig;
    private final KafkaTuningConfig taskTuningConfig;
    private final String supervisorId;
    private final TaskInfoProvider taskInfoProvider;
    private final long futureTimeoutInSeconds;
    private final ExecutorService exec;
    private final ScheduledExecutorService scheduledExec;
    private final ScheduledExecutorService reportingExec;
    private final ListeningExecutorService workerExec;
    private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<Notice>();
    private final Object stopLock = new Object();
    private final Object stateChangeLock = new Object();
    private final Object consumerLock = new Object();
    private boolean listenerRegistered = false;
    private long lastRunTime;
    private volatile DateTime firstRunTime;
    private volatile KafkaConsumer consumer;
    private volatile boolean started = false;
    private volatile boolean stopped = false;
    private volatile Map<Integer, Long> latestOffsetsFromKafka;
    private volatile DateTime offsetsLastUpdated;

    public KafkaSupervisor(final TaskStorage taskStorage, final TaskMaster taskMaster, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, KafkaIndexTaskClientFactory taskClientFactory, ObjectMapper mapper, KafkaSupervisorSpec spec) {
        this.taskStorage = taskStorage;
        this.taskMaster = taskMaster;
        this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
        this.sortingMapper = mapper.copy().configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
        this.spec = spec;
        this.emitter = spec.getEmitter();
        this.monitorSchedulerConfig = spec.getMonitorSchedulerConfig();
        this.dataSource = spec.getDataSchema().getDataSource();
        this.ioConfig = spec.getIoConfig();
        this.tuningConfig = spec.getTuningConfig();
        this.taskTuningConfig = KafkaTuningConfig.copyOf(this.tuningConfig);
        this.supervisorId = StringUtils.format((String)"KafkaSupervisor-%s", (Object[])new Object[]{this.dataSource});
        this.exec = Execs.singleThreaded((String)this.supervisorId);
        this.scheduledExec = Execs.scheduledSingleThreaded((String)(this.supervisorId + "-Scheduler-%d"));
        this.reportingExec = Execs.scheduledSingleThreaded((String)(this.supervisorId + "-Reporting-%d"));
        int workerThreads = this.tuningConfig.getWorkerThreads() != null ? this.tuningConfig.getWorkerThreads() : Math.min(10, this.ioConfig.getTaskCount());
        this.workerExec = MoreExecutors.listeningDecorator((ExecutorService)Execs.multiThreaded((int)workerThreads, (String)(this.supervisorId + "-Worker-%d")));
        log.info("Created worker pool with [%d] threads for dataSource [%s]", new Object[]{workerThreads, this.dataSource});
        this.taskInfoProvider = new TaskInfoProvider(){

            public TaskLocation getTaskLocation(final String id) {
                Preconditions.checkNotNull((Object)id, (Object)"id");
                Optional taskRunner = taskMaster.getTaskRunner();
                if (taskRunner.isPresent()) {
                    Optional item = Iterables.tryFind((Iterable)((TaskRunner)taskRunner.get()).getRunningTasks(), (Predicate)new Predicate<TaskRunnerWorkItem>(){

                        public boolean apply(TaskRunnerWorkItem taskRunnerWorkItem) {
                            return id.equals(taskRunnerWorkItem.getTaskId());
                        }
                    });
                    if (item.isPresent()) {
                        return ((TaskRunnerWorkItem)item.get()).getLocation();
                    }
                } else {
                    log.error("Failed to get task runner because I'm not the leader!", new Object[0]);
                }
                return TaskLocation.unknown();
            }

            public Optional<TaskStatus> getTaskStatus(String id) {
                return taskStorage.getStatus(id);
            }
        };
        this.futureTimeoutInSeconds = Math.max(120L, this.tuningConfig.getChatRetries() * (this.tuningConfig.getHttpTimeout().getStandardSeconds() + 10L));
        int chatThreads = this.tuningConfig.getChatThreads() != null ? this.tuningConfig.getChatThreads() : Math.min(10, this.ioConfig.getTaskCount() * this.ioConfig.getReplicas());
        this.taskClient = taskClientFactory.build(this.taskInfoProvider, this.dataSource, chatThreads, this.tuningConfig.getHttpTimeout(), this.tuningConfig.getChatRetries());
        log.info("Created taskClient with dataSource[%s] chatThreads[%d] httpTimeout[%s] chatRetries[%d]", new Object[]{this.dataSource, chatThreads, this.tuningConfig.getHttpTimeout(), this.tuningConfig.getChatRetries()});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        Object object = this.stateChangeLock;
        synchronized (object) {
            Preconditions.checkState((!this.started ? 1 : 0) != 0, (Object)"already started");
            Preconditions.checkState((!this.exec.isShutdown() ? 1 : 0) != 0, (Object)"already stopped");
            try {
                this.consumer = this.getKafkaConsumer();
                this.exec.submit(new Runnable(){

                    @Override
                    public void run() {
                        block4: while (true) {
                            try {
                                while (!Thread.currentThread().isInterrupted()) {
                                    Notice notice = (Notice)KafkaSupervisor.this.notices.take();
                                    try {
                                        notice.handle();
                                        continue block4;
                                    }
                                    catch (Throwable e) {
                                        log.makeAlert(e, "KafkaSupervisor[%s] failed to handle notice", new Object[]{KafkaSupervisor.this.dataSource}).addData("noticeClass", (Object)notice.getClass().getSimpleName()).emit();
                                    }
                                }
                                break;
                            }
                            catch (InterruptedException e) {
                                log.info("KafkaSupervisor[%s] interrupted, exiting", new Object[]{KafkaSupervisor.this.dataSource});
                                break;
                            }
                        }
                    }
                });
                this.firstRunTime = DateTimes.nowUtc().plus((ReadableDuration)this.ioConfig.getStartDelay());
                this.scheduledExec.scheduleAtFixedRate(this.buildRunTask(), this.ioConfig.getStartDelay().getMillis(), Math.max(this.ioConfig.getPeriod().getMillis(), 1000L), TimeUnit.MILLISECONDS);
                this.reportingExec.scheduleAtFixedRate(this.updateCurrentAndLatestOffsets(), this.ioConfig.getStartDelay().getMillis() + 15000L, Math.max(this.tuningConfig.getOffsetFetchPeriod().getMillis(), 5000L), TimeUnit.MILLISECONDS);
                this.reportingExec.scheduleAtFixedRate(this.emitLag(), this.ioConfig.getStartDelay().getMillis() + 25000L, this.monitorSchedulerConfig.getEmitterPeriod().getMillis(), TimeUnit.MILLISECONDS);
                this.started = true;
                log.info("Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]", new Object[]{this.dataSource, this.ioConfig.getStartDelay(), this.spec.toString()});
            }
            catch (Exception e) {
                if (this.consumer != null) {
                    this.consumer.close();
                }
                log.makeAlert((Throwable)e, "Exception starting KafkaSupervisor[%s]", new Object[]{this.dataSource}).emit();
                throw Throwables.propagate((Throwable)e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop(boolean stopGracefully) {
        Object object = this.stateChangeLock;
        synchronized (object) {
            Preconditions.checkState((boolean)this.started, (Object)"not started");
            log.info("Beginning shutdown of KafkaSupervisor[%s]", new Object[]{this.dataSource});
            try {
                this.scheduledExec.shutdownNow();
                this.reportingExec.shutdownNow();
                Optional taskRunner = this.taskMaster.getTaskRunner();
                if (taskRunner.isPresent()) {
                    ((TaskRunner)taskRunner.get()).unregisterListener(this.supervisorId);
                }
                Object object2 = this.stopLock;
                synchronized (object2) {
                    if (stopGracefully) {
                        log.info("Posting GracefulShutdownNotice, signalling managed tasks to complete and publish", new Object[0]);
                        this.notices.add(new GracefulShutdownNotice());
                    } else {
                        log.info("Posting ShutdownNotice", new Object[0]);
                        this.notices.add(new ShutdownNotice());
                    }
                    long shutdownTimeoutMillis = this.tuningConfig.getShutdownTimeout().getMillis();
                    long endTime = System.currentTimeMillis() + shutdownTimeoutMillis;
                    while (!this.stopped) {
                        long sleepTime = endTime - System.currentTimeMillis();
                        if (sleepTime <= 0L) {
                            log.info("Timed out while waiting for shutdown (timeout [%,dms])", new Object[]{shutdownTimeoutMillis});
                            this.stopped = true;
                            break;
                        }
                        this.stopLock.wait(sleepTime);
                    }
                }
                log.info("Shutdown notice handled", new Object[0]);
                this.taskClient.close();
                this.workerExec.shutdownNow();
                this.exec.shutdownNow();
                this.started = false;
                log.info("KafkaSupervisor[%s] has stopped", new Object[]{this.dataSource});
            }
            catch (Exception e) {
                log.makeAlert((Throwable)e, "Exception stopping KafkaSupervisor[%s]", new Object[]{this.dataSource}).emit();
            }
        }
    }

    public SupervisorReport getStatus() {
        return this.generateReport(true);
    }

    public void reset(DataSourceMetadata dataSourceMetadata) {
        log.info("Posting ResetNotice", new Object[0]);
        this.notices.add(new ResetNotice(dataSourceMetadata));
    }

    public void checkpoint(String sequenceName, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint) {
        Preconditions.checkNotNull((Object)sequenceName, (Object)"Cannot checkpoint without a sequence name");
        Preconditions.checkNotNull((Object)currentCheckpoint, (Object)"current checkpoint cannot be null");
        Preconditions.checkArgument((boolean)this.ioConfig.getTopic().equals(((KafkaDataSourceMetadata)currentCheckpoint).getKafkaPartitions().getTopic()), (String)"Supervisor topic [%s] and topic in checkpoint [%s] does not match", (Object[])new Object[]{this.ioConfig.getTopic(), ((KafkaDataSourceMetadata)currentCheckpoint).getKafkaPartitions().getTopic()});
        log.info("Checkpointing [%s] for sequence [%s]", new Object[]{currentCheckpoint, sequenceName});
        this.notices.add(new CheckpointNotice(sequenceName, (KafkaDataSourceMetadata)previousCheckpoint, (KafkaDataSourceMetadata)currentCheckpoint));
    }

    public void possiblyRegisterListener() {
        if (this.listenerRegistered) {
            return;
        }
        Optional taskRunner = this.taskMaster.getTaskRunner();
        if (taskRunner.isPresent()) {
            ((TaskRunner)taskRunner.get()).registerListener(new TaskRunnerListener(){

                public String getListenerId() {
                    return KafkaSupervisor.this.supervisorId;
                }

                public void locationChanged(String taskId, TaskLocation newLocation) {
                }

                public void statusChanged(String taskId, TaskStatus status) {
                    KafkaSupervisor.this.notices.add(new RunNotice());
                }
            }, (Executor)MoreExecutors.sameThreadExecutor());
            this.listenerRegistered = true;
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @VisibleForTesting
    void resetInternal(DataSourceMetadata dataSourceMetadata) {
        if (dataSourceMetadata == null) {
            boolean result = this.indexerMetadataStorageCoordinator.deleteDataSourceMetadata(this.dataSource);
            log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", new Object[]{this.dataSource, result});
            this.taskGroups.values().forEach(this::killTasksInGroup);
            this.taskGroups.clear();
            this.partitionGroups.clear();
            this.sequenceTaskGroup.clear();
            return;
        }
        if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
            throw new IAE("Expected KafkaDataSourceMetadata but found instance of [%s]", new Object[]{dataSourceMetadata.getClass()});
        }
        KafkaDataSourceMetadata resetKafkaMetadata = (KafkaDataSourceMetadata)dataSourceMetadata;
        if (resetKafkaMetadata.getKafkaPartitions().getTopic().equals(this.ioConfig.getTopic())) {
            DataSourceMetadata metadata = this.indexerMetadataStorageCoordinator.getDataSourceMetadata(this.dataSource);
            if (metadata != null && !(metadata instanceof KafkaDataSourceMetadata)) {
                throw new IAE("Expected KafkaDataSourceMetadata from metadata store but found instance of [%s]", new Object[]{metadata.getClass()});
            }
            KafkaDataSourceMetadata currentMetadata = (KafkaDataSourceMetadata)metadata;
            boolean doReset = false;
            for (Map.Entry<Integer, Long> resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().entrySet()) {
                Long partitionOffsetInMetadataStore = currentMetadata == null ? null : currentMetadata.getKafkaPartitions().getPartitionOffsetMap().get(resetPartitionOffset.getKey());
                TaskGroup partitionTaskGroup = this.taskGroups.get(this.getTaskGroupIdForPartition(resetPartitionOffset.getKey()));
                if (partitionOffsetInMetadataStore == null && (partitionTaskGroup == null || !((Long)partitionTaskGroup.partitionOffsets.get((Object)resetPartitionOffset.getKey())).equals(resetPartitionOffset.getValue()))) continue;
                doReset = true;
                break;
            }
            if (!doReset) {
                log.info("Ignoring duplicate reset request [%s]", new Object[]{dataSourceMetadata});
                return;
            }
            boolean metadataUpdateSuccess = false;
            if (currentMetadata == null) {
                metadataUpdateSuccess = true;
            } else {
                DataSourceMetadata newMetadata = currentMetadata.minus(resetKafkaMetadata);
                try {
                    metadataUpdateSuccess = this.indexerMetadataStorageCoordinator.resetDataSourceMetadata(this.dataSource, newMetadata);
                }
                catch (IOException e) {
                    log.error("Resetting DataSourceMetadata failed [%s]", new Object[]{e.getMessage()});
                    Throwables.propagate((Throwable)e);
                }
            }
            if (!metadataUpdateSuccess) throw new ISE("Unable to reset metadata", new Object[0]);
            resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> {
                int groupId = this.getTaskGroupIdForPartition((int)partition);
                this.killTaskGroupForPartitions((Set<Integer>)ImmutableSet.of((Object)partition));
                this.sequenceTaskGroup.remove(this.generateSequenceName(groupId));
                this.taskGroups.remove(groupId);
                this.partitionGroups.get(groupId).replaceAll((partitionId, offset) -> -1L);
            });
            return;
        }
        log.warn("Reset metadata topic [%s] and supervisor's topic [%s] do not match", new Object[]{resetKafkaMetadata.getKafkaPartitions().getTopic(), this.ioConfig.getTopic()});
    }

    private void killTaskGroupForPartitions(Set<Integer> partitions) {
        for (Integer partition : partitions) {
            this.killTasksInGroup(this.taskGroups.get(this.getTaskGroupIdForPartition(partition)));
        }
    }

    private void killTasksInGroup(TaskGroup taskGroup) {
        if (taskGroup != null) {
            for (String taskId : taskGroup.tasks.keySet()) {
                log.info("Killing task [%s] in the task group", new Object[]{taskId});
                this.killTask(taskId);
            }
        }
    }

    @VisibleForTesting
    void gracefulShutdownInternal() throws ExecutionException, InterruptedException, TimeoutException {
        for (TaskGroup taskGroup : this.taskGroups.values()) {
            for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
                if (this.taskInfoProvider.getTaskLocation(entry.getKey()).equals((Object)TaskLocation.unknown())) {
                    this.killTask(entry.getKey());
                    continue;
                }
                entry.getValue().startTime = DateTimes.EPOCH;
            }
        }
        this.checkTaskDuration();
    }

    @VisibleForTesting
    void runInternal() throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException {
        this.possiblyRegisterListener();
        this.updatePartitionDataFromKafka();
        this.discoverTasks();
        this.updateTaskStatus();
        this.checkTaskDuration();
        this.checkPendingCompletionTasks();
        this.checkCurrentTaskState();
        this.createNewTasks();
        if (log.isDebugEnabled()) {
            log.debug(this.generateReport(true).toString(), new Object[0]);
        } else {
            log.info(this.generateReport(false).toString(), new Object[0]);
        }
    }

    String generateSequenceName(Map<Integer, Long> startPartitions, Optional<DateTime> minimumMessageTime, Optional<DateTime> maximumMessageTime) {
        String tuningConfig;
        String dataSchema;
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Integer, Long> entry : startPartitions.entrySet()) {
            sb.append(StringUtils.format((String)"+%d(%d)", (Object[])new Object[]{entry.getKey(), entry.getValue()}));
        }
        String partitionOffsetStr = sb.toString().substring(1);
        String minMsgTimeStr = minimumMessageTime.isPresent() ? String.valueOf(((DateTime)minimumMessageTime.get()).getMillis()) : "";
        String maxMsgTimeStr = maximumMessageTime.isPresent() ? String.valueOf(((DateTime)maximumMessageTime.get()).getMillis()) : "";
        try {
            dataSchema = this.sortingMapper.writeValueAsString((Object)this.spec.getDataSchema());
            tuningConfig = this.sortingMapper.writeValueAsString((Object)this.taskTuningConfig);
        }
        catch (JsonProcessingException e) {
            throw Throwables.propagate((Throwable)e);
        }
        String hashCode = DigestUtils.sha1Hex((String)(dataSchema + tuningConfig + partitionOffsetStr + minMsgTimeStr + maxMsgTimeStr)).substring(0, 15);
        return Joiner.on((String)"_").join((Object)"index_kafka", (Object)this.dataSource, new Object[]{hashCode});
    }

    @VisibleForTesting
    String generateSequenceName(int groupId) {
        return this.generateSequenceName((Map<Integer, Long>)this.taskGroups.get((Object)Integer.valueOf((int)groupId)).partitionOffsets, this.taskGroups.get((Object)Integer.valueOf((int)groupId)).minimumMessageTime, this.taskGroups.get((Object)Integer.valueOf((int)groupId)).maximumMessageTime);
    }

    private static String getRandomId() {
        StringBuilder suffix = new StringBuilder(8);
        for (int i = 0; i < 8; ++i) {
            suffix.append((char)(97 + (RANDOM.nextInt() >>> i * 4 & 0xF)));
        }
        return suffix.toString();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private KafkaConsumer<byte[], byte[]> getKafkaConsumer() {
        Properties props = new Properties();
        props.setProperty("metadata.max.age.ms", "10000");
        props.setProperty("group.id", StringUtils.format((String)"kafka-supervisor-%s", (Object[])new Object[]{KafkaSupervisor.getRandomId()}));
        props.putAll(this.ioConfig.getConsumerProperties());
        props.setProperty("enable.auto.commit", "false");
        ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
            KafkaConsumer kafkaConsumer = new KafkaConsumer(props, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
            return kafkaConsumer;
        }
        finally {
            Thread.currentThread().setContextClassLoader(currCtxCl);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updatePartitionDataFromKafka() {
        Map topics;
        try {
            Object object = this.consumerLock;
            synchronized (object) {
                topics = this.consumer.listTopics();
            }
        }
        catch (Exception e) {
            log.warn((Throwable)e, "Unable to get partition data from Kafka for brokers [%s], are the brokers up?", new Object[]{this.ioConfig.getConsumerProperties().get("bootstrap.servers")});
            return;
        }
        List partitions = (List)topics.get(this.ioConfig.getTopic());
        if (partitions == null) {
            log.warn("No such topic [%s] found, list of discovered topics [%s]", new Object[]{this.ioConfig.getTopic(), topics.keySet()});
        }
        int numPartitions = partitions != null ? partitions.size() : 0;
        log.debug("Found [%d] Kafka partitions for topic [%s]", new Object[]{numPartitions, this.ioConfig.getTopic()});
        for (int partition = 0; partition < numPartitions; ++partition) {
            int taskGroupId = this.getTaskGroupIdForPartition(partition);
            this.partitionGroups.putIfAbsent(taskGroupId, new ConcurrentHashMap());
            ConcurrentHashMap<Integer, Long> partitionMap = this.partitionGroups.get(taskGroupId);
            if (partitionMap.putIfAbsent(partition, -1L) != null) continue;
            log.info("New partition [%d] discovered for topic [%s], added to task group [%d]", new Object[]{partition, this.ioConfig.getTopic(), taskGroupId});
        }
    }

    private void discoverTasks() throws ExecutionException, InterruptedException, TimeoutException {
        int taskCount = 0;
        ArrayList futureTaskIds = Lists.newArrayList();
        ArrayList futures = Lists.newArrayList();
        List tasks = this.taskStorage.getActiveTasks();
        final HashSet taskGroupsToVerify = new HashSet();
        for (Task task : tasks) {
            if (!(task instanceof KafkaIndexTask) || !this.dataSource.equals(task.getDataSource())) continue;
            ++taskCount;
            final KafkaIndexTask kafkaTask = (KafkaIndexTask)task;
            final String taskId = task.getId();
            Iterator<Integer> it = kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap().keySet().iterator();
            final Integer taskGroupId = it.hasNext() ? Integer.valueOf(this.getTaskGroupIdForPartition(it.next())) : null;
            if (taskGroupId == null) continue;
            TaskGroup taskGroup = this.taskGroups.get(taskGroupId);
            if (this.isTaskInPendingCompletionGroups(taskId) || taskGroup != null && taskGroup.tasks.containsKey(taskId)) continue;
            futureTaskIds.add(taskId);
            futures.add(Futures.transform(this.taskClient.getStatusAsync(taskId), (Function)new Function<KafkaIndexTask.Status, Boolean>(){

                public Boolean apply(KafkaIndexTask.Status status) {
                    try {
                        log.debug("Task [%s], status [%s]", new Object[]{taskId, status});
                        if (status == KafkaIndexTask.Status.PUBLISHING) {
                            kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap().keySet().forEach(partition -> KafkaSupervisor.this.addDiscoveredTaskToPendingCompletionTaskGroups(KafkaSupervisor.this.getTaskGroupIdForPartition((int)partition), taskId, kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()));
                            Map<Integer, Long> publishingTaskEndOffsets = KafkaSupervisor.this.taskClient.getEndOffsets(taskId);
                            for (Map.Entry<Integer, Long> entry : publishingTaskEndOffsets.entrySet()) {
                                boolean succeeded;
                                Integer partition2 = entry.getKey();
                                Long offset = entry.getValue();
                                ConcurrentHashMap partitionOffsets = (ConcurrentHashMap)KafkaSupervisor.this.partitionGroups.get(KafkaSupervisor.this.getTaskGroupIdForPartition(partition2));
                                do {
                                    succeeded = true;
                                    Long previousOffset = partitionOffsets.putIfAbsent(partition2, offset);
                                    if (previousOffset == null || previousOffset >= offset) continue;
                                    succeeded = partitionOffsets.replace(partition2, previousOffset, offset);
                                } while (!succeeded);
                            }
                        } else {
                            for (Integer partition3 : kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap().keySet()) {
                                if (taskGroupId.equals(KafkaSupervisor.this.getTaskGroupIdForPartition(partition3))) continue;
                                log.warn("Stopping task [%s] which does not match the expected partition allocation", new Object[]{taskId});
                                try {
                                    KafkaSupervisor.this.stopTask(taskId, false).get(KafkaSupervisor.this.futureTimeoutInSeconds, TimeUnit.SECONDS);
                                }
                                catch (InterruptedException | ExecutionException | TimeoutException e) {
                                    log.warn((Throwable)e, "Exception while stopping task", new Object[0]);
                                }
                                return false;
                            }
                            if (!KafkaSupervisor.this.isTaskCurrent(taskGroupId, taskId)) {
                                log.info("Stopping task [%s] which does not match the expected parameters and ingestion spec", new Object[]{taskId});
                                try {
                                    KafkaSupervisor.this.stopTask(taskId, false).get(KafkaSupervisor.this.futureTimeoutInSeconds, TimeUnit.SECONDS);
                                }
                                catch (InterruptedException | ExecutionException | TimeoutException e) {
                                    log.warn((Throwable)e, "Exception while stopping task", new Object[0]);
                                }
                                return false;
                            }
                            if (KafkaSupervisor.this.taskGroups.putIfAbsent(taskGroupId, new TaskGroup((ImmutableMap<Integer, Long>)ImmutableMap.copyOf(kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()), kafkaTask.getIOConfig().getMinimumMessageTime(), kafkaTask.getIOConfig().getMaximumMessageTime())) == null) {
                                KafkaSupervisor.this.sequenceTaskGroup.put(KafkaSupervisor.this.generateSequenceName(taskGroupId), KafkaSupervisor.this.taskGroups.get(taskGroupId));
                                log.info("Created new task group [%d]", new Object[]{taskGroupId});
                            }
                            taskGroupsToVerify.add(taskGroupId);
                            ((TaskGroup)((KafkaSupervisor)KafkaSupervisor.this).taskGroups.get((Object)taskGroupId)).tasks.putIfAbsent(taskId, new TaskData());
                        }
                        return true;
                    }
                    catch (Throwable t) {
                        log.error(t, "Something bad while discovering task [%s]", new Object[]{taskId});
                        return null;
                    }
                }
            }, (Executor)this.workerExec));
        }
        List results = (List)Futures.successfulAsList((Iterable)futures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
        for (int i = 0; i < results.size(); ++i) {
            if (results.get(i) != null) continue;
            String taskId = (String)futureTaskIds.get(i);
            log.warn("Task [%s] failed to return status, killing task", new Object[]{taskId});
            this.killTask(taskId);
        }
        log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", new Object[]{taskCount, this.dataSource});
        taskGroupsToVerify.forEach(this::verifyAndMergeCheckpoints);
    }

    private void verifyAndMergeCheckpoints(Integer groupId) {
        final TaskGroup taskGroup = this.taskGroups.get(groupId);
        final CopyOnWriteArrayList taskSequences = new CopyOnWriteArrayList();
        ArrayList<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>> futures = new ArrayList<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>>();
        for (final String taskId : taskGroup.taskIds()) {
            ListenableFuture<TreeMap<Integer, Map<Integer, Long>>> checkpointsFuture = this.taskClient.getCheckpointsAsync(taskId, true);
            futures.add(checkpointsFuture);
            Futures.addCallback(checkpointsFuture, (FutureCallback)new FutureCallback<TreeMap<Integer, Map<Integer, Long>>>(){

                public void onSuccess(TreeMap<Integer, Map<Integer, Long>> checkpoints) {
                    if (!checkpoints.isEmpty()) {
                        taskSequences.add(new Pair((Object)taskId, checkpoints));
                    } else {
                        log.warn("Ignoring task [%s], as probably it is not started running yet", new Object[]{taskId});
                    }
                }

                public void onFailure(Throwable t) {
                    log.error(t, "Problem while getting checkpoints for task [%s], killing the task", new Object[]{taskId});
                    KafkaSupervisor.this.killTask(taskId);
                    taskGroup.tasks.remove(taskId);
                }
            });
        }
        try {
            Futures.allAsList(futures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            Throwables.propagate((Throwable)e);
        }
        KafkaDataSourceMetadata latestDataSourceMetadata = (KafkaDataSourceMetadata)this.indexerMetadataStorageCoordinator.getDataSourceMetadata(this.dataSource);
        Map<Integer, Long> latestOffsetsFromDb = latestDataSourceMetadata == null || latestDataSourceMetadata.getKafkaPartitions() == null ? null : latestDataSourceMetadata.getKafkaPartitions().getPartitionOffsetMap();
        taskSequences.sort((o1, o2) -> ((Integer)((TreeMap)o2.rhs).firstKey()).compareTo((Integer)((TreeMap)o1.rhs).firstKey()));
        HashSet<Object> tasksToKill = new HashSet<Object>();
        AtomicInteger earliestConsistentSequenceId = new AtomicInteger(-1);
        for (int taskIndex = 0; taskIndex < taskSequences.size(); ++taskIndex) {
            if (earliestConsistentSequenceId.get() == -1) {
                if (((TreeMap)((Pair)taskSequences.get((int)taskIndex)).rhs).entrySet().stream().anyMatch(sequenceCheckpoint -> ((Map)sequenceCheckpoint.getValue()).entrySet().stream().allMatch(partitionOffset -> Longs.compare((long)((Long)partitionOffset.getValue()), (long)(latestOffsetsFromDb == null ? ((Long)partitionOffset.getValue()).longValue() : ((Long)latestOffsetsFromDb.getOrDefault(partitionOffset.getKey(), (Long)partitionOffset.getValue())).longValue())) == 0) && earliestConsistentSequenceId.compareAndSet(-1, (Integer)sequenceCheckpoint.getKey())) || this.pendingCompletionTaskGroups.getOrDefault(groupId, EMPTY_LIST).size() > 0 && earliestConsistentSequenceId.compareAndSet(-1, (Integer)((TreeMap)((Pair)taskSequences.get((int)taskIndex)).rhs).firstKey())) {
                    TreeMap latestCheckpoints = new TreeMap(((TreeMap)((Pair)taskSequences.get((int)taskIndex)).rhs).tailMap(earliestConsistentSequenceId.get()));
                    log.info("Setting taskGroup sequences to [%s] for group [%d]", new Object[]{latestCheckpoints, groupId});
                    taskGroup.sequenceOffsets.clear();
                    taskGroup.sequenceOffsets.putAll(latestCheckpoints);
                    continue;
                }
                log.debug("Adding task [%s] to kill list, checkpoints[%s], latestoffsets from DB [%s]", new Object[]{((Pair)taskSequences.get((int)taskIndex)).lhs, ((Pair)taskSequences.get((int)taskIndex)).rhs, latestOffsetsFromDb});
                tasksToKill.add(((Pair)taskSequences.get((int)taskIndex)).lhs);
                continue;
            }
            if (((TreeMap)((Pair)taskSequences.get((int)taskIndex)).rhs).get(taskGroup.sequenceOffsets.firstKey()) != null && ((Map)((TreeMap)((Pair)taskSequences.get((int)taskIndex)).rhs).get(taskGroup.sequenceOffsets.firstKey())).equals(taskGroup.sequenceOffsets.firstEntry().getValue()) && ((TreeMap)((Pair)taskSequences.get((int)taskIndex)).rhs).tailMap(taskGroup.sequenceOffsets.firstKey()).size() == taskGroup.sequenceOffsets.size()) continue;
            log.debug("Adding task [%s] to kill list, checkpoints[%s], taskgroup checkpoints [%s]", new Object[]{((Pair)taskSequences.get((int)taskIndex)).lhs, ((Pair)taskSequences.get((int)taskIndex)).rhs, taskGroup.sequenceOffsets});
            tasksToKill.add(((Pair)taskSequences.get((int)taskIndex)).lhs);
        }
        if (tasksToKill.size() > 0 && tasksToKill.size() == taskGroup.tasks.size() || taskGroup.tasks.size() == 0 && this.pendingCompletionTaskGroups.getOrDefault(groupId, EMPTY_LIST).size() == 0) {
            log.warn("Clearing task group [%d] information as no valid tasks left the group", new Object[]{groupId});
            this.sequenceTaskGroup.remove(this.generateSequenceName(groupId));
            this.taskGroups.remove(groupId);
            this.partitionGroups.get(groupId).replaceAll((partition, offset) -> -1L);
        }
        taskSequences.stream().filter(taskIdSequences -> tasksToKill.contains(taskIdSequences.lhs)).forEach(sequenceCheckpoint -> {
            log.warn("Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest persisted offsets in metadata store [%s]", new Object[]{sequenceCheckpoint.lhs, sequenceCheckpoint.rhs, taskGroup.sequenceOffsets, latestOffsetsFromDb});
            this.killTask((String)sequenceCheckpoint.lhs);
            taskGroup.tasks.remove(sequenceCheckpoint.lhs);
        });
    }

    private void addDiscoveredTaskToPendingCompletionTaskGroups(int groupId, String taskId, Map<Integer, Long> startingPartitions) {
        this.pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.newCopyOnWriteArrayList());
        CopyOnWriteArrayList<TaskGroup> taskGroupList = this.pendingCompletionTaskGroups.get(groupId);
        for (TaskGroup taskGroup : taskGroupList) {
            if (!taskGroup.partitionOffsets.equals(startingPartitions)) continue;
            if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
                log.info("Added discovered task [%s] to existing pending task group [%s]", new Object[]{taskId, groupId});
            }
            return;
        }
        log.info("Creating new pending completion task group [%s] for discovered task [%s]", new Object[]{groupId, taskId});
        TaskGroup newTaskGroup = new TaskGroup((ImmutableMap<Integer, Long>)ImmutableMap.copyOf(startingPartitions), (Optional<DateTime>)Optional.absent(), (Optional<DateTime>)Optional.absent());
        newTaskGroup.tasks.put(taskId, new TaskData());
        newTaskGroup.completionTimeout = DateTimes.nowUtc().plus((ReadableDuration)this.ioConfig.getCompletionTimeout());
        taskGroupList.add(newTaskGroup);
    }

    /*
     * WARNING - void declaration
     */
    private void updateTaskStatus() throws ExecutionException, InterruptedException, TimeoutException {
        void var4_8;
        ArrayList futures = Lists.newArrayList();
        ArrayList futureTaskIds = Lists.newArrayList();
        for (TaskGroup taskGroup : this.taskGroups.values()) {
            for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
                String taskId = entry.getKey();
                final TaskData taskData = entry.getValue();
                if (taskData.startTime == null) {
                    futureTaskIds.add(taskId);
                    futures.add(Futures.transform(this.taskClient.getStartTimeAsync(taskId), (Function)new Function<DateTime, Boolean>(){

                        @Nullable
                        public Boolean apply(@Nullable DateTime startTime) {
                            if (startTime == null) {
                                return false;
                            }
                            taskData.startTime = startTime;
                            long millisRemaining = KafkaSupervisor.this.ioConfig.getTaskDuration().getMillis() - (System.currentTimeMillis() - taskData.startTime.getMillis());
                            if (millisRemaining > 0L) {
                                KafkaSupervisor.this.scheduledExec.schedule(KafkaSupervisor.this.buildRunTask(), millisRemaining + 1000L, TimeUnit.MILLISECONDS);
                            }
                            return true;
                        }
                    }, (Executor)this.workerExec));
                }
                taskData.status = (TaskStatus)this.taskStorage.getStatus(taskId).get();
            }
        }
        for (List list : this.pendingCompletionTaskGroups.values()) {
            for (TaskGroup group : list) {
                for (Map.Entry<String, TaskData> entry : group.tasks.entrySet()) {
                    entry.getValue().status = (TaskStatus)this.taskStorage.getStatus(entry.getKey()).get();
                }
            }
        }
        List results = (List)Futures.successfulAsList((Iterable)futures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
        boolean bl = false;
        while (var4_8 < results.size()) {
            if (results.get((int)var4_8) == null) {
                String taskId = (String)futureTaskIds.get((int)var4_8);
                log.warn("Task [%s] failed to return start time, killing task", new Object[]{taskId});
                this.killTask(taskId);
            }
            ++var4_8;
        }
    }

    private void checkTaskDuration() throws InterruptedException, ExecutionException, TimeoutException {
        TaskGroup group;
        Integer groupId;
        ArrayList futures = Lists.newArrayList();
        ArrayList futureGroupIds = Lists.newArrayList();
        for (Map.Entry<Integer, TaskGroup> entry : this.taskGroups.entrySet()) {
            groupId = entry.getKey();
            group = entry.getValue();
            DateTime earliestTaskStart = DateTimes.nowUtc();
            for (TaskData taskData : group.tasks.values()) {
                if (!earliestTaskStart.isAfter((ReadableInstant)taskData.startTime)) continue;
                earliestTaskStart = taskData.startTime;
            }
            if (!earliestTaskStart.plus((ReadableDuration)this.ioConfig.getTaskDuration()).isBeforeNow()) continue;
            log.info("Task group [%d] has run for [%s]", new Object[]{groupId, this.ioConfig.getTaskDuration()});
            futureGroupIds.add(groupId);
            futures.add(this.checkpointTaskGroup(groupId, true));
        }
        List results = (List)Futures.successfulAsList((Iterable)futures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
        for (int j = 0; j < results.size(); ++j) {
            groupId = (Integer)futureGroupIds.get(j);
            group = this.taskGroups.get(groupId);
            Map endOffsets = (Map)results.get(j);
            if (endOffsets != null) {
                group.completionTimeout = DateTimes.nowUtc().plus((ReadableDuration)this.ioConfig.getCompletionTimeout());
                this.pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.newCopyOnWriteArrayList());
                this.pendingCompletionTaskGroups.get(groupId).add(group);
                for (Map.Entry entry : endOffsets.entrySet()) {
                    this.partitionGroups.get(groupId).put((Integer)entry.getKey(), (Long)entry.getValue());
                }
            } else {
                log.warn("All tasks in group [%s] failed to transition to publishing state, killing tasks [%s]", new Object[]{groupId, group.taskIds()});
                for (String string : group.taskIds()) {
                    this.killTask(string);
                }
                this.partitionGroups.get(groupId).replaceAll((partition, offset) -> -1L);
            }
            this.sequenceTaskGroup.remove(this.generateSequenceName(groupId));
            this.taskGroups.remove(groupId);
        }
    }

    private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(int groupId, boolean finalize) {
        TaskGroup taskGroup = this.taskGroups.get(groupId);
        if (finalize) {
            Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
            while (i.hasNext()) {
                Map.Entry<String, TaskData> taskEntry = i.next();
                String taskId = taskEntry.getKey();
                TaskData task = taskEntry.getValue();
                if (task.status.isSuccess()) {
                    return Futures.transform(this.stopTasksInGroup(taskGroup), (Function)new Function<Object, Map<Integer, Long>>(){

                        @Nullable
                        public Map<Integer, Long> apply(@Nullable Object input) {
                            return null;
                        }
                    });
                }
                if (!task.status.isRunnable() || !this.taskInfoProvider.getTaskLocation(taskId).equals((Object)TaskLocation.unknown())) continue;
                log.info("Killing task [%s] which hasn't been assigned to a worker", new Object[]{taskId});
                this.killTask(taskId);
                i.remove();
            }
        }
        ArrayList pauseFutures = Lists.newArrayList();
        ImmutableList pauseTaskIds = ImmutableList.copyOf(taskGroup.taskIds());
        for (String taskId : pauseTaskIds) {
            pauseFutures.add(this.taskClient.pauseAsync(taskId));
        }
        return Futures.transform((ListenableFuture)Futures.successfulAsList((Iterable)pauseFutures), (Function)new Function<List<Map<Integer, Long>>, Map<Integer, Long>>((List)pauseTaskIds, taskGroup, groupId, finalize){
            final /* synthetic */ List val$pauseTaskIds;
            final /* synthetic */ TaskGroup val$taskGroup;
            final /* synthetic */ int val$groupId;
            final /* synthetic */ boolean val$finalize;
            {
                this.val$pauseTaskIds = list;
                this.val$taskGroup = taskGroup;
                this.val$groupId = n;
                this.val$finalize = bl;
            }

            /*
             * WARNING - void declaration
             */
            @Nullable
            public Map<Integer, Long> apply(List<Map<Integer, Long>> input) {
                HashMap<Integer, Long> endOffsets = new HashMap<Integer, Long>();
                for (int i = 0; i < input.size(); ++i) {
                    Map<Integer, Long> result = input.get(i);
                    if (result == null || result.isEmpty()) {
                        Iterator taskId = (String)this.val$pauseTaskIds.get(i);
                        log.warn("Task [%s] failed to respond to [pause] in a timely manner, killing task", new Object[]{taskId});
                        KafkaSupervisor.this.killTask((String)((Object)taskId));
                        this.val$taskGroup.tasks.remove(taskId);
                        continue;
                    }
                    for (Map.Entry entry : result.entrySet()) {
                        if (endOffsets.containsKey(entry.getKey()) && ((Long)endOffsets.get(entry.getKey())).compareTo((Long)entry.getValue()) >= 0) continue;
                        endOffsets.put((Integer)entry.getKey(), (Long)entry.getValue());
                    }
                }
                ArrayList setEndOffsetFutures = Lists.newArrayList();
                ImmutableList setEndOffsetTaskIds = ImmutableList.copyOf(this.val$taskGroup.taskIds());
                if (setEndOffsetTaskIds.isEmpty()) {
                    log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", new Object[]{this.val$groupId});
                    return null;
                }
                try {
                    void var6_12;
                    if (endOffsets.equals(this.val$taskGroup.sequenceOffsets.lastEntry().getValue())) {
                        log.warn("Not adding checkpoint [%s] as its same as the start offsets [%s] of latest sequence for the task group [%d]", new Object[]{endOffsets, this.val$taskGroup.sequenceOffsets.lastEntry().getValue(), this.val$groupId});
                        return endOffsets;
                    }
                    log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", new Object[]{this.val$groupId, endOffsets});
                    for (String string : setEndOffsetTaskIds) {
                        setEndOffsetFutures.add(KafkaSupervisor.this.taskClient.setEndOffsetsAsync(string, endOffsets, true, this.val$finalize));
                    }
                    List results = (List)Futures.successfulAsList((Iterable)setEndOffsetFutures).get(KafkaSupervisor.this.futureTimeoutInSeconds, TimeUnit.SECONDS);
                    boolean bl = false;
                    while (var6_12 < results.size()) {
                        if (results.get((int)var6_12) == null || !((Boolean)results.get((int)var6_12)).booleanValue()) {
                            String taskId = (String)setEndOffsetTaskIds.get((int)var6_12);
                            log.warn("Task [%s] failed to respond to [set end offsets] in a timely manner, killing task", new Object[]{taskId});
                            KafkaSupervisor.this.killTask(taskId);
                            this.val$taskGroup.tasks.remove(taskId);
                        }
                        ++var6_12;
                    }
                }
                catch (Exception e) {
                    log.error("Something bad happened [%s]", new Object[]{e.getMessage()});
                    Throwables.propagate((Throwable)e);
                }
                if (this.val$taskGroup.tasks.isEmpty()) {
                    log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", new Object[]{this.val$groupId});
                    return null;
                }
                return endOffsets;
            }
        }, (Executor)this.workerExec);
    }

    private void checkPendingCompletionTasks() throws ExecutionException, InterruptedException, TimeoutException {
        ArrayList futures = Lists.newArrayList();
        for (Map.Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingGroupList : this.pendingCompletionTaskGroups.entrySet()) {
            boolean stopTasksInTaskGroup = false;
            Integer groupId = pendingGroupList.getKey();
            CopyOnWriteArrayList<TaskGroup> taskGroupList = pendingGroupList.getValue();
            ArrayList toRemove = Lists.newArrayList();
            for (TaskGroup group : taskGroupList) {
                boolean foundSuccess = false;
                boolean entireTaskGroupFailed = false;
                if (stopTasksInTaskGroup) {
                    futures.add(this.stopTasksInGroup(group));
                    toRemove.add(group);
                    continue;
                }
                Iterator<Map.Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
                while (iTask.hasNext()) {
                    Map.Entry<String, TaskData> task = iTask.next();
                    if (task.getValue().status.isFailure()) {
                        iTask.remove();
                        if (group.tasks.isEmpty()) {
                            entireTaskGroupFailed = true;
                            break;
                        }
                    }
                    if (!task.getValue().status.isSuccess()) continue;
                    log.info("Task [%s] completed successfully, stopping tasks %s", new Object[]{task.getKey(), group.taskIds()});
                    futures.add(this.stopTasksInGroup(group));
                    foundSuccess = true;
                    toRemove.add(group);
                    break;
                }
                if ((foundSuccess || !group.completionTimeout.isBeforeNow()) && !entireTaskGroupFailed) continue;
                if (entireTaskGroupFailed) {
                    log.warn("All tasks in group [%d] failed to publish, killing all tasks for these partitions", new Object[]{groupId});
                } else {
                    log.makeAlert("No task in [%s] succeeded before the completion timeout elapsed [%s]!", new Object[]{group.taskIds(), this.ioConfig.getCompletionTimeout()}).emit();
                }
                this.partitionGroups.get(groupId).replaceAll((partition, offset) -> -1L);
                this.sequenceTaskGroup.remove(this.generateSequenceName(groupId));
                this.killTasksInGroup(group);
                stopTasksInTaskGroup = true;
                this.killTasksInGroup(this.taskGroups.remove(groupId));
                toRemove.add(group);
            }
            taskGroupList.removeAll(toRemove);
        }
        Futures.successfulAsList((Iterable)futures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
    }

    private void checkCurrentTaskState() throws ExecutionException, InterruptedException, TimeoutException {
        ArrayList futures = Lists.newArrayList();
        Iterator<Map.Entry<Integer, TaskGroup>> iTaskGroups = this.taskGroups.entrySet().iterator();
        while (iTaskGroups.hasNext()) {
            Map.Entry<Integer, TaskGroup> taskGroupEntry = iTaskGroups.next();
            Integer groupId = taskGroupEntry.getKey();
            TaskGroup taskGroup = taskGroupEntry.getValue();
            log.debug("Task group [%d] pre-pruning: %s", new Object[]{groupId, taskGroup.taskIds()});
            Iterator<Map.Entry<String, TaskData>> iTasks = taskGroup.tasks.entrySet().iterator();
            while (iTasks.hasNext()) {
                Map.Entry<String, TaskData> task = iTasks.next();
                String taskId = task.getKey();
                TaskData taskData = task.getValue();
                if (!this.isTaskCurrent(groupId, taskId)) {
                    log.info("Stopping task [%s] which does not match the expected offset range and ingestion spec", new Object[]{taskId});
                    futures.add(this.stopTask(taskId, false));
                    iTasks.remove();
                    continue;
                }
                if (taskData.status.isFailure()) {
                    iTasks.remove();
                    continue;
                }
                if (!taskData.status.isSuccess()) continue;
                futures.add(this.stopTasksInGroup(taskGroup));
                this.sequenceTaskGroup.remove(this.generateSequenceName(groupId));
                iTaskGroups.remove();
                break;
            }
            log.debug("Task group [%d] post-pruning: %s", new Object[]{groupId, taskGroup.taskIds()});
        }
        Futures.successfulAsList((Iterable)futures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
    }

    void createNewTasks() throws JsonProcessingException {
        this.taskGroups.entrySet().stream().filter(taskGroup -> ((TaskGroup)taskGroup.getValue()).tasks.size() < this.ioConfig.getReplicas()).forEach(taskGroup -> this.verifyAndMergeCheckpoints((Integer)taskGroup.getKey()));
        for (Integer groupId : this.partitionGroups.keySet()) {
            if (this.taskGroups.containsKey(groupId)) continue;
            log.info("Creating new task group [%d] for partitions %s", new Object[]{groupId, this.partitionGroups.get(groupId).keySet()});
            Optional minimumMessageTime = this.ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of((Object)DateTimes.nowUtc().minus((ReadableDuration)this.ioConfig.getLateMessageRejectionPeriod().get())) : Optional.absent();
            Optional maximumMessageTime = this.ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? Optional.of((Object)DateTimes.nowUtc().plus((ReadableDuration)this.ioConfig.getTaskDuration()).plus((ReadableDuration)this.ioConfig.getEarlyMessageRejectionPeriod().get())) : Optional.absent();
            this.taskGroups.put(groupId, new TaskGroup(this.generateStartingOffsetsForPartitionGroup(groupId), (Optional<DateTime>)minimumMessageTime, (Optional<DateTime>)maximumMessageTime));
            this.sequenceTaskGroup.put(this.generateSequenceName(groupId), this.taskGroups.get(groupId));
        }
        boolean createdTask = false;
        for (Map.Entry<Integer, TaskGroup> entry : this.taskGroups.entrySet()) {
            TaskGroup taskGroup2 = entry.getValue();
            Integer groupId = entry.getKey();
            if (this.ioConfig.getReplicas() <= taskGroup2.tasks.size()) continue;
            log.info("Number of tasks [%d] does not match configured numReplicas [%d] in task group [%d], creating more tasks", new Object[]{taskGroup2.tasks.size(), this.ioConfig.getReplicas(), groupId});
            this.createKafkaTasksForGroup(groupId, this.ioConfig.getReplicas() - taskGroup2.tasks.size());
            createdTask = true;
        }
        if (createdTask && this.firstRunTime.isBeforeNow()) {
            this.scheduledExec.schedule(this.buildRunTask(), 5000L, TimeUnit.MILLISECONDS);
        }
    }

    private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProcessingException {
        ImmutableMap<Integer, Long> startPartitions = this.taskGroups.get((Object)Integer.valueOf((int)groupId)).partitionOffsets;
        HashMap<Integer, Long> endPartitions = new HashMap<Integer, Long>();
        for (Integer partition : startPartitions.keySet()) {
            endPartitions.put(partition, Long.MAX_VALUE);
        }
        String sequenceName = this.generateSequenceName(groupId);
        HashMap consumerProperties = Maps.newHashMap(this.ioConfig.getConsumerProperties());
        DateTime minimumMessageTime = (DateTime)this.taskGroups.get((Object)Integer.valueOf((int)groupId)).minimumMessageTime.orNull();
        DateTime maximumMessageTime = (DateTime)this.taskGroups.get((Object)Integer.valueOf((int)groupId)).maximumMessageTime.orNull();
        KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(sequenceName, new KafkaPartitions(this.ioConfig.getTopic(), (Map<Integer, Long>)startPartitions), new KafkaPartitions(this.ioConfig.getTopic(), endPartitions), consumerProperties, true, false, minimumMessageTime, maximumMessageTime, this.ioConfig.isSkipOffsetGaps());
        String checkpoints = this.sortingMapper.writerWithType((TypeReference)new TypeReference<TreeMap<Integer, Map<Integer, Long>>>(){}).writeValueAsString(this.taskGroups.get((Object)Integer.valueOf((int)groupId)).sequenceOffsets);
        ImmutableMap context = this.spec.getContext() == null ? ImmutableMap.of((Object)"checkpoints", (Object)checkpoints, (Object)IS_INCREMENTAL_HANDOFF_SUPPORTED, (Object)true) : ImmutableMap.builder().put((Object)"checkpoints", (Object)checkpoints).put((Object)IS_INCREMENTAL_HANDOFF_SUPPORTED, (Object)true).putAll(this.spec.getContext()).build();
        for (int i = 0; i < replicas; ++i) {
            String taskId = Joiner.on((String)"_").join((Object)sequenceName, (Object)KafkaSupervisor.getRandomId(), new Object[0]);
            KafkaIndexTask indexTask = new KafkaIndexTask(taskId, new TaskResource(sequenceName, 1), this.spec.getDataSchema(), this.taskTuningConfig, kafkaIOConfig, (Map<String, Object>)context, null, null);
            Optional taskQueue = this.taskMaster.getTaskQueue();
            if (taskQueue.isPresent()) {
                try {
                    ((TaskQueue)taskQueue.get()).add((Task)indexTask);
                }
                catch (EntryExistsException e) {
                    log.error("Tried to add task [%s] but it already exists", new Object[]{indexTask.getId()});
                }
                continue;
            }
            log.error("Failed to get task queue because I'm not the leader!", new Object[0]);
        }
    }

    private ImmutableMap<Integer, Long> generateStartingOffsetsForPartitionGroup(int groupId) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (Map.Entry<Integer, Long> entry : this.partitionGroups.get(groupId).entrySet()) {
            Integer partition = entry.getKey();
            Long offset = entry.getValue();
            if (offset != null && offset != -1L) {
                builder.put((Object)partition, (Object)offset);
                continue;
            }
            builder.put((Object)partition, (Object)this.getOffsetFromStorageForPartition(partition));
        }
        return builder.build();
    }

    private long getOffsetFromStorageForPartition(int partition) {
        long offset;
        Map<Integer, Long> metadataOffsets = this.getOffsetsFromMetadataStorage();
        if (metadataOffsets.get(partition) != null) {
            offset = metadataOffsets.get(partition);
            log.debug("Getting offset [%,d] from metadata storage for partition [%d]", new Object[]{offset, partition});
            long latestKafkaOffset = this.getOffsetFromKafkaForPartition(partition, false);
            if (offset > latestKafkaOffset) {
                throw new ISE("Offset in metadata storage [%,d] > latest Kafka offset [%,d] for partition[%d] dataSource[%s]. If these messages are no longer available (perhaps you deleted and re-created your Kafka topic) you can use the supervisor reset API to restart ingestion.", new Object[]{offset, latestKafkaOffset, partition, this.dataSource});
            }
        } else {
            offset = this.getOffsetFromKafkaForPartition(partition, this.ioConfig.isUseEarliestOffset());
            log.debug("Getting offset [%,d] from Kafka for partition [%d]", new Object[]{offset, partition});
        }
        return offset;
    }

    private Map<Integer, Long> getOffsetsFromMetadataStorage() {
        KafkaPartitions partitions;
        DataSourceMetadata dataSourceMetadata = this.indexerMetadataStorageCoordinator.getDataSourceMetadata(this.dataSource);
        if (dataSourceMetadata != null && dataSourceMetadata instanceof KafkaDataSourceMetadata && (partitions = ((KafkaDataSourceMetadata)dataSourceMetadata).getKafkaPartitions()) != null) {
            if (!this.ioConfig.getTopic().equals(partitions.getTopic())) {
                log.warn("Topic in metadata storage [%s] doesn't match spec topic [%s], ignoring stored offsets", new Object[]{partitions.getTopic(), this.ioConfig.getTopic()});
                return ImmutableMap.of();
            }
            if (partitions.getPartitionOffsetMap() != null) {
                return partitions.getPartitionOffsetMap();
            }
        }
        return ImmutableMap.of();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private long getOffsetFromKafkaForPartition(int partition, boolean useEarliestOffset) {
        Object object = this.consumerLock;
        synchronized (object) {
            TopicPartition topicPartition = new TopicPartition(this.ioConfig.getTopic(), partition);
            if (!this.consumer.assignment().contains(topicPartition)) {
                this.consumer.assign(Collections.singletonList(topicPartition));
            }
            if (useEarliestOffset) {
                this.consumer.seekToBeginning(Collections.singletonList(topicPartition));
            } else {
                this.consumer.seekToEnd(Collections.singletonList(topicPartition));
            }
            return this.consumer.position(topicPartition);
        }
    }

    private boolean isTaskCurrent(int taskGroupId, String taskId) {
        Optional taskOptional = this.taskStorage.getTask(taskId);
        if (!taskOptional.isPresent() || !(taskOptional.get() instanceof KafkaIndexTask)) {
            return false;
        }
        String taskSequenceName = ((KafkaIndexTask)((Object)taskOptional.get())).getIOConfig().getBaseSequenceName();
        if (this.taskGroups.get(taskGroupId) != null) {
            return this.generateSequenceName(taskGroupId).equals(taskSequenceName);
        }
        return this.generateSequenceName(((KafkaIndexTask)((Object)taskOptional.get())).getIOConfig().getStartPartitions().getPartitionOffsetMap(), ((KafkaIndexTask)((Object)taskOptional.get())).getIOConfig().getMinimumMessageTime(), ((KafkaIndexTask)((Object)taskOptional.get())).getIOConfig().getMaximumMessageTime()).equals(taskSequenceName);
    }

    private ListenableFuture<?> stopTasksInGroup(TaskGroup taskGroup) {
        if (taskGroup == null) {
            return Futures.immediateFuture(null);
        }
        ArrayList futures = Lists.newArrayList();
        for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
            if (entry.getValue().status.isComplete()) continue;
            futures.add(this.stopTask(entry.getKey(), false));
        }
        return Futures.successfulAsList((Iterable)futures);
    }

    private ListenableFuture<Void> stopTask(final String id, boolean publish) {
        return Futures.transform(this.taskClient.stopAsync(id, publish), (Function)new Function<Boolean, Void>(){

            @Nullable
            public Void apply(@Nullable Boolean result) {
                if (result == null || !result.booleanValue()) {
                    log.info("Task [%s] failed to stop in a timely manner, killing task", new Object[]{id});
                    KafkaSupervisor.this.killTask(id);
                }
                return null;
            }
        });
    }

    private void killTask(String id) {
        Optional taskQueue = this.taskMaster.getTaskQueue();
        if (taskQueue.isPresent()) {
            ((TaskQueue)taskQueue.get()).shutdown(id);
        } else {
            log.error("Failed to get task queue because I'm not the leader!", new Object[0]);
        }
    }

    protected int getTaskGroupIdForPartition(int partition) {
        return partition % this.ioConfig.getTaskCount();
    }

    private boolean isTaskInPendingCompletionGroups(String taskId) {
        for (List list : this.pendingCompletionTaskGroups.values()) {
            for (TaskGroup taskGroup : list) {
                if (!taskGroup.tasks.containsKey(taskId)) continue;
                return true;
            }
        }
        return false;
    }

    private KafkaSupervisorReport generateReport(boolean includeOffsets) {
        int numPartitions = this.partitionGroups.values().stream().mapToInt(Map::size).sum();
        Map<Integer, Long> partitionLag = this.getLagPerPartition(this.getHighestCurrentOffsets());
        KafkaSupervisorReport report = new KafkaSupervisorReport(this.dataSource, DateTimes.nowUtc(), this.ioConfig.getTopic(), numPartitions, this.ioConfig.getReplicas(), this.ioConfig.getTaskDuration().getMillis() / 1000L, includeOffsets ? this.latestOffsetsFromKafka : null, includeOffsets ? partitionLag : null, includeOffsets ? Long.valueOf(partitionLag.values().stream().mapToLong(x -> Math.max(x, 0L)).sum()) : null, includeOffsets ? this.offsetsLastUpdated : null);
        ArrayList taskReports = Lists.newArrayList();
        try {
            for (TaskGroup taskGroup : this.taskGroups.values()) {
                for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
                    String taskId = entry.getKey();
                    DateTime startTime = entry.getValue().startTime;
                    Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
                    Long remainingSeconds = null;
                    if (startTime != null) {
                        remainingSeconds = Math.max(0L, this.ioConfig.getTaskDuration().getMillis() - (System.currentTimeMillis() - startTime.getMillis())) / 1000L;
                    }
                    taskReports.add(new TaskReportData(taskId, (Map<Integer, Long>)(includeOffsets ? taskGroup.partitionOffsets : null), includeOffsets ? currentOffsets : null, startTime, remainingSeconds, TaskReportData.TaskType.ACTIVE, includeOffsets ? this.getLagPerPartition(currentOffsets) : null));
                }
            }
            for (List list : this.pendingCompletionTaskGroups.values()) {
                for (TaskGroup taskGroup : list) {
                    for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
                        String taskId = entry.getKey();
                        DateTime startTime = entry.getValue().startTime;
                        Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
                        Long remainingSeconds = null;
                        if (taskGroup.completionTimeout != null) {
                            remainingSeconds = Math.max(0L, taskGroup.completionTimeout.getMillis() - System.currentTimeMillis()) / 1000L;
                        }
                        taskReports.add(new TaskReportData(taskId, (Map<Integer, Long>)(includeOffsets ? taskGroup.partitionOffsets : null), includeOffsets ? currentOffsets : null, startTime, remainingSeconds, TaskReportData.TaskType.PUBLISHING, null));
                    }
                }
            }
            taskReports.forEach(report::addTask);
        }
        catch (Exception e) {
            log.warn((Throwable)e, "Failed to generate status report", new Object[0]);
        }
        return report;
    }

    private Runnable buildRunTask() {
        return () -> this.notices.add(new RunNotice());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateLatestOffsetsFromKafka() {
        Object object = this.consumerLock;
        synchronized (object) {
            Map topics = this.consumer.listTopics();
            if (topics == null || !topics.containsKey(this.ioConfig.getTopic())) {
                throw new ISE("Could not retrieve partitions for topic [%s]", new Object[]{this.ioConfig.getTopic()});
            }
            Set topicPartitions = ((List)topics.get(this.ioConfig.getTopic())).stream().map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
            this.consumer.assign(topicPartitions);
            this.consumer.seekToEnd(topicPartitions);
            this.latestOffsetsFromKafka = topicPartitions.stream().collect(Collectors.toMap(TopicPartition::partition, arg_0 -> ((KafkaConsumer)this.consumer).position(arg_0)));
        }
    }

    private Map<Integer, Long> getHighestCurrentOffsets() {
        return this.taskGroups.values().stream().flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()).flatMap(taskData -> ((TaskData)taskData.getValue()).currentOffsets.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max));
    }

    private Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets) {
        return currentOffsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> this.latestOffsetsFromKafka != null && this.latestOffsetsFromKafka.get(e.getKey()) != null && e.getValue() != null ? this.latestOffsetsFromKafka.get(e.getKey()) - (Long)e.getValue() : Integer.MIN_VALUE));
    }

    private Runnable emitLag() {
        return () -> {
            try {
                Map<Integer, Long> highestCurrentOffsets = this.getHighestCurrentOffsets();
                if (this.latestOffsetsFromKafka == null) {
                    throw new ISE("Latest offsets from Kafka have not been fetched", new Object[0]);
                }
                if (!this.latestOffsetsFromKafka.keySet().equals(highestCurrentOffsets.keySet())) {
                    log.warn("Lag metric: Kafka partitions %s do not match task partitions %s", new Object[]{this.latestOffsetsFromKafka.keySet(), highestCurrentOffsets.keySet()});
                }
                long lag = this.getLagPerPartition(highestCurrentOffsets).values().stream().mapToLong(x -> Math.max(x, 0L)).sum();
                this.emitter.emit(ServiceMetricEvent.builder().setDimension("dataSource", this.dataSource).build("ingest/kafka/lag", (Number)lag));
            }
            catch (Exception e) {
                log.warn((Throwable)e, "Unable to compute Kafka lag", new Object[0]);
            }
        };
    }

    private void updateCurrentOffsets() throws InterruptedException, ExecutionException, TimeoutException {
        List futures = Stream.concat(this.taskGroups.values().stream().flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()), this.pendingCompletionTaskGroups.values().stream().flatMap(Collection::stream).flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())).map(task -> Futures.transform(this.taskClient.getCurrentOffsetsAsync((String)task.getKey(), false), currentOffsets -> {
            if (currentOffsets != null && !currentOffsets.isEmpty()) {
                ((TaskData)task.getValue()).currentOffsets = currentOffsets;
            }
            return null;
        })).collect(Collectors.toList());
        Futures.successfulAsList(futures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
    }

    @VisibleForTesting
    Runnable updateCurrentAndLatestOffsets() {
        return () -> {
            try {
                this.updateCurrentOffsets();
                this.updateLatestOffsetsFromKafka();
                this.offsetsLastUpdated = DateTimes.nowUtc();
            }
            catch (Exception e) {
                log.warn((Throwable)e, "Exception while getting current/latest offsets", new Object[0]);
            }
        };
    }

    private class CheckpointNotice
    implements Notice {
        final String sequenceName;
        final KafkaDataSourceMetadata previousCheckpoint;
        final KafkaDataSourceMetadata currentCheckpoint;

        CheckpointNotice(String sequenceName, KafkaDataSourceMetadata previousCheckpoint, KafkaDataSourceMetadata currentCheckpoint) {
            this.sequenceName = sequenceName;
            this.previousCheckpoint = previousCheckpoint;
            this.currentCheckpoint = currentCheckpoint;
        }

        @Override
        public void handle() throws ExecutionException, InterruptedException, TimeoutException {
            Preconditions.checkNotNull(KafkaSupervisor.this.sequenceTaskGroup.get(this.sequenceName), (String)"WTH?! cannot find task group for this sequence [%s], sequencesTaskGroup map [%s], taskGroups [%s]", (Object[])new Object[]{this.sequenceName, KafkaSupervisor.this.sequenceTaskGroup, KafkaSupervisor.this.taskGroups});
            TreeMap<Integer, Map<Integer, Long>> checkpoints = ((TaskGroup)((KafkaSupervisor)KafkaSupervisor.this).sequenceTaskGroup.get((Object)this.sequenceName)).sequenceOffsets;
            if (this.previousCheckpoint != null) {
                int sequenceId;
                Map<Integer, Long> checkpoint;
                int index = checkpoints.size();
                Iterator<Integer> iterator = checkpoints.descendingKeySet().iterator();
                while (iterator.hasNext() && !(checkpoint = checkpoints.get(sequenceId = iterator.next().intValue())).equals(this.previousCheckpoint.getKafkaPartitions().getPartitionOffsetMap())) {
                    --index;
                }
                if (index == 0) {
                    throw new ISE("No such previous checkpoint [%s] found", new Object[]{this.previousCheckpoint});
                }
                if (index < checkpoints.size()) {
                    Preconditions.checkState((index == checkpoints.size() - 1 ? 1 : 0) != 0, (Object)"checkpoint consistency failure");
                    log.info("Already checkpointed with offsets [%s]", new Object[]{checkpoints.lastEntry().getValue()});
                    return;
                }
            } else {
                Preconditions.checkState((checkpoints.size() <= 1 ? 1 : 0) != 0, (Object)"Got checkpoint request with null as previous check point, however found more than one checkpoints");
                if (checkpoints.size() == 1) {
                    log.info("Already checkpointed with dataSourceMetadata [%s]", new Object[]{checkpoints.get(0)});
                    return;
                }
            }
            int taskGroupId = KafkaSupervisor.this.getTaskGroupIdForPartition(this.currentCheckpoint.getKafkaPartitions().getPartitionOffsetMap().keySet().iterator().next());
            Map newCheckpoint = (Map)KafkaSupervisor.this.checkpointTaskGroup(taskGroupId, false).get();
            ((TaskGroup)KafkaSupervisor.this.sequenceTaskGroup.get(this.sequenceName)).addNewCheckpoint(newCheckpoint);
            log.info("Handled checkpoint notice, new checkpoint is [%s] for sequence [%s]", new Object[]{newCheckpoint, this.sequenceName});
        }
    }

    private class ResetNotice
    implements Notice {
        final DataSourceMetadata dataSourceMetadata;

        ResetNotice(DataSourceMetadata dataSourceMetadata) {
            this.dataSourceMetadata = dataSourceMetadata;
        }

        @Override
        public void handle() {
            KafkaSupervisor.this.resetInternal(this.dataSourceMetadata);
        }
    }

    private class ShutdownNotice
    implements Notice {
        private ShutdownNotice() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handle() throws InterruptedException, ExecutionException, TimeoutException {
            KafkaSupervisor.this.consumer.close();
            Object object = KafkaSupervisor.this.stopLock;
            synchronized (object) {
                KafkaSupervisor.this.stopped = true;
                KafkaSupervisor.this.stopLock.notifyAll();
            }
        }
    }

    private class GracefulShutdownNotice
    extends ShutdownNotice {
        private GracefulShutdownNotice() {
        }

        @Override
        public void handle() throws InterruptedException, ExecutionException, TimeoutException {
            KafkaSupervisor.this.gracefulShutdownInternal();
            super.handle();
        }
    }

    private class RunNotice
    implements Notice {
        private RunNotice() {
        }

        @Override
        public void handle() throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException {
            long nowTime = System.currentTimeMillis();
            if (nowTime - KafkaSupervisor.this.lastRunTime < 1000L) {
                return;
            }
            KafkaSupervisor.this.lastRunTime = nowTime;
            KafkaSupervisor.this.runInternal();
        }
    }

    private static interface Notice {
        public void handle() throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException;
    }

    private static class TaskData {
        volatile TaskStatus status;
        volatile DateTime startTime;
        volatile Map<Integer, Long> currentOffsets = new HashMap<Integer, Long>();

        private TaskData() {
        }
    }

    private static class TaskGroup {
        final ImmutableMap<Integer, Long> partitionOffsets;
        final ConcurrentHashMap<String, TaskData> tasks = new ConcurrentHashMap();
        final Optional<DateTime> minimumMessageTime;
        final Optional<DateTime> maximumMessageTime;
        DateTime completionTimeout;
        final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap();

        public TaskGroup(ImmutableMap<Integer, Long> partitionOffsets, Optional<DateTime> minimumMessageTime, Optional<DateTime> maximumMessageTime) {
            this.partitionOffsets = partitionOffsets;
            this.minimumMessageTime = minimumMessageTime;
            this.maximumMessageTime = maximumMessageTime;
            this.sequenceOffsets.put(0, (Map<Integer, Long>)partitionOffsets);
        }

        public int addNewCheckpoint(Map<Integer, Long> checkpoint) {
            this.sequenceOffsets.put(this.sequenceOffsets.lastKey() + 1, checkpoint);
            return this.sequenceOffsets.lastKey();
        }

        Set<String> taskIds() {
            return this.tasks.keySet();
        }
    }
}

