/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.seekablestream;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.partition.NumberedShardSpecFactory;
import org.apache.druid.timeline.partition.ShardSpecFactory;
import org.apache.druid.utils.CircularBuffer;
import org.joda.time.DateTime;
import org.joda.time.ReadableInstant;

/**
 * Abstract base for indexing tasks that read from a seekable stream.
 *
 * <p>The task itself only holds configuration (data schema, tuning config, IO config, task context)
 * and collaborators handed to the constructor. The actual work of {@link #run}, stopping and query
 * serving is delegated to a {@link SeekableStreamIndexTaskRunner}, which subclasses provide through
 * {@link #createTaskRunner()}. The runner is created lazily on first use and memoized, so every call
 * to {@link #getRunner()} returns the same instance.
 *
 * @param <PartitionIdType>    type used to identify a partition of the stream
 * @param <SequenceOffsetType> type used to identify a position within a partition
 */
public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
extends AbstractTask
implements ChatHandler {
    /** Lock acquisition timeout, in seconds. Not referenced inside this class. */
    public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15L;
    private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTask.class);

    protected final DataSchema dataSchema;
    protected final SeekableStreamIndexTaskTuningConfig tuningConfig;
    protected final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
    /** Absent when no provider was supplied to the constructor. */
    protected final Optional<ChatHandlerProvider> chatHandlerProvider;
    /** Task context exactly as passed to the constructor; may be {@code null}. */
    protected final Map<String, Object> context;
    protected final AuthorizerMapper authorizerMapper;
    protected final RowIngestionMetersFactory rowIngestionMetersFactory;
    /** {@code null} when the tuning config's {@code maxSavedParseExceptions} is not positive. */
    protected final CircularBuffer<Throwable> savedParseExceptions;
    protected final AppenderatorsManager appenderatorsManager;
    /** Lock granularity chosen from the {@code forceTimeChunkLock} context flag. */
    protected final LockGranularity lockGranularityToUse;
    private final Supplier<SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>> runnerSupplier;

    /**
     * Creates the task and wires up its lazily-created runner.
     *
     * @param id                        task id, forwarded to {@link AbstractTask}
     * @param taskResource              task resource, forwarded to {@link AbstractTask}; may be {@code null}
     * @param dataSchema                schema of the data being indexed; must not be {@code null}
     * @param tuningConfig              tuning configuration; must not be {@code null}
     * @param ioConfig                  IO configuration; must not be {@code null}
     * @param context                   task context; may be {@code null}
     * @param chatHandlerProvider       chat handler provider; may be {@code null}
     * @param authorizerMapper          authorizer mapper, stored as-is
     * @param rowIngestionMetersFactory row ingestion meters factory, stored as-is
     * @param groupId                   task group id, forwarded to {@link AbstractTask}; may be {@code null}
     * @param appenderatorsManager      manager used by {@link #newAppenderator} to create appenderators
     * @throws NullPointerException if {@code dataSchema}, {@code tuningConfig} or {@code ioConfig} is {@code null}
     */
    public SeekableStreamIndexTask(
            String id,
            @Nullable TaskResource taskResource,
            DataSchema dataSchema,
            SeekableStreamIndexTaskTuningConfig tuningConfig,
            SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig,
            @Nullable Map<String, Object> context,
            @Nullable ChatHandlerProvider chatHandlerProvider,
            AuthorizerMapper authorizerMapper,
            RowIngestionMetersFactory rowIngestionMetersFactory,
            @Nullable String groupId,
            AppenderatorsManager appenderatorsManager) {
        // The schema has to be validated inside the super(...) argument list: it is dereferenced
        // there, so a check placed after the call could never report a null schema by name.
        super(id, groupId, taskResource, Preconditions.checkNotNull(dataSchema, "dataSchema").getDataSource(), context);
        this.dataSchema = dataSchema;
        this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
        this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
        this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);

        final int maxSavedParseExceptions = tuningConfig.getMaxSavedParseExceptions();
        this.savedParseExceptions = maxSavedParseExceptions > 0
                ? new CircularBuffer<>(maxSavedParseExceptions)
                : null;

        this.context = context;
        this.authorizerMapper = authorizerMapper;
        this.rowIngestionMetersFactory = rowIngestionMetersFactory;
        // Memoized so the runner is built at most once, and only when first requested.
        this.runnerSupplier = Suppliers.memoize(this::createTaskRunner);
        this.appenderatorsManager = appenderatorsManager;

        // Time-chunk locking is the default; the context flag must be explicitly false for segment locks.
        final boolean forceTimeChunkLock = getContextValue("forceTimeChunkLock", true);
        this.lockGranularityToUse = forceTimeChunkLock ? LockGranularity.TIME_CHUNK : LockGranularity.SEGMENT;
    }

    /**
     * Builds a group id of the form {@code <type>_<dataSource>}.
     *
     * @param dataSource data source name (second component of the result)
     * @param type       task type (first component of the result)
     * @return the formatted group id
     */
    protected static String getFormattedGroupId(String dataSource, String type) {
        return StringUtils.format("%s_%s", type, dataSource);
    }

    /**
     * @return the {@code priority} value from the task context, or 75 when the lookup falls back to the default
     */
    @Override
    public int getPriority() {
        return getContextValue("priority", 75);
    }

    /**
     * Always reports the task as ready; the action client is not consulted.
     */
    @Override
    public boolean isReady(TaskActionClient taskActionClient) {
        return true;
    }

    @JsonProperty
    public DataSchema getDataSchema() {
        return dataSchema;
    }

    @JsonProperty
    public SeekableStreamIndexTaskTuningConfig getTuningConfig() {
        return tuningConfig;
    }

    @JsonProperty(value="ioConfig")
    public SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> getIOConfig() {
        return ioConfig;
    }

    /**
     * Runs the task by delegating to the runner.
     *
     * @param toolbox toolbox handed through to the runner
     * @return the status returned by the runner
     */
    @Override
    public TaskStatus run(TaskToolbox toolbox) {
        return getRunner().run(toolbox);
    }

    @Override
    public boolean canRestore() {
        return true;
    }

    /**
     * Stops the runner: gracefully when the task config has restore-on-restart enabled,
     * forcefully otherwise.
     *
     * @param taskConfig task configuration consulted for {@code isRestoreTasksOnRestart()}
     */
    @Override
    public void stopGracefully(TaskConfig taskConfig) {
        if (taskConfig.isRestoreTasksOnRestart()) {
            getRunner().stopGracefully();
        } else {
            getRunner().stopForcefully();
        }
    }

    /**
     * Returns a query runner backed by the runner's appenderator.
     *
     * <p>If the runner has no appenderator at the time of this call, a {@link NoopQueryRunner} is
     * returned. Otherwise the returned runner looks the appenderator up again each time it is
     * invoked and runs the query against it.
     *
     * @param query the query to serve (not inspected here)
     * @return a query runner for this task
     */
    @Override
    public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
        if (getRunner().getAppenderator() == null) {
            return new NoopQueryRunner<>();
        }
        return (queryPlus, responseContext) -> queryPlus.run(getRunner().getAppenderator(), responseContext);
    }

    /**
     * Creates a realtime appenderator for this task through the {@link AppenderatorsManager}.
     *
     * <p>The tuning config is passed with its base persist directory replaced by the toolbox's
     * persist directory; all remaining collaborators come from the toolbox.
     *
     * @param metrics metrics object handed to the appenderator
     * @param toolbox source of the persist directory and of the appenderator's collaborators
     * @return the appenderator created by the manager
     */
    public Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox) {
        return appenderatorsManager.createRealtimeAppenderatorForTask(
                getId(),
                dataSchema,
                tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
                metrics,
                toolbox.getSegmentPusher(),
                toolbox.getJsonMapper(),
                toolbox.getIndexIO(),
                toolbox.getIndexMergerV9(),
                toolbox.getQueryRunnerFactoryConglomerate(),
                toolbox.getSegmentAnnouncer(),
                toolbox.getEmitter(),
                toolbox.getQueryExecutorService(),
                toolbox.getCache(),
                toolbox.getCacheConfig(),
                toolbox.getCachePopulatorStats());
    }

    /**
     * Creates a {@link StreamAppenderatorDriver} around the given appenderator.
     *
     * <p>Segments are allocated through {@link SegmentAllocateAction}s built from the schema's
     * granularity spec, the row timestamp, a {@link NumberedShardSpecFactory} and this task's
     * {@link #lockGranularityToUse}.
     *
     * @param appenderator appenderator the driver operates on
     * @param toolbox      source of the task action client and other collaborators
     * @param metrics      metrics object handed to the driver
     * @return a new driver
     */
    public StreamAppenderatorDriver newDriver(Appenderator appenderator, TaskToolbox toolbox, FireDepartmentMetrics metrics) {
        final SegmentAllocator segmentAllocator = new ActionBasedSegmentAllocator(
                toolbox.getTaskActionClient(),
                dataSchema,
                (schema, row, sequenceName, previousSegmentId, skipSegmentLineageCheck) -> new SegmentAllocateAction(
                        schema.getDataSource(),
                        row.getTimestamp(),
                        schema.getGranularitySpec().getQueryGranularity(),
                        schema.getGranularitySpec().getSegmentGranularity(),
                        sequenceName,
                        previousSegmentId,
                        skipSegmentLineageCheck,
                        NumberedShardSpecFactory.instance(),
                        lockGranularityToUse));

        return new StreamAppenderatorDriver(
                appenderator,
                segmentAllocator,
                toolbox.getSegmentHandoffNotifierFactory(),
                new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
                toolbox.getDataSegmentKiller(),
                toolbox.getJsonMapper(),
                metrics);
    }

    /**
     * Tells whether a row's timestamp lies within the optional minimum/maximum message times of the
     * IO config. A bound that is absent never excludes a row.
     *
     * @param row the row to check
     * @return {@code true} if the row is neither before the minimum nor after the maximum message time
     * @throws ParseException if the row's timestamp is not contained in {@code Intervals.ETERNITY}
     */
    public boolean withinMinMaxRecordTime(InputRow row) {
        if (!Intervals.ETERNITY.contains(row.getTimestamp())) {
            final String errorMsg = StringUtils.format(
                    "Encountered row with timestamp that cannot be represented as a long: [%s]",
                    row);
            throw new ParseException(errorMsg);
        }

        final boolean beforeMinimumMessageTime = ioConfig.getMinimumMessageTime().isPresent()
                && ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp());
        final boolean afterMaximumMessageTime = ioConfig.getMaximumMessageTime().isPresent()
                && ioConfig.getMaximumMessageTime().get().isBefore(row.getTimestamp());

        if (log.isDebugEnabled()) {
            if (beforeMinimumMessageTime) {
                log.debug(
                        "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]",
                        row.getTimestamp(),
                        ioConfig.getMinimumMessageTime().get());
            } else if (afterMaximumMessageTime) {
                log.debug(
                        "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]",
                        row.getTimestamp(),
                        ioConfig.getMaximumMessageTime().get());
            }
        }
        return !beforeMinimumMessageTime && !afterMaximumMessageTime;
    }

    /**
     * Creates the runner that performs this task's work. Invoked at most once per task instance,
     * on the first call to {@link #getRunner()}.
     */
    protected abstract SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> createTaskRunner();

    /**
     * Creates a record supplier for this task. Not called from within this class.
     */
    protected abstract RecordSupplier<PartitionIdType, SequenceOffsetType> newTaskRecordSupplier();

    /**
     * @return the runner's current appenderator
     */
    @VisibleForTesting
    public Appenderator getAppenderator() {
        return getRunner().getAppenderator();
    }

    /**
     * @return the memoized runner, creating it via {@link #createTaskRunner()} on first access
     */
    @VisibleForTesting
    public SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> getRunner() {
        return runnerSupplier.get();
    }
}

