/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.kafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.hive.kafka.KafkaTableProperties;
import org.apache.kafkaesque.clients.consumer.Consumer;
import org.apache.kafkaesque.clients.consumer.ConsumerRecord;
import org.apache.kafkaesque.clients.consumer.ConsumerRecords;
import org.apache.kafkaesque.common.TopicPartition;
import org.apache.kafkaesque.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaRecordIterator
implements Iterator<ConsumerRecord<byte[], byte[]>> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordIterator.class);
    private static final String POLL_TIMEOUT_HINT = String.format("Try increasing poll timeout using Hive Table property [%s]", KafkaTableProperties.KAFKA_POLL_TIMEOUT.getName());
    private static final String ERROR_POLL_TIMEOUT_FORMAT = "Consumer returned [0] record due to exhausted poll timeout [%s]ms from TopicPartition:[%s] start Offset [%s], current consumer position [%s], target end offset [%s], " + POLL_TIMEOUT_HINT;
    private final Consumer<byte[], byte[]> consumer;
    private final TopicPartition topicPartition;
    private final long endOffset;
    private final long startOffset;
    private final long pollTimeoutMs;
    private final Duration pollTimeoutDurationMs;
    private final Stopwatch stopwatch = Stopwatch.createUnstarted();
    private ConsumerRecords<byte[], byte[]> records;
    private long consumerPosition;
    private ConsumerRecord<byte[], byte[]> nextRecord;
    private boolean hasMore = true;
    private Iterator<ConsumerRecord<byte[], byte[]>> consumerRecordIterator = null;

    KafkaRecordIterator(Consumer<byte[], byte[]> consumer, TopicPartition topicPartition, @Nullable Long requestedStartOffset, @Nullable Long requestedEndOffset, long pollTimeoutMs) {
        this.consumer = (Consumer)Preconditions.checkNotNull(consumer, (Object)"Consumer can not be null");
        this.topicPartition = (TopicPartition)Preconditions.checkNotNull((Object)topicPartition, (Object)"Topic partition can not be null");
        this.pollTimeoutMs = pollTimeoutMs;
        this.pollTimeoutDurationMs = Duration.ofMillis(pollTimeoutMs);
        Preconditions.checkState((this.pollTimeoutMs > 0L ? 1 : 0) != 0, (Object)"Poll timeout has to be positive number");
        List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
        consumer.assign(topicPartitionList);
        if (requestedEndOffset == null) {
            consumer.seekToEnd(topicPartitionList);
            this.endOffset = consumer.position(topicPartition);
            LOG.info("End Offset set to [{}]", (Object)this.endOffset);
        } else {
            this.endOffset = requestedEndOffset;
        }
        if (requestedStartOffset != null) {
            LOG.info("Seeking to offset [{}] of topic partition [{}]", (Object)requestedStartOffset, (Object)topicPartition);
            consumer.seek(topicPartition, requestedStartOffset);
            this.startOffset = consumer.position(topicPartition);
            if (this.startOffset != requestedStartOffset) {
                LOG.warn("Current Start Offset [{}] is different form the requested start position [{}]", (Object)this.startOffset, (Object)requestedStartOffset);
            }
        } else {
            consumer.seekToBeginning(Collections.singleton(topicPartition));
            this.startOffset = consumer.position(topicPartition);
            LOG.info("Consumer at beginning of topic partition [{}], current start offset [{}]", (Object)topicPartition, (Object)this.startOffset);
        }
        this.consumerPosition = consumer.position(topicPartition);
        Preconditions.checkState((this.endOffset >= this.consumerPosition ? 1 : 0) != 0, (String)"End offset [%s] need to be greater or equal than start offset [%s]", (Object[])new Object[]{this.endOffset, this.consumerPosition});
        LOG.info("Kafka Iterator assigned to TopicPartition [{}]; start Offset [{}]; end Offset [{}]", new Object[]{topicPartition, this.consumerPosition, this.endOffset});
    }

    @VisibleForTesting
    KafkaRecordIterator(Consumer<byte[], byte[]> consumer, TopicPartition tp, long pollTimeoutMs) {
        this(consumer, tp, null, null, pollTimeoutMs);
    }

    @Override
    public boolean hasNext() {
        if (!this.hasMore && this.consumerPosition < this.endOffset || this.records == null) {
            this.pollRecords();
            this.findNext();
        }
        return this.hasMore;
    }

    private void pollRecords() {
        if (LOG.isTraceEnabled()) {
            this.stopwatch.reset().start();
        }
        this.records = this.consumer.poll(this.pollTimeoutDurationMs);
        if (LOG.isTraceEnabled()) {
            this.stopwatch.stop();
            LOG.trace("Pulled [{}] records in [{}] ms", (Object)this.records.count(), (Object)this.stopwatch.elapsed(TimeUnit.MILLISECONDS));
        }
        if (this.records.isEmpty() && this.consumer.position(this.topicPartition) < this.endOffset) {
            throw new PollTimeoutException(String.format(ERROR_POLL_TIMEOUT_FORMAT, this.pollTimeoutMs, this.topicPartition.toString(), this.startOffset, this.consumer.position(this.topicPartition), this.endOffset));
        }
        this.consumerRecordIterator = this.records.iterator();
        this.consumerPosition = this.consumer.position(this.topicPartition);
    }

    @Override
    public ConsumerRecord<byte[], byte[]> next() {
        ConsumerRecord<byte[], byte[]> value = this.nextRecord;
        Preconditions.checkState((value.offset() < this.endOffset ? 1 : 0) != 0);
        this.findNext();
        return value;
    }

    private void findNext() {
        if (this.consumerRecordIterator.hasNext()) {
            this.nextRecord = this.consumerRecordIterator.next();
            this.hasMore = this.nextRecord.offset() < this.endOffset;
        } else {
            this.hasMore = false;
            this.nextRecord = null;
        }
    }

    static final class PollTimeoutException
    extends RetriableException {
        private static final long serialVersionUID = 1L;

        PollTimeoutException(String message) {
            super(message);
        }
    }
}

