/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.commons.io.FileUtils;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorTuningConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.stats.RowIngestionMetersTotals;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import org.apache.druid.segment.realtime.plumber.Committers;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.partition.NumberedShardSpecFactory;
import org.apache.druid.timeline.partition.ShardSpecFactory;
import org.apache.druid.utils.CircularBuffer;

public class AppenderatorDriverRealtimeIndexTask
extends AbstractTask
implements ChatHandler {
    private static final String CTX_KEY_LOOKUP_TIER = "lookupTier";
    private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);
    @JsonIgnore
    private final RealtimeAppenderatorIngestionSpec spec;
    @JsonIgnore
    private final Queue<ListenableFuture<SegmentsAndMetadata>> pendingHandoffs;
    @JsonIgnore
    private volatile Appenderator appenderator = null;
    @JsonIgnore
    private volatile Firehose firehose = null;
    @JsonIgnore
    private volatile FireDepartmentMetrics metrics = null;
    @JsonIgnore
    private final RowIngestionMeters rowIngestionMeters;
    @JsonIgnore
    private volatile boolean gracefullyStopped = false;
    @JsonIgnore
    private volatile boolean finishingJob = false;
    @JsonIgnore
    private volatile Thread runThread = null;
    @JsonIgnore
    private CircularBuffer<Throwable> savedParseExceptions;
    @JsonIgnore
    private final Optional<ChatHandlerProvider> chatHandlerProvider;
    @JsonIgnore
    private final AuthorizerMapper authorizerMapper;
    @JsonIgnore
    private final LockGranularity lockGranularity;
    @JsonIgnore
    private IngestionState ingestionState;
    @JsonIgnore
    private String errorMsg;
    @JsonIgnore
    private AppenderatorsManager appenderatorsManager;

    private static String makeTaskId(RealtimeAppenderatorIngestionSpec spec) {
        return StringUtils.format((String)"index_realtime_%s_%d_%s_%s", (Object[])new Object[]{spec.getDataSchema().getDataSource(), ((RealtimeAppenderatorTuningConfig)spec.getTuningConfig()).getShardSpec().getPartitionNum(), DateTimes.nowUtc(), RealtimeIndexTask.makeRandomId()});
    }

    @JsonCreator
    public AppenderatorDriverRealtimeIndexTask(@JsonProperty(value="id") String id, @JsonProperty(value="resource") TaskResource taskResource, @JsonProperty(value="spec") RealtimeAppenderatorIngestionSpec spec, @JsonProperty(value="context") Map<String, Object> context, @JacksonInject ChatHandlerProvider chatHandlerProvider, @JacksonInject AuthorizerMapper authorizerMapper, @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory, @JacksonInject AppenderatorsManager appenderatorsManager) {
        super(id == null ? AppenderatorDriverRealtimeIndexTask.makeTaskId(spec) : id, StringUtils.format((String)"index_realtime_appenderator_%s", (Object[])new Object[]{spec.getDataSchema().getDataSource()}), taskResource, spec.getDataSchema().getDataSource(), context);
        this.spec = spec;
        this.pendingHandoffs = new ConcurrentLinkedQueue<ListenableFuture<SegmentsAndMetadata>>();
        this.chatHandlerProvider = Optional.fromNullable((Object)chatHandlerProvider);
        this.authorizerMapper = authorizerMapper;
        if (((RealtimeAppenderatorTuningConfig)spec.getTuningConfig()).getMaxSavedParseExceptions() > 0) {
            this.savedParseExceptions = new CircularBuffer(((RealtimeAppenderatorTuningConfig)spec.getTuningConfig()).getMaxSavedParseExceptions());
        }
        this.ingestionState = IngestionState.NOT_STARTED;
        this.rowIngestionMeters = rowIngestionMetersFactory.createRowIngestionMeters();
        this.appenderatorsManager = appenderatorsManager;
        this.lockGranularity = this.getContextValue("forceTimeChunkLock", true) != false ? LockGranularity.TIME_CHUNK : LockGranularity.SEGMENT;
    }

    @Override
    public int getPriority() {
        return this.getContextValue("priority", 75);
    }

    @Override
    public String getType() {
        return "index_realtime_appenderator";
    }

    @Override
    public String getNodeType() {
        return "realtime";
    }

    @Override
    public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
        if (this.appenderator == null) {
            return new NoopQueryRunner();
        }
        return (queryPlus, responseContext) -> queryPlus.run((QuerySegmentWalker)this.appenderator, responseContext);
    }

    @Override
    public boolean isReady(TaskActionClient taskActionClient) {
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public TaskStatus run(TaskToolbox toolbox) {
        block30: {
            this.runThread = Thread.currentThread();
            this.setupTimeoutAlert();
            DataSchema dataSchema = this.spec.getDataSchema();
            RealtimeAppenderatorTuningConfig tuningConfig = ((RealtimeAppenderatorTuningConfig)this.spec.getTuningConfig()).withBasePersistDirectory(toolbox.getPersistDir());
            FireDepartment fireDepartmentForMetrics = new FireDepartment(dataSchema, new RealtimeIOConfig(null, null), null);
            TaskRealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics, this.rowIngestionMeters);
            this.metrics = fireDepartmentForMetrics.getMetrics();
            Supplier committerSupplier = Committers.nilSupplier();
            File firehoseTempDir = toolbox.getIndexingTmpDir();
            DiscoveryDruidNode discoveryDruidNode = this.createDiscoveryDruidNode(toolbox);
            this.appenderator = this.newAppenderator(dataSchema, tuningConfig, this.metrics, toolbox);
            StreamAppenderatorDriver driver = AppenderatorDriverRealtimeIndexTask.newDriver(dataSchema, this.appenderator, toolbox, this.metrics);
            try {
                if (this.chatHandlerProvider.isPresent()) {
                    log.info("Found chat handler of class[%s]", new Object[]{((ChatHandlerProvider)this.chatHandlerProvider.get()).getClass().getName()});
                    ((ChatHandlerProvider)this.chatHandlerProvider.get()).register(this.getId(), (ChatHandler)this, false);
                } else {
                    log.warn("No chat handler detected", new Object[0]);
                }
                if (this.appenderatorsManager.shouldTaskMakeNodeAnnouncements()) {
                    toolbox.getDataSegmentServerAnnouncer().announce();
                    toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
                }
                driver.startJob(segmentId -> {
                    try {
                        if (this.lockGranularity == LockGranularity.SEGMENT) {
                            return toolbox.getTaskActionClient().submit(new SegmentLockAcquireAction(TaskLockType.EXCLUSIVE, segmentId.getInterval(), segmentId.getVersion(), segmentId.getShardSpec().getPartitionNum(), 1000L)).isOk();
                        }
                        return toolbox.getTaskActionClient().submit(new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, segmentId.getInterval(), 1000L)) != null;
                    }
                    catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
                toolbox.getMonitorScheduler().addMonitor((Monitor)metricsMonitor);
                FileUtils.forceMkdir((File)firehoseTempDir);
                FirehoseFactory firehoseFactory = ((RealtimeIOConfig)this.spec.getIOConfig()).getFirehoseFactory();
                boolean firehoseDrainableByClosing = this.isFirehoseDrainableByClosing(firehoseFactory);
                int sequenceNumber = 0;
                String sequenceName = AppenderatorDriverRealtimeIndexTask.makeSequenceName(this.getId(), sequenceNumber);
                TransactionalSegmentPublisher publisher = (mustBeNullOrEmptySegments, segments, commitMetadata) -> {
                    if (mustBeNullOrEmptySegments != null && !mustBeNullOrEmptySegments.isEmpty()) {
                        throw new ISE("WTH? stream ingestion tasks are overwriting segments[%s]", new Object[]{mustBeNullOrEmptySegments});
                    }
                    SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.appendAction(segments, null, null);
                    return toolbox.getTaskActionClient().submit(action);
                };
                AppenderatorDriverRealtimeIndexTask appenderatorDriverRealtimeIndexTask = this;
                synchronized (appenderatorDriverRealtimeIndexTask) {
                    if (!this.gracefullyStopped) {
                        this.firehose = firehoseFactory.connect((InputRowParser)Preconditions.checkNotNull((Object)this.spec.getDataSchema().getParser(), (Object)"inputRowParser"), firehoseTempDir);
                    }
                }
                this.ingestionState = IngestionState.BUILD_SEGMENTS;
                while (!this.gracefullyStopped && firehoseDrainableByClosing && this.firehose.hasMore()) {
                    try {
                        InputRow inputRow = this.firehose.nextRow();
                        if (inputRow == null) {
                            log.debug("Discarded null row, considering thrownAway.", new Object[0]);
                            this.rowIngestionMeters.incrementThrownAway();
                            continue;
                        }
                        AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier);
                        if (addResult.isOk()) {
                            boolean isPushRequired = addResult.isPushRequired(tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(), Long.valueOf(tuningConfig.getPartitionsSpec().getMaxTotalRowsOr(20000000L)));
                            if (isPushRequired) {
                                this.publishSegments(driver, publisher, (Supplier<Committer>)committerSupplier, sequenceName);
                                sequenceName = AppenderatorDriverRealtimeIndexTask.makeSequenceName(this.getId(), ++sequenceNumber);
                            }
                        } else {
                            throw new ISE("Could not allocate segment for row with timestamp[%s]", new Object[]{inputRow.getTimestamp()});
                        }
                        if (addResult.getParseException() != null) {
                            this.handleParseException(addResult.getParseException());
                            continue;
                        }
                        this.rowIngestionMeters.incrementProcessed();
                    }
                    catch (ParseException e) {
                        this.handleParseException(e);
                    }
                }
                this.ingestionState = IngestionState.COMPLETED;
                if (!this.gracefullyStopped) {
                    appenderatorDriverRealtimeIndexTask = this;
                    synchronized (appenderatorDriverRealtimeIndexTask) {
                        if (this.gracefullyStopped) {
                            log.info("Gracefully stopping.", new Object[0]);
                        } else {
                            this.finishingJob = true;
                        }
                    }
                    if (this.finishingJob) {
                        log.info("Finishing job...", new Object[0]);
                        this.publishSegments(driver, publisher, (Supplier<Committer>)committerSupplier, sequenceName);
                        this.waitForSegmentPublishAndHandoff(tuningConfig.getPublishAndHandoffTimeout());
                    }
                    break block30;
                }
                if (this.firehose != null) {
                    log.info("Task was gracefully stopped, will persist data before exiting", new Object[0]);
                    this.persistAndWait(driver, (Committer)committerSupplier.get());
                }
            }
            catch (Throwable e) {
                log.makeAlert(e, "Exception aborted realtime processing[%s]", new Object[]{dataSchema.getDataSource()}).emit();
                this.errorMsg = Throwables.getStackTraceAsString((Throwable)e);
                toolbox.getTaskReportFileWriter().write(this.getId(), this.getTaskCompletionReports());
                TaskStatus taskStatus = TaskStatus.failure((String)this.getId(), (String)this.errorMsg);
                return taskStatus;
            }
            finally {
                if (this.chatHandlerProvider.isPresent()) {
                    ((ChatHandlerProvider)this.chatHandlerProvider.get()).unregister(this.getId());
                }
                CloseQuietly.close((Closeable)this.firehose);
                this.appenderator.close();
                CloseQuietly.close((Closeable)driver);
                toolbox.getMonitorScheduler().removeMonitor((Monitor)metricsMonitor);
                if (this.appenderatorsManager.shouldTaskMakeNodeAnnouncements()) {
                    toolbox.getDataSegmentServerAnnouncer().unannounce();
                    toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
                }
            }
        }
        log.info("Job done!", new Object[0]);
        toolbox.getTaskReportFileWriter().write(this.getId(), this.getTaskCompletionReports());
        return TaskStatus.success((String)this.getId());
    }

    @Override
    public boolean canRestore() {
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stopGracefully(TaskConfig taskConfig) {
        block17: {
            if (taskConfig.isRestoreTasksOnRestart()) {
                try {
                    AppenderatorDriverRealtimeIndexTask appenderatorDriverRealtimeIndexTask = this;
                    synchronized (appenderatorDriverRealtimeIndexTask) {
                        if (!this.gracefullyStopped) {
                            this.gracefullyStopped = true;
                            if (this.firehose == null) {
                                log.info("stopGracefully: Firehose not started yet, so nothing to stop.", new Object[0]);
                            } else if (this.finishingJob) {
                                log.info("stopGracefully: Interrupting finishJob.", new Object[0]);
                                this.runThread.interrupt();
                            } else if (this.isFirehoseDrainableByClosing(((RealtimeIOConfig)this.spec.getIOConfig()).getFirehoseFactory())) {
                                log.info("stopGracefully: Draining firehose.", new Object[0]);
                                this.firehose.close();
                            } else {
                                log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.", new Object[0]);
                                this.runThread.interrupt();
                            }
                        }
                        break block17;
                    }
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            AppenderatorDriverRealtimeIndexTask appenderatorDriverRealtimeIndexTask = this;
            synchronized (appenderatorDriverRealtimeIndexTask) {
                if (!this.gracefullyStopped) {
                    this.gracefullyStopped = true;
                    this.runThread.interrupt();
                }
            }
        }
    }

    @JsonIgnore
    @VisibleForTesting
    public Firehose getFirehose() {
        return this.firehose;
    }

    @JsonIgnore
    @VisibleForTesting
    public FireDepartmentMetrics getMetrics() {
        return this.metrics;
    }

    @JsonIgnore
    @VisibleForTesting
    public RowIngestionMeters getRowIngestionMeters() {
        return this.rowIngestionMeters;
    }

    @JsonProperty(value="spec")
    public RealtimeAppenderatorIngestionSpec getSpec() {
        return this.spec;
    }

    @GET
    @Path(value="/rowStats")
    @Produces(value={"application/json"})
    public Response getRowStats(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        HashMap returnMap = new HashMap();
        HashMap<String, RowIngestionMetersTotals> totalsMap = new HashMap<String, RowIngestionMetersTotals>();
        HashMap<String, Map<String, Object>> averagesMap = new HashMap<String, Map<String, Object>>();
        totalsMap.put("buildSegments", this.rowIngestionMeters.getTotals());
        averagesMap.put("buildSegments", this.rowIngestionMeters.getMovingAverages());
        returnMap.put("movingAverages", averagesMap);
        returnMap.put("totals", totalsMap);
        return Response.ok(returnMap).build();
    }

    @GET
    @Path(value="/unparseableEvents")
    @Produces(value={"application/json"})
    public Response getUnparseableEvents(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(this.savedParseExceptions);
        return Response.ok(events).build();
    }

    protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) {
        return firehoseFactory instanceof EventReceiverFirehoseFactory || firehoseFactory instanceof TimedShutoffFirehoseFactory && this.isFirehoseDrainableByClosing(((TimedShutoffFirehoseFactory)firehoseFactory).getDelegateFactory()) || firehoseFactory instanceof ClippedFirehoseFactory && this.isFirehoseDrainableByClosing(((ClippedFirehoseFactory)firehoseFactory).getDelegate());
    }

    private Map<String, TaskReport> getTaskCompletionReports() {
        return TaskReport.buildTaskReports(new IngestionStatsAndErrorsTaskReport(this.getId(), new IngestionStatsAndErrorsTaskReportData(this.ingestionState, this.getTaskCompletionUnparseableEvents(), this.getTaskCompletionRowStats(), this.errorMsg)));
    }

    private Map<String, Object> getTaskCompletionUnparseableEvents() {
        HashMap<String, Object> unparseableEventsMap = new HashMap<String, Object>();
        List<String> buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(this.savedParseExceptions);
        if (buildSegmentsParseExceptionMessages != null) {
            unparseableEventsMap.put("buildSegments", buildSegmentsParseExceptionMessages);
        }
        return unparseableEventsMap;
    }

    private Map<String, Object> getTaskCompletionRowStats() {
        HashMap<String, Object> metricsMap = new HashMap<String, Object>();
        metricsMap.put("buildSegments", this.rowIngestionMeters.getTotals());
        return metricsMap;
    }

    private void handleParseException(ParseException pe) {
        if (pe.isFromPartiallyValidRow()) {
            this.rowIngestionMeters.incrementProcessedWithError();
        } else {
            this.rowIngestionMeters.incrementUnparseable();
        }
        if (((RealtimeAppenderatorTuningConfig)this.spec.getTuningConfig()).isLogParseExceptions()) {
            log.error((Throwable)pe, "Encountered parse exception: ", new Object[0]);
        }
        if (this.savedParseExceptions != null) {
            this.savedParseExceptions.add((Object)pe);
        }
        if (this.rowIngestionMeters.getUnparseable() + this.rowIngestionMeters.getProcessedWithError() > (long)((RealtimeAppenderatorTuningConfig)this.spec.getTuningConfig()).getMaxParseExceptions()) {
            log.error("Max parse exceptions exceeded, terminating task...", new Object[0]);
            throw new RuntimeException("Max parse exceptions exceeded, terminating task...");
        }
    }

    private void setupTimeoutAlert() {
        if (((RealtimeAppenderatorTuningConfig)this.spec.getTuningConfig()).getAlertTimeout() > 0L) {
            Timer timer = new Timer("RealtimeIndexTask-Timer", true);
            timer.schedule(new TimerTask(){

                @Override
                public void run() {
                    log.makeAlert("RealtimeIndexTask for dataSource [%s] hasn't finished in configured time [%d] ms.", new Object[]{AppenderatorDriverRealtimeIndexTask.this.spec.getDataSchema().getDataSource(), ((RealtimeAppenderatorTuningConfig)AppenderatorDriverRealtimeIndexTask.this.spec.getTuningConfig()).getAlertTimeout()}).emit();
                }
            }, ((RealtimeAppenderatorTuningConfig)this.spec.getTuningConfig()).getAlertTimeout());
        }
    }

    private void publishSegments(StreamAppenderatorDriver driver, TransactionalSegmentPublisher publisher, Supplier<Committer> committerSupplier, String sequenceName) {
        ListenableFuture publishFuture = driver.publish(publisher, (Committer)committerSupplier.get(), Collections.singletonList(sequenceName));
        this.pendingHandoffs.add((ListenableFuture<SegmentsAndMetadata>)ListenableFutures.transformAsync((ListenableFuture)publishFuture, arg_0 -> ((StreamAppenderatorDriver)driver).registerHandoff(arg_0)));
    }

    private void waitForSegmentPublishAndHandoff(long timeout) throws InterruptedException, ExecutionException, TimeoutException {
        if (!this.pendingHandoffs.isEmpty()) {
            ListenableFuture allHandoffs = Futures.allAsList(this.pendingHandoffs);
            log.info("Waiting for handoffs", new Object[0]);
            if (timeout > 0L) {
                allHandoffs.get(timeout, TimeUnit.MILLISECONDS);
            } else {
                allHandoffs.get();
            }
        }
    }

    private void persistAndWait(StreamAppenderatorDriver driver, final Committer committer) {
        try {
            final CountDownLatch persistLatch = new CountDownLatch(1);
            driver.persist(new Committer(){

                public Object getMetadata() {
                    return committer.getMetadata();
                }

                public void run() {
                    try {
                        committer.run();
                    }
                    finally {
                        persistLatch.countDown();
                    }
                }
            });
            persistLatch.await();
        }
        catch (InterruptedException e) {
            log.debug((Throwable)e, "Interrupted while finishing the job", new Object[0]);
        }
        catch (Exception e) {
            log.makeAlert((Throwable)e, "Failed to finish realtime task", new Object[0]).emit();
            throw e;
        }
    }

    private DiscoveryDruidNode createDiscoveryDruidNode(TaskToolbox toolbox) {
        LookupNodeService lookupNodeService = this.getContextValue(CTX_KEY_LOOKUP_TIER) == null ? toolbox.getLookupNodeService() : new LookupNodeService((String)this.getContextValue(CTX_KEY_LOOKUP_TIER));
        return new DiscoveryDruidNode(toolbox.getDruidNode(), NodeRole.PEON, (Map)ImmutableMap.of((Object)toolbox.getDataNodeService().getName(), (Object)toolbox.getDataNodeService(), (Object)lookupNodeService.getName(), (Object)lookupNodeService));
    }

    private Appenderator newAppenderator(DataSchema dataSchema, RealtimeAppenderatorTuningConfig tuningConfig, FireDepartmentMetrics metrics, TaskToolbox toolbox) {
        return this.appenderatorsManager.createRealtimeAppenderatorForTask(this.getId(), dataSchema, (AppenderatorConfig)tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()), metrics, toolbox.getSegmentPusher(), toolbox.getJsonMapper(), toolbox.getIndexIO(), (IndexMerger)toolbox.getIndexMergerV9(), toolbox.getQueryRunnerFactoryConglomerate(), toolbox.getSegmentAnnouncer(), toolbox.getEmitter(), toolbox.getQueryExecutorService(), toolbox.getCache(), toolbox.getCacheConfig(), toolbox.getCachePopulatorStats());
    }

    private static StreamAppenderatorDriver newDriver(DataSchema dataSchema, Appenderator appenderator, TaskToolbox toolbox, FireDepartmentMetrics metrics) {
        return new StreamAppenderatorDriver(appenderator, (SegmentAllocator)new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema, (schema, row, sequenceName, previousSegmentId, skipSegmentLineageCheck) -> new SegmentAllocateAction(schema.getDataSource(), row.getTimestamp(), schema.getGranularitySpec().getQueryGranularity(), schema.getGranularitySpec().getSegmentGranularity(), sequenceName, previousSegmentId, skipSegmentLineageCheck, (ShardSpecFactory)NumberedShardSpecFactory.instance(), LockGranularity.TIME_CHUNK)), toolbox.getSegmentHandoffNotifierFactory(), (UsedSegmentChecker)new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()), toolbox.getDataSegmentKiller(), toolbox.getJsonMapper(), metrics);
    }

    private static String makeSequenceName(String taskId, int sequenceNumber) {
        return taskId + "_" + sequenceNumber;
    }
}

