/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.kafka;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.indexing.kafka.KafkaSequenceNumber;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SequenceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CircularBuffer;
import org.apache.druid.utils.CollectionUtils;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;

public class IncrementalPublishingKafkaIndexTaskRunner
extends SeekableStreamIndexTaskRunner<Integer, Long> {
    private static final EmittingLogger log = new EmittingLogger(IncrementalPublishingKafkaIndexTaskRunner.class);
    private final KafkaIndexTask task;

    IncrementalPublishingKafkaIndexTaskRunner(KafkaIndexTask task, @Nullable InputRowParser<ByteBuffer> parser, AuthorizerMapper authorizerMapper, Optional<ChatHandlerProvider> chatHandlerProvider, CircularBuffer<Throwable> savedParseExceptions, RowIngestionMetersFactory rowIngestionMetersFactory, AppenderatorsManager appenderatorsManager, LockGranularity lockGranularityToUse) {
        super((SeekableStreamIndexTask)task, parser, authorizerMapper, chatHandlerProvider, savedParseExceptions, rowIngestionMetersFactory, appenderatorsManager, lockGranularityToUse);
        this.task = task;
    }

    protected Long getNextStartOffset(@NotNull Long sequenceNumber) {
        return sequenceNumber + 1L;
    }

    @Nonnull
    protected List<OrderedPartitionableRecord<Integer, Long>> getRecords(RecordSupplier<Integer, Long> recordSupplier, TaskToolbox toolbox) throws Exception {
        ArrayList<OrderedPartitionableRecord<Integer, Long>> records = new ArrayList();
        try {
            records = recordSupplier.poll(this.task.getIOConfig().getPollTimeout());
        }
        catch (OffsetOutOfRangeException e) {
            log.warn("OffsetOutOfRangeException with message [%s]", new Object[]{e.getMessage()});
            this.possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), recordSupplier, toolbox);
        }
        return records;
    }

    protected SeekableStreamEndSequenceNumbers<Integer, Long> deserializePartitionsFromMetadata(ObjectMapper mapper, Object object) {
        return (SeekableStreamEndSequenceNumbers)mapper.convertValue(object, mapper.getTypeFactory().constructParametrizedType(SeekableStreamEndSequenceNumbers.class, SeekableStreamEndSequenceNumbers.class, new Class[]{Integer.class, Long.class}));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void possiblyResetOffsetsOrWait(Map<TopicPartition, Long> outOfRangePartitions, RecordSupplier<Integer, Long> recordSupplier, TaskToolbox taskToolbox) throws InterruptedException, IOException {
        HashMap<TopicPartition, Long> resetPartitions = new HashMap<TopicPartition, Long>();
        boolean doReset = false;
        if (this.task.getTuningConfig().isResetOffsetAutomatically()) {
            for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
                TopicPartition topicPartition = outOfRangePartition.getKey();
                long nextOffset = outOfRangePartition.getValue();
                StreamPartition streamPartition2 = StreamPartition.of((String)topicPartition.topic(), (Object)topicPartition.partition());
                Long leastAvailableOffset = (Long)recordSupplier.getEarliestSequenceNumber(streamPartition2);
                if (leastAvailableOffset == null) {
                    throw new ISE("got null sequence number for partition[%s] when fetching from kafka!", new Object[]{topicPartition.partition()});
                }
                recordSupplier.seek(streamPartition2, (Object)nextOffset);
                if (leastAvailableOffset <= nextOffset) continue;
                doReset = true;
                resetPartitions.put(topicPartition, nextOffset);
            }
        }
        if (doReset) {
            this.sendResetRequestAndWait(CollectionUtils.mapKeys(resetPartitions, streamPartition -> StreamPartition.of((String)streamPartition.topic(), (Object)streamPartition.partition())), taskToolbox);
        } else {
            log.warn("Retrying in %dms", new Object[]{this.task.getPollRetryMs()});
            this.pollRetryLock.lockInterruptibly();
            try {
                long nanos = TimeUnit.MILLISECONDS.toNanos(this.task.getPollRetryMs());
                while (nanos > 0L && !this.pauseRequested && !this.stopRequested.get()) {
                    nanos = this.isAwaitingRetry.awaitNanos(nanos);
                }
            }
            finally {
                this.pollRetryLock.unlock();
            }
        }
    }

    protected SeekableStreamDataSourceMetadata<Integer, Long> createDataSourceMetadata(SeekableStreamSequenceNumbers<Integer, Long> partitions) {
        return new KafkaDataSourceMetadata(partitions);
    }

    protected OrderedSequenceNumber<Long> createSequenceNumber(Long sequenceNumber) {
        return KafkaSequenceNumber.of(sequenceNumber);
    }

    protected void possiblyResetDataSourceMetadata(TaskToolbox toolbox, RecordSupplier<Integer, Long> recordSupplier, Set<StreamPartition<Integer>> assignment) {
    }

    protected boolean isEndOffsetExclusive() {
        return true;
    }

    protected boolean isEndOfShard(Long seqNum) {
        return false;
    }

    public TypeReference<List<SequenceMetadata<Integer, Long>>> getSequenceMetadataTypeReference() {
        return new TypeReference<List<SequenceMetadata<Integer, Long>>>(){};
    }

    @Nullable
    protected TreeMap<Integer, Map<Integer, Long>> getCheckPointsFromContext(TaskToolbox toolbox, String checkpointsString) throws IOException {
        if (checkpointsString != null) {
            log.debug("Got checkpoints from task context[%s].", new Object[]{checkpointsString});
            return (TreeMap)toolbox.getJsonMapper().readValue(checkpointsString, (TypeReference)new TypeReference<TreeMap<Integer, Map<Integer, Long>>>(){});
        }
        return null;
    }
}

