/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.kinesis;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.util.AwsHostNameUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.aws.AWSCredentialsUtils;
import org.apache.druid.indexing.kinesis.KinesisSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;

public class KinesisRecordSupplier
implements RecordSupplier<String, String> {
    /** Logger used by the supplier and by its per-partition fetch tasks. */
    private static final EmittingLogger log = new EmittingLogger(KinesisRecordSupplier.class);
    // NOTE: in this decompiled source the four constants below are never referenced by name;
    // the same literal values appear inline at the places mentioned on each constant.
    /** 3000 ms: lower bound on the re-schedule delay after a ProvisionedThroughputExceededException in the background fetch. */
    private static final long PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS = 3000L;
    /** 10000 ms: re-schedule delay after an interrupt or recoverable AWS error, and the executor termination wait. */
    private static final long EXCEPTION_RETRY_DELAY_MS = 10000L;
    /** 1000: record limit of each getRecords request issued while looking up a sequence number. */
    private static final int GET_SEQUENCE_NUMBER_RECORD_COUNT = 1000;
    /** 10: maximum tries handed to RetryUtils.retry while looking up a sequence number. */
    private static final int GET_SEQUENCE_NUMBER_RETRY_COUNT = 10;
    /** Handle for the reflectively resolved UserRecord "deaggregate" method; null when deaggregation is disabled. */
    private final MethodHandle deaggregateHandle;
    /** Handle for the reflectively resolved UserRecord "getData" method; null when deaggregation is disabled. */
    private final MethodHandle getDataHandle;
    /** Kinesis client every request goes through; checked for null in the constructor. */
    private final AmazonKinesis kinesis;
    /** Record limit passed to each background getRecords request. */
    private final int recordsPerFetch;
    /** Delay, in milliseconds, between consecutive background fetches of one partition. */
    private final int fetchDelayMillis;
    /** Whether fetched records are passed through the UserRecord deaggregation handles. */
    private final boolean deaggregate;
    /** Milliseconds a fetch task waits when offering a record to the buffer. */
    private final int recordBufferOfferTimeout;
    /** Milliseconds a fetch task waits before retrying after the buffer rejected a record. */
    private final int recordBufferFullWait;
    /** Time budget, in milliseconds, for the sequence-number lookup loop. */
    private final int fetchSequenceNumberTimeout;
    /** Upper bound on the number of records drained by a single poll(). */
    private final int maxRecordsPerPoll;
    /** Size of the fetch thread pool; background fetching is enabled only when this is greater than zero. */
    private final int fetchThreads;
    /** Capacity of the record buffer. */
    private final int recordBufferSize;
    /** When an offset is unset, selects TRIM_HORIZON for the lag lookup instead of reporting a lag of 0. */
    private final boolean useEarliestSequenceNumber;
    /** Fetch executor; only created when background fetching is enabled, and replaced whenever a seek resets fetching. */
    private ScheduledExecutorService scheduledExec;
    /** Per-partition fetch state for the currently assigned partitions. */
    private final ConcurrentMap<StreamPartition<String>, PartitionResource> partitionResources = new ConcurrentHashMap<StreamPartition<String>, PartitionResource>();
    /** Buffer between the fetch tasks and poll(); swapped for a filtered copy on every seek. */
    private BlockingQueue<OrderedPartitionableRecord<String, String>> records;
    /** True when fetchThreads > 0. */
    private final boolean backgroundFetchEnabled;
    /** Set once close() has completed; most public operations fail afterwards. */
    private volatile boolean closed = false;
    /** Flipped to true by start() and back to false when a seek resets the background fetch. */
    private AtomicBoolean partitionsFetchStarted = new AtomicBoolean();

    /**
     * Tells whether a failed AWS call is considered transient.
     *
     * @param ex the service exception raised by the AWS client
     * @return true when the cause is an {@link IOException}, the error code is
     *         "RequestTimeout", or the HTTP status is 500 or 503
     */
    private static boolean isServiceExceptionRecoverable(AmazonServiceException ex) {
        if (ex.getCause() instanceof IOException) {
            return true;
        }
        if ("RequestTimeout".equals(ex.getErrorCode())) {
            return true;
        }
        final int httpStatus = ex.getStatusCode();
        return httpStatus == 500 || httpStatus == 503;
    }

    /**
     * Returns the remaining bytes of {@code buffer} as an array without moving its position.
     *
     * @param buffer source buffer
     * @return the backing array itself (no copy) when the buffer is array-backed and spans
     *         that whole array from offset 0; otherwise a fresh copy of the remaining bytes
     */
    private static byte[] toByteArray(ByteBuffer buffer) {
        final boolean spansWholeBackingArray = buffer.hasArray()
                && buffer.arrayOffset() == 0
                && buffer.position() == 0
                && buffer.limit() == buffer.array().length;
        if (!spansWholeBackingArray) {
            final byte[] copy = new byte[buffer.remaining()];
            // Read through a duplicate so the caller's position/limit stay untouched.
            buffer.duplicate().get(copy);
            return copy;
        }
        return buffer.array();
    }

    /**
     * Runs {@code callable} and converts any {@link Exception} it throws into a
     * {@link StreamException} carrying the original as its cause.
     *
     * @param callable work to execute
     * @param <T>      result type
     * @return whatever {@code callable} returns
     */
    private static <T> T wrapExceptions(Callable<T> callable) {
        final T outcome;
        try {
            outcome = callable.call();
        }
        catch (Exception failure) {
            throw new StreamException(failure);
        }
        return outcome;
    }

    /**
     * Creates a supplier around an existing Kinesis client.
     *
     * <p>When {@code deaggregate} is true the Kinesis Client Library {@code UserRecord} class is
     * looked up reflectively and handles to its {@code deaggregate(List)} and {@code getData()}
     * methods are stored. When {@code fetchThreads} is greater than zero a scheduled thread pool
     * of that size is created for background fetching.
     *
     * @param amazonKinesis              client used for all Kinesis calls; must not be null
     * @param recordsPerFetch            record limit of each background getRecords request
     * @param fetchDelayMillis           delay between background fetches, in milliseconds
     * @param fetchThreads               fetch pool size; values of zero or less disable background fetching
     * @param deaggregate                whether records are deaggregated through the KCL
     * @param recordBufferSize           capacity of the record buffer
     * @param recordBufferOfferTimeout   milliseconds to wait when offering a record to the buffer
     * @param recordBufferFullWait       milliseconds to wait before retrying after a rejected offer
     * @param fetchSequenceNumberTimeout time budget of a sequence-number lookup, in milliseconds
     * @param maxRecordsPerPoll          maximum number of records returned by one poll
     * @param useEarliestSequenceNumber  whether an unset offset is measured from TRIM_HORIZON in lag lookups
     * @throws ISE              if {@code deaggregate} is true and the KCL UserRecord class is not on the classpath
     * @throws RuntimeException if resolving the reflective handles fails for any other reason
     */
    public KinesisRecordSupplier(AmazonKinesis amazonKinesis, int recordsPerFetch, int fetchDelayMillis, int fetchThreads, boolean deaggregate, int recordBufferSize, int recordBufferOfferTimeout, int recordBufferFullWait, int fetchSequenceNumberTimeout, int maxRecordsPerPoll, boolean useEarliestSequenceNumber) {
        Preconditions.checkNotNull(amazonKinesis);
        this.kinesis = amazonKinesis;
        this.recordsPerFetch = recordsPerFetch;
        this.fetchDelayMillis = fetchDelayMillis;
        this.deaggregate = deaggregate;
        this.recordBufferOfferTimeout = recordBufferOfferTimeout;
        this.recordBufferFullWait = recordBufferFullWait;
        this.fetchSequenceNumberTimeout = fetchSequenceNumberTimeout;
        this.maxRecordsPerPoll = maxRecordsPerPoll;
        this.fetchThreads = fetchThreads;
        this.recordBufferSize = recordBufferSize;
        this.useEarliestSequenceNumber = useEarliestSequenceNumber;
        this.backgroundFetchEnabled = fetchThreads > 0;

        // Resolve the optional KCL handles into locals first so each final field is assigned exactly once.
        MethodHandle resolvedDeaggregate = null;
        MethodHandle resolvedGetData = null;
        if (deaggregate) {
            try {
                final Class<?> userRecordClass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
                final MethodHandles.Lookup publicLookup = MethodHandles.publicLookup();
                final Method deaggregateMethod = userRecordClass.getMethod("deaggregate", List.class);
                final Method getDataMethod = userRecordClass.getMethod("getData");
                resolvedDeaggregate = publicLookup.unreflect(deaggregateMethod);
                resolvedGetData = publicLookup.unreflect(getDataMethod);
            }
            catch (ClassNotFoundException e) {
                throw new ISE(e, "cannot find class[com.amazonaws.services.kinesis.clientlibrary.types.UserRecord], note that when using deaggregate=true, you must provide the Kinesis Client Library jar in the classpath");
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        this.deaggregateHandle = resolvedDeaggregate;
        this.getDataHandle = resolvedGetData;

        if (this.backgroundFetchEnabled) {
            log.info("Creating fetch thread pool of size [%d] (Runtime.availableProcessors=%d)", fetchThreads, Runtime.getRuntime().availableProcessors());
            this.scheduledExec = Executors.newScheduledThreadPool(fetchThreads, Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d"));
        }
        this.records = new LinkedBlockingQueue<>(recordBufferSize);
    }

    /**
     * Builds a Kinesis client for {@code endpoint}.
     *
     * <p>Credentials start from the default provider chain derived from
     * {@code awsCredentialsConfig}. When {@code awsAssumedRoleArn} is non-null they are replaced
     * by an STS assume-role provider using a session name of the form
     * {@code druid-kinesis-<random UUID>} and, if given, {@code awsExternalId}.
     *
     * @param endpoint             Kinesis endpoint; the region is parsed from it
     * @param awsCredentialsConfig source of the base credentials
     * @param awsAssumedRoleArn    role to assume, or null to use the base credentials directly
     * @param awsExternalId        external id for the assumed role, or null for none
     * @return the configured client
     */
    public static AmazonKinesis getAmazonKinesisClient(String endpoint, AWSCredentialsConfig awsCredentialsConfig, String awsAssumedRoleArn, String awsExternalId) {
        AWSCredentialsProvider credentials = AWSCredentialsUtils.defaultAWSCredentialsProviderChain(awsCredentialsConfig);
        if (awsAssumedRoleArn != null) {
            log.info("Assuming role [%s] with externalId [%s]", awsAssumedRoleArn, awsExternalId);
            final String sessionName = StringUtils.format("druid-kinesis-%s", UUID.randomUUID().toString());
            final STSAssumeRoleSessionCredentialsProvider.Builder roleBuilder =
                    new STSAssumeRoleSessionCredentialsProvider.Builder(awsAssumedRoleArn, sessionName)
                            .withStsClient(AWSSecurityTokenServiceClientBuilder.standard().withCredentials(credentials).build());
            if (awsExternalId != null) {
                roleBuilder.withExternalId(awsExternalId);
            }
            credentials = roleBuilder.build();
        }
        return AmazonKinesisClientBuilder.standard()
                .withCredentials(credentials)
                .withClientConfiguration(new ClientConfiguration())
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, AwsHostNameUtils.parseRegion(endpoint, null)))
                .build();
    }

    /**
     * Starts the background fetch of every assigned partition, at most once per
     * started/reset cycle. Does nothing beyond the closed-check when background fetching is
     * disabled.
     *
     * @throws ISE if the supplier has been closed
     */
    @VisibleForTesting
    public void start() {
        checkIfClosed();
        if (!backgroundFetchEnabled) {
            return;
        }
        if (partitionsFetchStarted.compareAndSet(false, true)) {
            for (PartitionResource resource : partitionResources.values()) {
                resource.startBackgroundFetch();
            }
        }
    }

    /**
     * Releases every partition and shuts the fetch executor down. Calling it again after a
     * previous call has finished is a no-op.
     *
     * <p>The executor gets {@code EXCEPTION_RETRY_DELAY_MS} milliseconds to terminate before it
     * is shut down forcibly. If the calling thread is interrupted while waiting, the executor is
     * also shut down forcibly, the thread's interrupt status is restored, and the interrupt is
     * rethrown wrapped in a {@link RuntimeException}. Once the partitions have been released the
     * supplier is marked closed on every exit path.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} when interrupted
     */
    public void close() {
        if (closed) {
            return;
        }
        assign(ImmutableSet.of());
        try {
            if (scheduledExec != null) {
                scheduledExec.shutdown();
                try {
                    if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)) {
                        scheduledExec.shutdownNow();
                    }
                }
                catch (InterruptedException e) {
                    log.warn(e, "InterruptedException while shutting down");
                    // Do not leave fetch tasks running just because the wait was cut short.
                    scheduledExec.shutdownNow();
                    // Keep the interrupt visible to callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
        finally {
            // Partitions are unassigned and the executor is shut down (or was never created),
            // so the supplier is unusable from here on regardless of how we exit.
            closed = true;
        }
    }

    /**
     * Makes {@code collection} the set of assigned partitions: partitions not yet known get a
     * fresh {@code PartitionResource}, and partitions no longer listed are removed and have
     * their background fetch stopped.
     *
     * @param collection the partitions that should be assigned after this call
     * @throws ISE if the supplier has been closed
     */
    public void assign(Set<StreamPartition<String>> collection) {
        checkIfClosed();
        for (StreamPartition<String> wanted : collection) {
            partitionResources.putIfAbsent(wanted, new PartitionResource(wanted));
        }
        final Iterator<Map.Entry<StreamPartition<String>, PartitionResource>> assigned = partitionResources.entrySet().iterator();
        while (assigned.hasNext()) {
            final Map.Entry<StreamPartition<String>, PartitionResource> current = assigned.next();
            if (!collection.contains(current.getKey())) {
                assigned.remove();
                current.getValue().stopBackgroundFetch();
            }
        }
    }

    /**
     * Returns the currently assigned partitions.
     *
     * @return the key-set view of the partition resource map (not a copy)
     */
    public Collection<StreamPartition<String>> getAssignment() {
        return partitionResources.keySet();
    }

    /**
     * Positions {@code partition} at {@code sequenceNumber} (iterator type AT_SEQUENCE_NUMBER)
     * after dropping its buffered records and resetting the background fetch.
     *
     * @param partition      partition to reposition; must be assigned
     * @param sequenceNumber sequence number to start reading at
     * @throws InterruptedException if interrupted while the fetch executor is being reset
     */
    public void seek(StreamPartition<String> partition, String sequenceNumber) throws InterruptedException {
        final Set<StreamPartition<String>> target = ImmutableSet.of(partition);
        filterBufferAndResetBackgroundFetch(target);
        partitionSeek(partition, sequenceNumber, ShardIteratorType.AT_SEQUENCE_NUMBER);
    }

    /**
     * Positions each given partition at TRIM_HORIZON after dropping their buffered records and
     * resetting the background fetch.
     *
     * @param partitions partitions to reposition; each must be assigned
     * @throws InterruptedException if interrupted while the fetch executor is being reset
     */
    public void seekToEarliest(Set<StreamPartition<String>> partitions) throws InterruptedException {
        filterBufferAndResetBackgroundFetch(partitions);
        for (StreamPartition<String> partition : partitions) {
            partitionSeek(partition, null, ShardIteratorType.TRIM_HORIZON);
        }
    }

    /**
     * Positions each given partition at LATEST after dropping their buffered records and
     * resetting the background fetch.
     *
     * @param partitions partitions to reposition; each must be assigned
     * @throws InterruptedException if interrupted while the fetch executor is being reset
     */
    public void seekToLatest(Set<StreamPartition<String>> partitions) throws InterruptedException {
        filterBufferAndResetBackgroundFetch(partitions);
        for (StreamPartition<String> partition : partitions) {
            partitionSeek(partition, null, ShardIteratorType.LATEST);
        }
    }

    /**
     * Not supported by this supplier.
     *
     * @param partition ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Nullable
    public String getPosition(StreamPartition<String> partition) {
        throw new UnsupportedOperationException("getPosition() is not supported in Kinesis");
    }

    /**
     * Drains buffered records, waiting up to {@code timeout} milliseconds for them.
     *
     * <p>Starts the background fetch first if it is not running. At most
     * {@code maxRecordsPerPoll} records are requested (and at least one). Records whose
     * partition is no longer assigned are filtered out of the result.
     *
     * @param timeout maximum wait in milliseconds
     * @return the drained records still belonging to an assigned partition; an empty list when
     *         interrupted while waiting
     */
    @Nonnull
    public List<OrderedPartitionableRecord<String, String>> poll(long timeout) {
        start();
        final int batchLimit = Math.min(Math.max(records.size(), 1), maxRecordsPerPoll);
        final List<OrderedPartitionableRecord<String, String>> drained = new ArrayList<>(batchLimit);
        try {
            Queues.drain(records, drained, batchLimit, timeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            log.warn(e, "Interrupted while polling");
            return Collections.emptyList();
        }
        return drained.stream()
                .filter(polled -> partitionResources.containsKey(polled.getStreamPartition()))
                .collect(Collectors.toList());
    }

    /**
     * Looks up a sequence number for {@code partition} starting from a LATEST shard iterator.
     *
     * @param partition partition to inspect
     * @return the result of the sequence-number lookup, which may be null
     */
    @Nullable
    public String getLatestSequenceNumber(StreamPartition<String> partition) {
        final ShardIteratorType from = ShardIteratorType.LATEST;
        return getSequenceNumber(partition, from);
    }

    /**
     * Looks up a sequence number for {@code partition} starting from a TRIM_HORIZON shard iterator.
     *
     * @param partition partition to inspect
     * @return the result of the sequence-number lookup, which may be null
     */
    @Nullable
    public String getEarliestSequenceNumber(StreamPartition<String> partition) {
        final ShardIteratorType from = ShardIteratorType.TRIM_HORIZON;
        return getSequenceNumber(partition, from);
    }

    /**
     * Lists the shard ids of {@code stream}, following DescribeStream pagination until the
     * service reports that no more shards remain.
     *
     * @param stream stream name
     * @return the ids of all shards returned by the service
     * @throws StreamException wrapping any exception raised while describing the stream
     */
    public Set<String> getPartitionIds(String stream) {
        return wrapExceptions(() -> {
            final Set<String> shardIds = new HashSet<>();
            final DescribeStreamRequest request = new DescribeStreamRequest();
            request.setStreamName(stream);
            boolean morePages = true;
            while (morePages) {
                final StreamDescription description = kinesis.describeStream(request).getStreamDescription();
                final List<Shard> page = description.getShards();
                for (Shard shard : page) {
                    shardIds.add(shard.getShardId());
                }
                morePages = description.isHasMoreShards();
                if (morePages) {
                    // The next page starts after the last shard of this one.
                    request.setExclusiveStartShardId(Iterables.getLast(page).getShardId());
                }
            }
            return shardIds;
        });
    }

    /**
     * Computes the time lag of each partition in {@code currentOffsets}.
     *
     * <p>Entries whose offset is not accepted by
     * {@code KinesisSequenceNumber.isValidAWSKinesisSequence} are left out of the result.
     *
     * @param stream         stream the partitions belong to
     * @param currentOffsets partition id to current sequence number
     * @return partition id to lag in milliseconds, for the entries with a valid sequence number
     */
    public Map<String, Long> getPartitionsTimeLag(String stream, Map<String, String> currentOffsets) {
        final Map<String, Long> lagByPartition = Maps.newHashMapWithExpectedSize(currentOffsets.size());
        for (Map.Entry<String, String> entry : currentOffsets.entrySet()) {
            final String partitionId = entry.getKey();
            final String sequenceNumber = entry.getValue();
            if (KinesisSequenceNumber.isValidAWSKinesisSequence(sequenceNumber)) {
                final StreamPartition<String> partition = new StreamPartition<>(stream, partitionId);
                // Kept as a primitive so the Long result is unboxed before being stored.
                final long lagMillis = getPartitionTimeLag(partition, sequenceNumber);
                lagByPartition.put(partitionId, lagMillis);
            }
        }
        return lagByPartition;
    }

    /**
     * Snapshots the lag most recently recorded by each partition's fetch task.
     *
     * @return partition id to the last recorded lag in milliseconds
     */
    @VisibleForTesting
    Map<String, Long> getPartitionResourcesTimeLag() {
        return partitionResources.entrySet()
                .stream()
                .collect(Collectors.toMap(
                        entry -> entry.getKey().getPartitionId(),
                        entry -> entry.getValue().getPartitionTimeLag()));
    }

    /**
     * Reports how many records are currently buffered.
     *
     * @return the size of the record buffer
     */
    @VisibleForTesting
    public int bufferSize() {
        return records.size();
    }

    /**
     * Reports whether the background fetch has been started and not reset since.
     *
     * @return the current value of the started flag
     */
    @VisibleForTesting
    public boolean isBackgroundFetchRunning() {
        return partitionsFetchStarted.get();
    }

    /**
     * Delegates a seek to the resource of an assigned partition.
     *
     * @param partition      partition to reposition
     * @param sequenceNumber sequence number to seek to, or null when the iterator type needs none
     * @param iteratorEnum   shard iterator type to request
     * @throws ISE if {@code partition} is not assigned
     */
    private void partitionSeek(StreamPartition<String> partition, String sequenceNumber, ShardIteratorType iteratorEnum) {
        final PartitionResource assigned = partitionResources.get(partition);
        if (assigned != null) {
            assigned.seek(iteratorEnum, sequenceNumber);
            return;
        }
        throw new ISE("Partition [%s] has not been assigned", partition);
    }

    /**
     * Finds the sequence number of the first record reachable from a shard iterator of the
     * given type.
     *
     * <p>Iterators are followed until a record is found, the iterator chain ends, the supplier
     * is closed, or {@code fetchSequenceNumberTimeout} milliseconds have elapsed. Each
     * getRecords call is retried on throughput-exceeded errors and on service exceptions
     * judged recoverable.
     *
     * @param partition    partition to inspect
     * @param iteratorEnum shard iterator type to start from
     * @return the first record's sequence number; {@code "EOS"} when the iterator chain ended
     *         in a null iterator; null when the supplier was closed or the lookup timed out
     * @throws StreamException wrapping any exception raised during the lookup
     */
    @Nullable
    private String getSequenceNumber(StreamPartition<String> partition, ShardIteratorType iteratorEnum) {
        return wrapExceptions(() -> {
            String iterator = kinesis
                    .getShardIterator(partition.getStream(), partition.getPartitionId(), iteratorEnum.toString())
                    .getShardIterator();
            final long deadline = System.currentTimeMillis() + fetchSequenceNumberTimeout;
            GetRecordsResult lastResult = null;

            while (iterator != null && System.currentTimeMillis() < deadline) {
                if (closed) {
                    log.info("KinesisRecordSupplier closed while fetching sequenceNumber");
                    return null;
                }
                final GetRecordsRequest request = new GetRecordsRequest()
                        .withShardIterator(iterator)
                        .withLimit(GET_SEQUENCE_NUMBER_RECORD_COUNT);
                lastResult = RetryUtils.retry(
                        () -> kinesis.getRecords(request),
                        throwable -> {
                            if (throwable instanceof ProvisionedThroughputExceededException) {
                                log.warn(throwable, "encountered ProvisionedThroughputExceededException while fetching records, this means that the request rate for the stream is too high, or the requested data is too large for the available throughput. Reduce the frequency or size of your requests. Consider increasing the number of shards to increase throughput.");
                                return true;
                            }
                            return throwable instanceof AmazonServiceException
                                    && isServiceExceptionRecoverable((AmazonServiceException) throwable);
                        },
                        GET_SEQUENCE_NUMBER_RETRY_COUNT);

                final List<Record> fetched = lastResult.getRecords();
                if (!fetched.isEmpty()) {
                    return fetched.get(0).getSequenceNumber();
                }
                iterator = lastResult.getNextShardIterator();
            }

            if (iterator != null) {
                // The loop ended because the time budget ran out, not because the shard ended.
                log.warn("timed out while trying to fetch position for shard[%s], millisBehindLatest is [%s], likely no more records in shard", partition.getPartitionId(), lastResult != null ? lastResult.getMillisBehindLatest() : "UNKNOWN");
                return null;
            }
            log.info("Partition[%s] returned a null shard iterator, is the shard closed?", partition.getPartitionId());
            return "EOS";
        });
    }

    /**
     * Asks Kinesis how far {@code offset} is behind the tip of the shard.
     *
     * <p>A null or {@code "-1"} offset counts as unset: the lag is 0 unless
     * {@code useEarliestSequenceNumber} is set, in which case it is measured from TRIM_HORIZON.
     * Otherwise a single-record getRecords request at the given sequence number is issued.
     *
     * @param partition partition to inspect
     * @param offset    current sequence number, null, or {@code "-1"}
     * @return the millisBehindLatest value reported by the service, or 0 as described above
     * @throws StreamException wrapping any exception raised by the Kinesis calls
     */
    private Long getPartitionTimeLag(StreamPartition<String> partition, String offset) {
        return wrapExceptions(() -> {
            final boolean offsetUnset = offset == null || "-1".equals(offset);
            if (offsetUnset && !useEarliestSequenceNumber) {
                return 0L;
            }
            final ShardIteratorType startType = offsetUnset
                    ? ShardIteratorType.TRIM_HORIZON
                    : ShardIteratorType.AT_SEQUENCE_NUMBER;
            final String startSequence = offsetUnset ? null : offset;
            final String iterator = kinesis
                    .getShardIterator(partition.getStream(), partition.getPartitionId(), startType.toString(), startSequence)
                    .getShardIterator();
            final GetRecordsRequest request = new GetRecordsRequest().withShardIterator(iterator).withLimit(1);
            return kinesis.getRecords(request).getMillisBehindLatest();
        });
    }

    /**
     * Guards operations that are invalid once the supplier has been closed.
     *
     * @throws ISE if the supplier has been closed
     */
    private void checkIfClosed() {
        if (!closed) {
            return;
        }
        throw new ISE("Invalid operation - KinesisRecordSupplier has already been closed");
    }

    /**
     * Prepares for a seek of {@code partitions}.
     *
     * <p>If the background fetch was running, the executor is shut down (forcibly after 10
     * seconds) and replaced with a new pool. The record buffer is then replaced by a copy
     * without the records of the given partitions, and every partition's background fetch is
     * flagged as stopped.
     *
     * @param partitions partitions whose buffered records must be discarded
     * @throws ISE                  if the supplier has been closed
     * @throws InterruptedException if interrupted while waiting for the executor to terminate
     */
    private void filterBufferAndResetBackgroundFetch(Set<StreamPartition<String>> partitions) throws InterruptedException {
        checkIfClosed();
        if (backgroundFetchEnabled && partitionsFetchStarted.compareAndSet(true, false)) {
            scheduledExec.shutdown();
            try {
                final boolean terminated = scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS);
                if (!terminated) {
                    scheduledExec.shutdownNow();
                }
            }
            catch (InterruptedException e) {
                log.warn(e, "InterruptedException while shutting down");
                throw e;
            }
            scheduledExec = Executors.newScheduledThreadPool(fetchThreads, Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d"));
        }

        // Keep only the buffered records of partitions that are not being seeked.
        final BlockingQueue<OrderedPartitionableRecord<String, String>> retained = new LinkedBlockingQueue<>(recordBufferSize);
        for (OrderedPartitionableRecord<String, String> buffered : records) {
            if (!partitions.contains(buffered.getStreamPartition())) {
                retained.offer(buffered);
            }
        }
        records = retained;

        for (PartitionResource resource : partitionResources.values()) {
            resource.stopBackgroundFetch();
        }
    }

    private class PartitionResource {
        /** The stream/partition pair this resource fetches for. */
        private final StreamPartition<String> streamPartition;
        // Shard iterator for the next getRecords call. It stays null until seek() is called and is
        // overwritten after each fetch; the fetch task treats a null value as "shard closed".
        @Nullable
        private volatile String shardIterator;
        /** millisBehindLatest value reported by the most recent background getRecords call. */
        private volatile long currentLagMillis;
        /** True between startBackgroundFetch() and stopBackgroundFetch(). */
        private final AtomicBoolean fetchStarted = new AtomicBoolean();
        /** Future of the most recently scheduled fetch task; cancelled when the fetch is stopped. */
        private ScheduledFuture<?> currentFetch;

        /**
         * Creates the resource for one partition; no fetch is scheduled yet.
         *
         * @param streamPartition the partition this resource is responsible for
         */
        private PartitionResource(StreamPartition<String> streamPartition) {
            this.streamPartition = streamPartition;
        }

        /**
         * Schedules the first fetch for this partition if background fetching is enabled, a
         * seek has provided a shard iterator, and the fetch is not already started.
         */
        private void startBackgroundFetch() {
            if (!backgroundFetchEnabled) {
                return;
            }
            if (shardIterator != null) {
                if (fetchStarted.compareAndSet(false, true)) {
                    log.debug("Starting scheduled fetch for stream[%s] partition[%s]", streamPartition.getStream(), streamPartition.getPartitionId());
                    scheduleBackgroundFetch(fetchDelayMillis);
                }
                return;
            }
            log.warn("Skipping background fetch for stream[%s] partition[%s] since seek has not been called for this partition", streamPartition.getStream(), streamPartition.getPartitionId());
        }

        /**
         * Marks this partition's fetch as stopped and, if a scheduled fetch has not finished
         * yet, cancels it with interruption. Does nothing when the fetch was not started.
         */
        private void stopBackgroundFetch() {
            if (!fetchStarted.compareAndSet(true, false)) {
                return;
            }
            log.debug("Stopping scheduled fetch for stream[%s] partition[%s]", streamPartition.getStream(), streamPartition.getPartitionId());
            final boolean fetchPending = currentFetch != null && !currentFetch.isDone();
            if (fetchPending) {
                currentFetch.cancel(true);
            }
        }

        /**
         * Schedules the next fetch task after {@code delayMillis}, provided the fetch is still
         * flagged as started. A rejection by the executor is logged and otherwise ignored.
         *
         * @param delayMillis delay before the task runs, in milliseconds
         */
        private void scheduleBackgroundFetch(long delayMillis) {
            if (!fetchStarted.get()) {
                log.debug("Worker for partition[%s] is already stopped", streamPartition.getPartitionId());
                return;
            }
            try {
                currentFetch = scheduledExec.schedule(fetchRecords(), delayMillis, TimeUnit.MILLISECONDS);
            }
            catch (RejectedExecutionException e) {
                log.warn(e, "Caught RejectedExecutionException, KinesisRecordSupplier for partition[%s] has likely temporarily shutdown the ExecutorService. This is expected behavior after calling seek(), seekToEarliest() and seekToLatest()", streamPartition.getPartitionId());
            }
        }

        /**
         * Builds the task that performs one background fetch for this partition and, in most
         * outcomes, schedules the next one.
         *
         * <p>One run of the task:
         * <ul>
         *   <li>returns immediately if the fetch has been stopped;</li>
         *   <li>if the shard iterator is null, offers an end-of-shard marker record (sequence
         *       number {@code "EOS"}, null data) to the buffer, retrying later only if the
         *       buffer rejected it;</li>
         *   <li>otherwise fetches up to {@code recordsPerFetch} records, records the reported
         *       lag, converts each record (deaggregating through the KCL handles when enabled)
         *       and offers it to the buffer. If the buffer rejects a record, a new iterator
         *       positioned at that record is stored and the fetch is retried after
         *       {@code recordBufferFullWait} milliseconds.</li>
         * </ul>
         *
         * <p>Error handling: throughput-exceeded errors, interrupts and recoverable AWS service
         * errors re-schedule the fetch after a back-off. An expired iterator re-schedules when a
         * result is available to take the next iterator from, and fails otherwise. Every other
         * failure is rethrown and ends the fetching of this partition.
         *
         * @return the fetch task
         */
        private Runnable fetchRecords() {
            return () -> {
                if (!fetchStarted.get()) {
                    log.debug("Worker for partition[%s] has been stopped", streamPartition.getPartitionId());
                    return;
                }

                // Declared outside the try block so the ExpiredIteratorException handler can see it.
                GetRecordsResult recordsResult = null;
                try {
                    if (shardIterator == null) {
                        log.info("shardIterator[%s] has been closed and has no more records", streamPartition.getPartitionId());
                        // Marker record that lets the consumer know the shard has ended.
                        final OrderedPartitionableRecord<String, String> endOfShard = new OrderedPartitionableRecord<>(
                                streamPartition.getStream(),
                                streamPartition.getPartitionId(),
                                "EOS",
                                null);
                        if (!records.offer(endOfShard, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
                            log.warn("OrderedPartitionableRecord buffer full, retrying in [%,dms]", recordBufferFullWait);
                            scheduleBackgroundFetch(recordBufferFullWait);
                        }
                        return;
                    }

                    recordsResult = kinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator).withLimit(recordsPerFetch));
                    currentLagMillis = recordsResult.getMillisBehindLatest();

                    for (Record kinesisRecord : recordsResult.getRecords()) {
                        final List<byte[]> data;
                        if (deaggregate) {
                            if (deaggregateHandle == null || getDataHandle == null) {
                                throw new ISE("deaggregateHandle or getDataHandle is null!");
                            }
                            data = new ArrayList<>();
                            // invokeExact matches on static types: the argument must be typed as List
                            // and the result must be cast to List.
                            final List<Record> singleRecord = Collections.singletonList(kinesisRecord);
                            final List<?> userRecords = (List<?>) deaggregateHandle.invokeExact(singleRecord);
                            for (Object userRecord : userRecords) {
                                data.add(toByteArray((ByteBuffer) getDataHandle.invoke(userRecord)));
                            }
                        } else {
                            data = Collections.singletonList(toByteArray(kinesisRecord.getData()));
                        }

                        final OrderedPartitionableRecord<String, String> currRecord = new OrderedPartitionableRecord<>(
                                streamPartition.getStream(),
                                streamPartition.getPartitionId(),
                                kinesisRecord.getSequenceNumber(),
                                data);
                        log.trace("Stream[%s] / partition[%s] / sequenceNum[%s] / bufferRemainingCapacity[%d]: %s", currRecord.getStream(), currRecord.getPartitionId(), currRecord.getSequenceNumber(), records.remainingCapacity(), currRecord.getData().stream().map(StringUtils::fromUtf8).collect(Collectors.toList()));

                        if (records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
                            continue;
                        }
                        // Buffer full: remember where we stopped so this record is fetched again,
                        // then back off to let the buffer drain.
                        log.warn("OrderedPartitionableRecord buffer full, storing iterator and retrying in [%,dms]", recordBufferFullWait);
                        shardIterator = kinesis.getShardIterator(currRecord.getStream(), currRecord.getPartitionId(), ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), currRecord.getSequenceNumber()).getShardIterator();
                        scheduleBackgroundFetch(recordBufferFullWait);
                        return;
                    }

                    shardIterator = recordsResult.getNextShardIterator();
                    scheduleBackgroundFetch(fetchDelayMillis);
                }
                catch (ProvisionedThroughputExceededException e) {
                    log.warn(e, "encounted ProvisionedThroughputExceededException while fetching records, this means that the request rate for the stream is too high, or the requested data is too large for the available throughput. Reduce the frequency or size of your requests.");
                    final long retryMs = Math.max(PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS, fetchDelayMillis);
                    scheduleBackgroundFetch(retryMs);
                }
                catch (InterruptedException e) {
                    log.warn(e, "Interrupted while waiting to add record to buffer, retrying in [%,dms]", EXCEPTION_RETRY_DELAY_MS);
                    scheduleBackgroundFetch(EXCEPTION_RETRY_DELAY_MS);
                }
                catch (ExpiredIteratorException e) {
                    log.warn(e, "ShardIterator expired while trying to fetch records, retrying in [%,dms]", fetchDelayMillis);
                    if (recordsResult != null) {
                        shardIterator = recordsResult.getNextShardIterator();
                        scheduleBackgroundFetch(fetchDelayMillis);
                    } else {
                        // Only fail when there is no result to take a fresh iterator from.
                        throw new ISE("can't reschedule fetch records runnable, recordsResult is null??");
                    }
                }
                catch (InvalidArgumentException | ResourceNotFoundException e) {
                    log.error(e, "encounted AWS error while attempting to fetch records, will not retry");
                    throw e;
                }
                catch (AmazonServiceException e) {
                    if (isServiceExceptionRecoverable(e)) {
                        log.warn(e, "encounted unknown recoverable AWS exception, retrying in [%,dms]", EXCEPTION_RETRY_DELAY_MS);
                        scheduleBackgroundFetch(EXCEPTION_RETRY_DELAY_MS);
                    } else {
                        // A recoverable error has already been re-scheduled above and must not
                        // also be reported as fatal.
                        log.warn(e, "encounted unknown unrecoverable AWS exception, will not retry");
                        throw new RuntimeException(e);
                    }
                }
                catch (Throwable e) {
                    log.error(e, "unknown fetchRecords exception, will not retry");
                    throw new RuntimeException(e);
                }
            };
        }

        /**
         * Requests a new shard iterator for this partition and stores it as the next read position.
         *
         * @param iteratorEnum   shard iterator type to request
         * @param sequenceNumber sequence number passed with the request; may be null
         * @throws StreamException wrapping any exception raised by the Kinesis call
         */
        private void seek(ShardIteratorType iteratorEnum, String sequenceNumber) {
            final String target = sequenceNumber == null ? iteratorEnum.toString() : sequenceNumber;
            log.debug("Seeking partition [%s] to [%s]", streamPartition.getPartitionId(), target);
            shardIterator = wrapExceptions(
                    () -> kinesis
                            .getShardIterator(streamPartition.getStream(), streamPartition.getPartitionId(), iteratorEnum.toString(), sequenceNumber)
                            .getShardIterator());
        }

        /**
         * Returns the lag recorded by the most recent background fetch of this partition.
         *
         * @return the last recorded lag in milliseconds (0 until a fetch has recorded one)
         */
        private long getPartitionTimeLag() {
            return currentLagMillis;
        }
    }
}

