/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.kinesis.supervisor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
import org.apache.druid.indexing.kinesis.KinesisIndexTask;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig;
import org.apache.druid.indexing.kinesis.KinesisRecordSupplier;
import org.apache.druid.indexing.kinesis.KinesisSequenceNumber;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorIOConfig;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorReportPayload;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.joda.time.DateTime;

/**
 * Supervisor for Kinesis ingestion, specializing {@link SeekableStreamSupervisor} with
 * {@code String} partition (shard) ids and {@code String} sequence numbers.
 *
 * <p>The class builds {@link KinesisIndexTask}s and their IO configs, creates the
 * {@link KinesisRecordSupplier} used to talk to the stream, assigns partitions to task
 * groups, and reports per-partition time lag. Record (count based) lag is not reported:
 * {@link #getRecordLagPerPartition(Map)} yields an empty map and
 * {@link #getPartitionRecordLag()} yields {@code null}.
 *
 * <p>{@code currentPartitionTimeLag} is a {@code volatile} reference that is replaced
 * wholesale by {@link #updatePartitionLagFromStream()}; readers in this class take a
 * single snapshot of the reference before using it.
 */
public class KinesisSupervisor
extends SeekableStreamSupervisor<String, String> {
    private static final EmittingLogger log = new EmittingLogger(KinesisSupervisor.class);
    public static final TypeReference<TreeMap<Integer, Map<String, String>>> CHECKPOINTS_TYPE_REF = new TypeReference<TreeMap<Integer, Map<String, String>>>(){};
    public static final String OFFSET_NOT_SET = "-1";
    /** Sequence-number value that marks a shard as fully read (see {@link #isEndOfShard(String)}). */
    private static final String END_OF_SHARD_MARKER = "EOS";
    /** Sequence-number value that marks a shard as expired (see {@link #isShardExpirationMarker(String)}). */
    private static final String EXPIRED_SHARD_MARKER = "EXPIRED";
    /** Value returned by {@link #getEndOfPartitionMarker()}. */
    private static final String NO_END_SEQUENCE_NUMBER_MARKER = "NO_END_SEQUENCE_NUMBER";
    private final KinesisSupervisorSpec spec;
    private final AWSCredentialsConfig awsCredentialsConfig;
    /** Latest per-partition time lag; {@code null} until {@link #updatePartitionLagFromStream()} has run. */
    private volatile Map<String, Long> currentPartitionTimeLag;

    /**
     * Creates the supervisor. The supervisor id passed to the superclass is
     * {@code "KinesisSupervisor-<dataSource>"}.
     *
     * <p>NOTE(review): the trailing {@code true} passed to the superclass constructor is a flag
     * whose meaning is defined by {@link SeekableStreamSupervisor}; it is not visible here.
     */
    public KinesisSupervisor(TaskStorage taskStorage, TaskMaster taskMaster, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, KinesisIndexTaskClientFactory taskClientFactory, ObjectMapper mapper, KinesisSupervisorSpec spec, RowIngestionMetersFactory rowIngestionMetersFactory, AWSCredentialsConfig awsCredentialsConfig) {
        super(StringUtils.format((String)"KinesisSupervisor-%s", (Object[])new Object[]{spec.getDataSchema().getDataSource()}), taskStorage, taskMaster, indexerMetadataStorageCoordinator, (SeekableStreamIndexTaskClientFactory)taskClientFactory, mapper, (SeekableStreamSupervisorSpec)spec, rowIngestionMetersFactory, true);
        this.spec = spec;
        this.awsCredentialsConfig = awsCredentialsConfig;
        this.currentPartitionTimeLag = null;
    }

    /**
     * Builds the {@link KinesisIndexTaskIOConfig} for one task group.
     *
     * @param ioConfigg supervisor IO config; must be a {@link KinesisSupervisorIOConfig}
     *                  (a {@link ClassCastException} is thrown otherwise)
     * @return the task IO config, carrying the start/end sequence numbers for the configured stream
     */
    protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(int groupId, Map<String, String> startPartitions, Map<String, String> endPartitions, String baseSequenceName, DateTime minimumMessageTime, DateTime maximumMessageTime, Set<String> exclusiveStartSequenceNumberPartitions, SeekableStreamSupervisorIOConfig ioConfigg) {
        KinesisSupervisorIOConfig ioConfig = (KinesisSupervisorIOConfig)ioConfigg;
        SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers =
            new SeekableStreamStartSequenceNumbers<String, String>(ioConfig.getStream(), startPartitions, exclusiveStartSequenceNumberPartitions);
        SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers =
            new SeekableStreamEndSequenceNumbers<String, String>(ioConfig.getStream(), endPartitions);
        // NOTE(review): the literal `true` is a flag defined by the KinesisIndexTaskIOConfig constructor.
        return new KinesisIndexTaskIOConfig(
            groupId,
            baseSequenceName,
            startSequenceNumbers,
            endSequenceNumbers,
            true,
            minimumMessageTime,
            maximumMessageTime,
            ioConfig.getInputFormat(),
            ioConfig.getEndpoint(),
            ioConfig.getRecordsPerFetch(),
            ioConfig.getFetchDelayMillis(),
            ioConfig.getAwsAssumedRoleArn(),
            ioConfig.getAwsExternalId(),
            ioConfig.isDeaggregate()
        );
    }

    /**
     * Creates {@code replicas} index tasks that share one task context and one
     * {@link TaskResource} keyed by {@code baseSequenceName}. Each task gets its own random id
     * prefixed with {@code baseSequenceName}.
     *
     * @param sequenceOffsets checkpoints, serialized with {@code sortingMapper} and stored in the
     *                        task context under the key {@code "checkpoints"}
     * @throws JsonProcessingException if the checkpoints cannot be serialized
     */
    protected List<SeekableStreamIndexTask<String, String>> createIndexTasks(int replicas, String baseSequenceName, ObjectMapper sortingMapper, TreeMap<Integer, Map<String, String>> sequenceOffsets, SeekableStreamIndexTaskIOConfig taskIoConfig, SeekableStreamIndexTaskTuningConfig taskTuningConfig, RowIngestionMetersFactory rowIngestionMetersFactory) throws JsonProcessingException {
        String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
        // Left raw on purpose: the declared type of createBaseTaskContexts() is not visible here.
        Map context = this.createBaseTaskContexts();
        context.put("checkpoints", checkpoints);
        List<SeekableStreamIndexTask<String, String>> taskList = new ArrayList<SeekableStreamIndexTask<String, String>>();
        for (int i = 0; i < replicas; ++i) {
            String taskId = IdUtils.getRandomIdWithPrefix((String)baseSequenceName);
            taskList.add(new KinesisIndexTask(taskId, new TaskResource(baseSequenceName, 1), this.spec.getDataSchema(), (KinesisIndexTaskTuningConfig)taskTuningConfig, (KinesisIndexTaskIOConfig)taskIoConfig, context, this.awsCredentialsConfig));
        }
        return taskList;
    }

    /**
     * Creates the {@link KinesisRecordSupplier} used by this supervisor, configured from the
     * spec's IO and tuning configs and the supervisor's AWS credentials.
     *
     * <p>NOTE(review): the literal {@code 0} is a positional argument whose meaning is defined by
     * the {@link KinesisRecordSupplier} constructor.
     */
    protected RecordSupplier<String, String> setupRecordSupplier() throws RuntimeException {
        KinesisSupervisorIOConfig ioConfig = this.spec.getIoConfig();
        KinesisSupervisorTuningConfig taskTuningConfig = this.spec.getTuningConfig();
        return new KinesisRecordSupplier(
            KinesisRecordSupplier.getAmazonKinesisClient(ioConfig.getEndpoint(), this.awsCredentialsConfig, ioConfig.getAwsAssumedRoleArn(), ioConfig.getAwsExternalId()),
            ioConfig.getRecordsPerFetch(),
            ioConfig.getFetchDelayMillis(),
            0,
            ioConfig.isDeaggregate(),
            taskTuningConfig.getRecordBufferSize(),
            taskTuningConfig.getRecordBufferOfferTimeout(),
            taskTuningConfig.getRecordBufferFullWait(),
            taskTuningConfig.getFetchSequenceNumberTimeout(),
            taskTuningConfig.getMaxRecordsPerPoll(),
            ioConfig.isUseEarliestSequenceNumber()
        );
    }

    /**
     * Returns the task group for {@code partitionId} based on its position in the supervisor's
     * known partition list, or a negative value if the partition is not in that list.
     */
    protected int getTaskGroupIdForPartition(String partitionId) {
        return this.getTaskGroupIdForPartitionWithProvidedList(partitionId, this.partitionIds);
    }

    /**
     * Maps a partition to {@code indexInList % taskCount}.
     *
     * @return the task group id, or the negative {@code indexOf} result when
     *         {@code partitionId} is not contained in {@code availablePartitions}
     */
    private int getTaskGroupIdForPartitionWithProvidedList(String partitionId, List<String> availablePartitions) {
        int index = availablePartitions.indexOf(partitionId);
        if (index < 0) {
            return index;
        }
        // Reuse the index found above instead of scanning the list a second time.
        return index % this.spec.getIoConfig().getTaskCount();
    }

    /**
     * Recomputes the partition-to-task-group assignment for the given set of partitions.
     * Partitions are numbered by their position in a list copy of {@code availablePartitions}
     * and assigned to group {@code position % taskCount}.
     *
     * @return a new mutable map from task group id to the partitions assigned to it
     */
    protected Map<Integer, Set<String>> recomputePartitionGroupsForExpiration(Set<String> availablePartitions) {
        List<String> orderedPartitions = new ArrayList<String>(availablePartitions);
        Map<Integer, Set<String>> newPartitionGroups = new HashMap<Integer, Set<String>>();
        // Elements of a Set are distinct, so each element's indexOf() in the list copy equals its
        // position; walking by position avoids an O(n) list scan per partition.
        for (int position = 0; position < orderedPartitions.size(); ++position) {
            int newTaskGroupId = position % this.spec.getIoConfig().getTaskCount();
            newPartitionGroups.computeIfAbsent(newTaskGroupId, k -> new HashSet<String>()).add(orderedPartitions.get(position));
        }
        return newPartitionGroups;
    }

    /** Returns whether {@code metadata} is Kinesis metadata. */
    protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata) {
        return metadata instanceof KinesisDataSourceMetadata;
    }

    /** Returns whether {@code task} is a {@link KinesisIndexTask}. */
    protected boolean doesTaskTypeMatchSupervisor(Task task) {
        return task instanceof KinesisIndexTask;
    }

    /**
     * Builds the supervisor status report.
     *
     * @param includeOffsets when {@code true} the report carries the per-partition time lag and
     *                       its sum (negative lags are counted as 0); when {@code false} both
     *                       values are {@code null}
     */
    protected SeekableStreamSupervisorReportPayload<String, String> createReportPayload(int numPartitions, boolean includeOffsets) {
        KinesisSupervisorIOConfig ioConfig = this.spec.getIoConfig();
        Map<String, Long> partitionLag = this.getTimeLagPerPartition(this.getHighestCurrentOffsets());
        Map<String, Long> reportedPartitionLag = null;
        Long aggregateLag = null;
        if (includeOffsets) {
            reportedPartitionLag = partitionLag;
            aggregateLag = Long.valueOf(partitionLag.values().stream().mapToLong(lag -> Math.max(lag, 0L)).sum());
        }
        return new KinesisSupervisorReportPayload(
            this.spec.getDataSchema().getDataSource(),
            ioConfig.getStream(),
            numPartitions,
            ioConfig.getReplicas(),
            ioConfig.getTaskDuration().getMillis() / 1000L,
            this.spec.isSuspended(),
            this.stateManager.isHealthy(),
            this.stateManager.getSupervisorState().getBasicState(),
            this.stateManager.getSupervisorState(),
            this.stateManager.getExceptionEvents(),
            reportedPartitionLag,
            aggregateLag
        );
    }

    /** Record lag is not computed by this supervisor; always returns an empty immutable map. */
    protected Map<String, Long> getRecordLagPerPartition(Map<String, String> currentOffsets) {
        return ImmutableMap.of();
    }

    /**
     * Returns the known time lag for every partition in {@code currentOffsets} that has a
     * non-null offset and a non-null lag entry.
     *
     * @return a new mutable map; empty when no lag has been recorded yet
     */
    protected Map<String, Long> getTimeLagPerPartition(Map<String, String> currentOffsets) {
        // Read the volatile reference exactly once: a concurrent updatePartitionLagFromStream()
        // could otherwise swap the map between the null check and the lookup, producing a null
        // value (which Collectors.toMap rejects with a NullPointerException).
        final Map<String, Long> lagSnapshot = this.currentPartitionTimeLag;
        Map<String, Long> lagPerPartition = new HashMap<String, Long>();
        for (Map.Entry<String, String> offsetEntry : currentOffsets.entrySet()) {
            if (offsetEntry.getValue() == null || lagSnapshot == null) {
                continue;
            }
            Long lag = lagSnapshot.get(offsetEntry.getKey());
            if (lag != null) {
                lagPerPartition.put(offsetEntry.getKey(), lag);
            }
        }
        return lagPerPartition;
    }

    /** Wraps {@code map} as end sequence numbers for {@code stream} in Kinesis metadata. */
    protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(String stream, Map<String, String> map) {
        return new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<String, String>(stream, map));
    }

    /** Creates a {@link KinesisSequenceNumber} for {@code seq}. */
    protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean isExclusive) {
        return KinesisSequenceNumber.of(seq, isExclusive);
    }

    /**
     * Refreshes {@code currentPartitionTimeLag} from the record supplier, using the configured
     * stream and the highest current offsets.
     */
    protected void updatePartitionLagFromStream() {
        KinesisRecordSupplier supplier = (KinesisRecordSupplier)this.recordSupplier;
        this.currentPartitionTimeLag = supplier.getPartitionsTimeLag(this.getIoConfig().getStream(), this.getHighestCurrentOffsets());
    }

    /**
     * Always returns {@code null}.
     *
     * <p>NOTE(review): presumably callers treat {@code null} as "record lag unavailable";
     * confirm against the superclass before changing this to an empty map.
     */
    protected Map<String, Long> getPartitionRecordLag() {
        return null;
    }

    /** Returns the most recently stored per-partition time lag, or {@code null} if none was stored yet. */
    protected Map<String, Long> getPartitionTimeLag() {
        return this.currentPartitionTimeLag;
    }

    /** Returns the task name prefix {@code "index_kinesis"}. */
    protected String baseTaskName() {
        return "index_kinesis";
    }

    /** Returns {@link #OFFSET_NOT_SET}. */
    protected String getNotSetMarker() {
        return OFFSET_NOT_SET;
    }

    /** Returns the marker {@code "NO_END_SEQUENCE_NUMBER"}. */
    protected String getEndOfPartitionMarker() {
        return NO_END_SEQUENCE_NUMBER_MARKER;
    }

    /** Returns whether {@code seqNum} equals the end-of-shard marker {@code "EOS"}; null-safe. */
    protected boolean isEndOfShard(String seqNum) {
        return END_OF_SHARD_MARKER.equals(seqNum);
    }

    /** Returns whether {@code seqNum} equals the expiration marker {@code "EXPIRED"}; null-safe. */
    protected boolean isShardExpirationMarker(String seqNum) {
        return EXPIRED_SHARD_MARKER.equals(seqNum);
    }

    /** Always {@code true}. */
    protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() {
        return true;
    }

    /**
     * Returns a copy of {@code startingOffsets} without the partitions whose sequence number is
     * the end-of-shard marker. Each excluded shard is logged at debug level.
     */
    protected Map<String, OrderedSequenceNumber<String>> filterExpiredPartitionsFromStartingOffsets(Map<String, OrderedSequenceNumber<String>> startingOffsets) {
        Map<String, OrderedSequenceNumber<String>> filteredOffsets = new HashMap<String, OrderedSequenceNumber<String>>();
        for (Map.Entry<String, OrderedSequenceNumber<String>> entry : startingOffsets.entrySet()) {
            // Constant-first equals: same comparison as isEndOfShard(), and safe if get() yields null.
            if (END_OF_SHARD_MARKER.equals(entry.getValue().get())) {
                log.debug("Excluding shard[%s] because it has reached EOS.", new Object[]{entry.getKey()});
                continue;
            }
            filteredOffsets.put(entry.getKey(), entry.getValue());
        }
        return filteredOffsets;
    }

    /** Always {@code true}. */
    protected boolean supportsPartitionExpiration() {
        return true;
    }

    /**
     * Returns a copy of {@code currentMetadata} in which every partition listed in
     * {@code expiredPartitionIds} has its sequence number replaced by {@code "EXPIRED"}.
     */
    protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithExpiredPartitions(SeekableStreamDataSourceMetadata<String, String> currentMetadata, Set<String> expiredPartitionIds) {
        // Pass the ids as a format argument (as the other log call in this class does) so their
        // text is never interpreted as part of the format string.
        log.info("Marking expired shards in metadata: %s", new Object[]{expiredPartitionIds});
        return this.createDataSourceMetadataWithClosedOrExpiredPartitions(currentMetadata, expiredPartitionIds, EXPIRED_SHARD_MARKER);
    }

    /**
     * Rebuilds {@code currentMetadata} with {@code terminationMarker} as the sequence number of
     * every partition in {@code terminatedPartitionIds}; all other partitions keep their value.
     * When the existing sequence numbers are start sequence numbers, the result is too, and
     * terminated partitions are dropped from its exclusive-start set; otherwise the result holds
     * end sequence numbers.
     *
     * @param currentMetadata must be a {@link KinesisDataSourceMetadata}
     *                        (a {@link ClassCastException} is thrown otherwise)
     */
    private SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithClosedOrExpiredPartitions(SeekableStreamDataSourceMetadata<String, String> currentMetadata, Set<String> terminatedPartitionIds, String terminationMarker) {
        KinesisDataSourceMetadata dataSourceMetadata = (KinesisDataSourceMetadata)currentMetadata;
        SeekableStreamSequenceNumbers<String, String> old = dataSourceMetadata.getSeekableStreamSequenceNumbers();
        Map<String, String> oldPartitionSequenceNumberMap = old.getPartitionSequenceNumberMap();
        Map<String, String> newPartitionSequenceNumberMap = new HashMap<String, String>();
        for (Map.Entry<String, String> entry : oldPartitionSequenceNumberMap.entrySet()) {
            String partitionId = entry.getKey();
            String sequenceNumber = terminatedPartitionIds.contains(partitionId) ? terminationMarker : entry.getValue();
            newPartitionSequenceNumberMap.put(partitionId, sequenceNumber);
        }
        // Declared as the common supertype so either the start or the end variant can be assigned.
        SeekableStreamSequenceNumbers<String, String> newSequences;
        if (old instanceof SeekableStreamStartSequenceNumbers) {
            Set<String> oldExclusiveStartPartitions = ((SeekableStreamStartSequenceNumbers<String, String>)old).getExclusivePartitions();
            Set<String> newExclusiveStartPartitions = new HashSet<String>();
            for (String partitionId : oldExclusiveStartPartitions) {
                if (!terminatedPartitionIds.contains(partitionId)) {
                    newExclusiveStartPartitions.add(partitionId);
                }
            }
            newSequences = new SeekableStreamStartSequenceNumbers<String, String>(old.getStream(), null, newPartitionSequenceNumberMap, null, newExclusiveStartPartitions);
        } else {
            newSequences = new SeekableStreamEndSequenceNumbers<String, String>(old.getStream(), null, newPartitionSequenceNumberMap, null);
        }
        return new KinesisDataSourceMetadata(newSequences);
    }
}

