package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;

import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.EncryptionType;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.StartingPosition;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber;
import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link RecordPublisher} that subscribes to a single shard through a
 * {@link FanOutShardSubscriber} and hands every received {@link SubscribeToShardEvent}
 * to the supplied consumer as a {@link RecordBatch}.
 *
 * <p>Each call to {@link #run(RecordPublisher.RecordBatchConsumer)} creates one subscription
 * starting from the current starting position. The position is advanced to the sequence
 * number returned by the consumer after every batch, so the next call resumes where the
 * previous one stopped. Failed subscriptions are followed by a full-jitter backoff.
 */
@Internal
public class FanOutRecordPublisher implements RecordPublisher {
    private static final Logger LOG = LoggerFactory.getLogger(FanOutRecordPublisher.class);

    private final FullJitterBackoff backoff;
    private final String consumerArn;
    private final KinesisProxyV2Interface kinesisProxy;
    private final StreamShardHandle subscribedShard;
    private final FanOutRecordPublisherConfiguration configuration;

    /** Number of consecutive non-recoverable subscription failures; reset after a successful run. */
    private int attempt = 0;

    /** Position the next subscription starts from; updated after every consumed batch. */
    private StartingPosition nextStartingPosition;

    /**
     * Creates a publisher for the given shard. All arguments must be non-null.
     *
     * @param startingPosition position the first subscription starts from
     * @param consumerArn ARN of the stream consumer used to subscribe
     * @param subscribedShard shard this publisher reads from
     * @param kinesisProxy proxy passed on to the {@link FanOutShardSubscriber}
     * @param configuration supplies the subscribe timeout, retry limit and backoff settings
     * @param backoff computes and performs the backoff sleeps
     * @throws NullPointerException if any argument is {@code null}
     */
    public FanOutRecordPublisher(StartingPosition startingPosition, String consumerArn, StreamShardHandle subscribedShard, KinesisProxyV2Interface kinesisProxy, FanOutRecordPublisherConfiguration configuration, FullJitterBackoff backoff) {
        this.nextStartingPosition = Preconditions.checkNotNull(startingPosition);
        this.consumerArn = Preconditions.checkNotNull(consumerArn);
        this.subscribedShard = Preconditions.checkNotNull(subscribedShard);
        this.kinesisProxy = Preconditions.checkNotNull(kinesisProxy);
        this.configuration = Preconditions.checkNotNull(configuration);
        this.backoff = Preconditions.checkNotNull(backoff);
    }

    /**
     * Runs one subscription against the shard, forwarding every event to
     * {@code recordBatchConsumer} and advancing the starting position to the sequence
     * number the consumer returns.
     *
     * @param recordBatchConsumer receives each batch and returns the sequence number to continue from
     * @return the outcome of this subscription attempt
     * @throws InterruptedException if interrupted while backing off after a failure
     */
    @Override // org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher
    public RecordPublisher.RecordPublisherRunResult run(RecordPublisher.RecordBatchConsumer recordBatchConsumer) throws InterruptedException {
        LOG.info(
                "Running fan out record publisher on {}::{} from {} - {}",
                this.subscribedShard.getStreamName(),
                this.subscribedShard.getShard().getShardId(),
                this.nextStartingPosition.getShardIteratorType(),
                this.nextStartingPosition.getStartingMarker());

        RecordPublisher.RecordPublisherRunResult result = runWithBackoff(event -> {
            RecordBatch batch = new RecordBatch(toSdkV1Records(event.records()), this.subscribedShard, event.millisBehindLatest());
            // Remember where to resume so that a re-subscription does not replay this batch.
            this.nextStartingPosition = StartingPosition.continueFromSequenceNumber(recordBatchConsumer.accept(batch));
        });

        LOG.info(
                "Subscription expired {}::{}, with status {}",
                this.subscribedShard.getStreamName(),
                this.subscribedShard.getShard().getShardId(),
                result);
        return result;
    }

    /**
     * Creates a new {@link FanOutShardSubscriber}, consumes from it and maps the possible
     * failures onto a run result, backing off where a retry is expected.
     *
     * @param eventConsumer callback invoked for every received event
     * @return {@code COMPLETE} or {@code INCOMPLETE} according to the subscriber's return value,
     *     {@code CANCELLED} on interruption, {@code COMPLETE} on a
     *     {@link ResourceNotFoundException}, otherwise {@code INCOMPLETE} after a backoff
     * @throws InterruptedException if interrupted while sleeping during backoff
     * @throws RuntimeException if the configured maximum number of retries has been reached
     */
    private RecordPublisher.RecordPublisherRunResult runWithBackoff(Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException {
        final String shardId = this.subscribedShard.getShard().getShardId();
        try {
            FanOutShardSubscriber subscriber = new FanOutShardSubscriber(this.consumerArn, shardId, this.kinesisProxy, this.configuration.getSubscribeToShardTimeout());
            boolean complete = subscriber.subscribeToShardAndConsumeRecords(toSdkV2StartingPosition(this.nextStartingPosition), eventConsumer);
            // A subscription that ended without an exception clears the retry counter.
            this.attempt = 0;
            return complete ? RecordPublisher.RecordPublisherRunResult.COMPLETE : RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
        } catch (FanOutShardSubscriber.FanOutSubscriberInterruptedException interrupted) {
            LOG.info("Thread interrupted, closing record publisher for shard {}.", shardId, interrupted);
            return RecordPublisher.RecordPublisherRunResult.CANCELLED;
        } catch (FanOutShardSubscriber.RecoverableFanOutSubscriberException recoverable) {
            // Recoverable errors back off but do not count against the retry limit.
            backoff(recoverable);
            return RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
        } catch (FanOutShardSubscriber.FanOutSubscriberException failure) {
            if (failure.getCause() instanceof ResourceNotFoundException) {
                LOG.warn("Received ResourceNotFoundException. Either the shard does not exist, or the stream subscriber has been deregistered. Marking this shard as complete {} ({})", shardId, this.consumerArn);
                return RecordPublisher.RecordPublisherRunResult.COMPLETE;
            }
            if (this.attempt == this.configuration.getSubscribeToShardMaxRetries()) {
                String message = "Maximum retries exceeded for SubscribeToShard. Failed " + this.configuration.getSubscribeToShardMaxRetries() + " times.";
                LOG.error(message, failure.getCause());
                throw new RuntimeException(message, failure.getCause());
            }
            this.attempt++;
            backoff(failure);
            return RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
        }
    }

    /**
     * Logs the error and sleeps for a full-jitter backoff derived from the configuration
     * and the current attempt count.
     *
     * @param error the exception that triggered the backoff
     * @throws InterruptedException if interrupted while sleeping
     */
    private void backoff(Throwable error) throws InterruptedException {
        long backoffMillis = this.backoff.calculateFullJitterBackoff(
                this.configuration.getSubscribeToShardBaseBackoffMillis(),
                this.configuration.getSubscribeToShardMaxBackoffMillis(),
                this.configuration.getSubscribeToShardExpConstant(),
                this.attempt);

        // Name the underlying cause when there is one; fall back to the exception itself so a
        // cause-less exception cannot turn the log statement into a NullPointerException.
        Throwable cause = error.getCause();
        String errorName = (cause != null ? cause : error).getClass().getSimpleName();

        // The trailing throwable has no placeholder, so SLF4J logs it with its stack trace.
        LOG.warn(
                "Encountered recoverable error {}. Backing off for {} millis {} ({})",
                errorName,
                backoffMillis,
                this.subscribedShard.getShard().getShardId(),
                this.consumerArn,
                error);
        this.backoff.sleep(backoffMillis);
    }

    /**
     * Converts a list of SDK v2 records into SDK v1 records, preserving order.
     *
     * @param records SDK v2 records taken from a subscription event
     * @return a new list holding the converted records
     */
    private List<Record> toSdkV1Records(List<org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Record> records) {
        List<Record> converted = new ArrayList<>(records.size());
        for (org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Record record : records) {
            converted.add(toSdkV1Record(record));
        }
        return converted;
    }

    /**
     * Copies data, sequence number, partition key and approximate arrival timestamp of an
     * SDK v2 record into a new SDK v1 record. The encryption type is copied only when set.
     *
     * @param record SDK v2 record to convert
     * @return the equivalent SDK v1 record
     */
    private Record toSdkV1Record(@Nonnull org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Record record) {
        Record converted = new Record()
                .withData(record.data().asByteBuffer())
                .withSequenceNumber(record.sequenceNumber())
                .withPartitionKey(record.partitionKey())
                .withApproximateArrivalTimestamp(new Date(record.approximateArrivalTimestamp().toEpochMilli()));

        EncryptionType encryptionType = record.encryptionType();
        if (encryptionType != null) {
            converted.withEncryptionType(encryptionType.name());
        }
        return converted;
    }

    /**
     * Translates the connector's {@link StartingPosition} into the SDK v2 starting position
     * used for the subscription.
     *
     * @param startingPosition connector starting position to translate
     * @return SDK v2 starting position with the same iterator type and, where the type
     *     requires one, the timestamp or sequence number marker
     * @throws NullPointerException if the iterator type requires a marker and it is {@code null}
     */
    private org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.StartingPosition toSdkV2StartingPosition(StartingPosition startingPosition) {
        org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.StartingPosition.Builder builder =
                org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.StartingPosition.builder()
                        .type(startingPosition.getShardIteratorType().toString());

        Object marker = startingPosition.getStartingMarker();
        switch (startingPosition.getShardIteratorType()) {
            case AT_TIMESTAMP:
                Preconditions.checkNotNull(marker, "StartingPosition AT_TIMESTAMP date marker is null.");
                builder.timestamp(((Date) marker).toInstant());
                break;
            case AT_SEQUENCE_NUMBER:
            case AFTER_SEQUENCE_NUMBER:
                Preconditions.checkNotNull(marker, "StartingPosition *_SEQUENCE_NUMBER position is null.");
                builder.sequenceNumber(marker.toString());
                break;
            default:
                // Any other iterator type is passed through with the type alone, no marker.
                break;
        }
        return builder.build();
    }
}
