/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.seekablestream;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.apache.druid.data.input.Committer;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.timeline.DataSegment;

/**
 * Bookkeeping for one sequence of a seekable-stream indexing task: the per-partition start and
 * end offsets of the sequence, the set of partitions that still have records to read
 * ("assignments"), and whether the end offsets have been set via {@link #setEndOffsets(Map)}
 * ("checkpointed").
 *
 * <p>{@code startOffsets} is an immutable copy taken at construction time. The mutable state
 * ({@code endOffsets}, {@code assignments}, {@code checkpointed}) is accessed by the methods of
 * this class while holding {@link #lock}.
 *
 * @param <PartitionIdType>    type identifying a partition of the stream
 * @param <SequenceOffsetType> type of an offset (sequence number) within a partition
 */
public class SequenceMetadata<PartitionIdType, SequenceOffsetType> {
    private static final EmittingLogger log = new EmittingLogger(SequenceMetadata.class);

    private final int sequenceId;
    private final String sequenceName;
    private final Set<PartitionIdType> exclusiveStartPartitions;
    /** Partitions that still have records left to read; recomputed by {@link #updateAssignments}. */
    private final Set<PartitionIdType> assignments;
    /** Always {@code false} for instances built through the constructor of this class. */
    private final boolean sentinel;
    /** Guards {@link #endOffsets}, {@link #assignments} and {@link #checkpointed}. */
    private final ReentrantLock lock = new ReentrantLock();
    final Map<PartitionIdType, SequenceOffsetType> startOffsets;
    final Map<PartitionIdType, SequenceOffsetType> endOffsets;
    private boolean checkpointed;

    /**
     * Creates the metadata for a sequence. The initial assignments are all partitions present in
     * {@code startOffsets}.
     *
     * @param sequenceId               numeric id of the sequence
     * @param sequenceName             name of the sequence; must not be null
     * @param startOffsets             per-partition start offsets; must not be null, copied immutably
     * @param endOffsets               per-partition end offsets; must not be null, copied into a mutable map
     * @param checkpointed             initial value of the checkpointed flag
     * @param exclusiveStartPartitions partitions whose start offset is exclusive; {@code null} is
     *                                 treated as the empty set
     * @throws NullPointerException if {@code sequenceName}, {@code startOffsets} or
     *                              {@code endOffsets} is null
     */
    @JsonCreator
    public SequenceMetadata(
            @JsonProperty("sequenceId") int sequenceId,
            @JsonProperty("sequenceName") String sequenceName,
            @JsonProperty("startOffsets") Map<PartitionIdType, SequenceOffsetType> startOffsets,
            @JsonProperty("endOffsets") Map<PartitionIdType, SequenceOffsetType> endOffsets,
            @JsonProperty("checkpointed") boolean checkpointed,
            @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType> exclusiveStartPartitions
    ) {
        Preconditions.checkNotNull(sequenceName, "sequenceName");
        Preconditions.checkNotNull(startOffsets, "startOffsets");
        Preconditions.checkNotNull(endOffsets, "endOffsets");
        this.sequenceId = sequenceId;
        this.sequenceName = sequenceName;
        this.startOffsets = ImmutableMap.copyOf(startOffsets);
        this.endOffsets = new HashMap<>(endOffsets);
        this.assignments = new HashSet<>(startOffsets.keySet());
        this.checkpointed = checkpointed;
        this.sentinel = false;
        this.exclusiveStartPartitions =
                exclusiveStartPartitions == null ? Collections.emptySet() : exclusiveStartPartitions;
    }

    /** Returns the partitions whose start offset is exclusive (never null). */
    @JsonProperty
    public Set<PartitionIdType> getExclusiveStartPartitions() {
        return exclusiveStartPartitions;
    }

    /** Returns the numeric id of this sequence. */
    @JsonProperty
    public int getSequenceId() {
        return sequenceId;
    }

    /** Returns the checkpointed flag; it becomes {@code true} once {@link #setEndOffsets(Map)} is called. */
    @JsonProperty
    public boolean isCheckpointed() {
        lock.lock();
        try {
            return checkpointed;
        } finally {
            lock.unlock();
        }
    }

    /** Returns the name of this sequence. */
    @JsonProperty
    public String getSequenceName() {
        return sequenceName;
    }

    /** Returns the immutable per-partition start offsets. */
    @JsonProperty
    public Map<PartitionIdType, SequenceOffsetType> getStartOffsets() {
        return startOffsets;
    }

    /**
     * Returns the per-partition end offsets.
     *
     * <p>NOTE(review): this hands out the live internal map, not a snapshot; the lock only covers
     * reading the reference. Callers that iterate the result concurrently with
     * {@link #setEndOffsets(Map)} are not protected by it.
     */
    @JsonProperty
    public Map<PartitionIdType, SequenceOffsetType> getEndOffsets() {
        lock.lock();
        try {
            return endOffsets;
        } finally {
            lock.unlock();
        }
    }

    /** Returns the sentinel flag. */
    @JsonProperty
    public boolean isSentinel() {
        return sentinel;
    }

    /**
     * Merges {@code newEndOffsets} into the current end offsets (existing partitions are
     * overwritten, others are kept) and marks this sequence as checkpointed.
     *
     * @param newEndOffsets end offsets to apply
     */
    void setEndOffsets(Map<PartitionIdType, SequenceOffsetType> newEndOffsets) {
        lock.lock();
        try {
            endOffsets.putAll(newEndOffsets);
            checkpointed = true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Recomputes the assignments: a partition of {@code currOffsets} is assigned when
     * {@code moreToReadFn} returns true for its current offset and its end offset.
     *
     * @param currOffsets  current offset per partition
     * @param moreToReadFn called with (current offset, end offset); the end offset is {@code null}
     *                     when this sequence has no end offset for the partition
     */
    void updateAssignments(
            Map<PartitionIdType, SequenceOffsetType> currOffsets,
            BiFunction<SequenceOffsetType, SequenceOffsetType, Boolean> moreToReadFn
    ) {
        lock.lock();
        try {
            assignments.clear();
            currOffsets.forEach((partition, currentOffset) -> {
                final SequenceOffsetType endOffset = endOffsets.get(partition);
                if (moreToReadFn.apply(currentOffset, endOffset)) {
                    assignments.add(partition);
                }
            });
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns whether at least one partition is still assigned to this sequence.
     *
     * <p>Reads {@code assignments} under the same lock that {@link #updateAssignments} uses to
     * modify it. The lock is reentrant, so calling this from {@link #canHandle} is safe.
     */
    boolean isOpen() {
        lock.lock();
        try {
            return !assignments.isEmpty();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Decides whether {@code record} falls inside this sequence's offset range for its partition.
     *
     * <p>The record is accepted when the sequence is open and its offset lies between the
     * partition's start and end offsets. When the runner reports inclusive end offsets, the start
     * bound is exclusive for partitions listed in {@link #getExclusiveStartPartitions()} and the
     * end bound is inclusive; otherwise the start bound is inclusive and the end bound exclusive.
     *
     * @param runner task runner used to turn raw offsets into comparable sequence numbers
     * @param record record to test
     * @return {@code true} if this sequence should handle the record
     */
    boolean canHandle(
            SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
            OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record
    ) {
        lock.lock();
        try {
            final PartitionIdType partitionId = record.getPartitionId();
            final OrderedSequenceNumber<SequenceOffsetType> partitionEndOffset =
                    runner.createSequenceNumber(endOffsets.get(partitionId));
            final OrderedSequenceNumber<SequenceOffsetType> partitionStartOffset =
                    runner.createSequenceNumber(startOffsets.get(partitionId));
            final OrderedSequenceNumber<SequenceOffsetType> recordOffset =
                    runner.createSequenceNumber(record.getSequenceNumber());

            if (!isOpen() || recordOffset == null || partitionEndOffset == null || partitionStartOffset == null) {
                return false;
            }

            final boolean endOffsetExclusive = runner.isEndOffsetExclusive();

            // With inclusive end offsets, the first record of a partition that was already read
            // (an "exclusive start" partition) must be strictly after the start offset.
            final int startComparison = recordOffset.compareTo(partitionStartOffset);
            final int requiredStartComparison =
                    !endOffsetExclusive && exclusiveStartPartitions.contains(partitionId) ? 1 : 0;
            final boolean atOrAfterStart = startComparison >= requiredStartComparison;

            // Evaluated unconditionally, matching the non-short-circuit "&=" of the original logic.
            final int endComparison = recordOffset.compareTo(partitionEndOffset);
            final boolean withinEnd = endOffsetExclusive ? endComparison < 0 : endComparison <= 0;

            return atOrAfterStart && withinEnd;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public String toString() {
        lock.lock();
        try {
            return "SequenceMetadata{sequenceId=" + sequenceId
                    + ", sequenceName='" + sequenceName + '\''
                    + ", assignments=" + assignments
                    + ", startOffsets=" + startOffsets
                    + ", exclusiveStartPartitions=" + exclusiveStartPartitions
                    + ", endOffsets=" + endOffsets
                    + ", sentinel=" + sentinel
                    + ", checkpointed=" + checkpointed
                    + '}';
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns a supplier of {@link Committer}s for this sequence.
     *
     * <p>The committer's {@code getMetadata()} may only be called once the assignments are empty.
     * It merges this sequence's end offsets into {@code lastPersistedOffsets} (keeping the larger
     * offset per partition, and mutating the passed map) and returns a map with the keys
     * {@code "nextPartitions"} and {@code "publishPartitions"}. The committer's {@code run()} is a no-op.
     *
     * @param runner               task runner used to compare offsets
     * @param stream               stream name recorded in the produced sequence numbers
     * @param lastPersistedOffsets offsets persisted so far; updated in place by the committer
     * @return supplier creating a new committer on each call
     */
    Supplier<Committer> getCommitterSupplier(
            final SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
            final String stream,
            final Map<PartitionIdType, SequenceOffsetType> lastPersistedOffsets
    ) {
        return () -> new Committer() {

            @Override
            public Object getMetadata() {
                SequenceMetadata.this.lock.lock();
                try {
                    Preconditions.checkState(
                            SequenceMetadata.this.assignments.isEmpty(),
                            "This committer can be used only once all the records till sequences [%s] have been consumed, also make sure to call updateAssignments before using this committer",
                            SequenceMetadata.this.endOffsets
                    );

                    // Fold this sequence's end offsets into the persisted offsets, never moving
                    // a partition backwards: the larger of the two offsets wins.
                    for (Map.Entry<PartitionIdType, SequenceOffsetType> entry
                            : SequenceMetadata.this.endOffsets.entrySet()) {
                        final PartitionIdType partition = entry.getKey();
                        SequenceOffsetType offsetToPersist = entry.getValue();
                        if (lastPersistedOffsets.containsKey(partition)) {
                            final SequenceOffsetType persisted = lastPersistedOffsets.get(partition);
                            if (runner.createSequenceNumber(persisted)
                                    .compareTo(runner.createSequenceNumber(offsetToPersist)) > 0) {
                                offsetToPersist = persisted;
                            }
                        }
                        lastPersistedOffsets.put(partition, offsetToPersist);
                    }

                    final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> nextPartitions =
                            new SeekableStreamStartSequenceNumbers<>(
                                    stream,
                                    lastPersistedOffsets,
                                    SequenceMetadata.this.exclusiveStartPartitions
                            );
                    final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> publishPartitions =
                            new SeekableStreamEndSequenceNumbers<>(stream, SequenceMetadata.this.endOffsets);

                    return ImmutableMap.<String, Object>of(
                            "nextPartitions", nextPartitions,
                            "publishPartitions", publishPartitions
                    );
                } finally {
                    SequenceMetadata.this.lock.unlock();
                }
            }

            @Override
            public void run() {
                // Nothing to do: all the work happens in getMetadata().
            }
        };
    }

    /**
     * Creates a segment publisher bound to this sequence.
     *
     * @param runner         task runner used to build and deserialize datasource metadata
     * @param toolbox        toolbox providing the JSON mapper and the task action client
     * @param useTransaction whether non-empty publishes carry start/end metadata
     * @return a new publisher
     */
    TransactionalSegmentPublisher createPublisher(
            SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
            TaskToolbox toolbox,
            boolean useTransaction
    ) {
        return new SequenceMetadataTransactionalSegmentPublisher(runner, toolbox, useTransaction);
    }

    /** Returns whether both arguments carry equal partition-to-offset maps. */
    private boolean isMetadataUnchanged(
            SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> startSequenceNumbers,
            SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> endSequenceNumbers
    ) {
        final Map<PartitionIdType, SequenceOffsetType> startMap = startSequenceNumbers.getPartitionSequenceNumberMap();
        final Map<PartitionIdType, SequenceOffsetType> endMap = endSequenceNumbers.getPartitionSequenceNumberMap();
        return startMap.equals(endMap);
    }

    /**
     * Publisher that submits segments (and/or offset metadata) of the enclosing sequence through
     * the toolbox's task action client.
     */
    private class SequenceMetadataTransactionalSegmentPublisher
    implements TransactionalSegmentPublisher {
        private final SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner;
        private final TaskToolbox toolbox;
        private final boolean useTransaction;

        public SequenceMetadataTransactionalSegmentPublisher(
                SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
                TaskToolbox toolbox,
                boolean useTransaction
        ) {
            this.runner = runner;
            this.toolbox = toolbox;
            this.useTransaction = useTransaction;
        }

        /**
         * Publishes {@code segmentsToPush} for the enclosing sequence.
         *
         * <p>With an empty segment set, nothing is submitted when start and end offsets are equal;
         * otherwise a metadata-only commit is submitted. With segments, an append action is
         * submitted, carrying start/end metadata only when {@code useTransaction} is set.
         *
         * @param mustBeNullOrEmptySegments segments to overwrite; must be null or empty
         * @param segmentsToPush            segments to publish
         * @param commitMetadata            map holding the {@code "publishPartitions"} entry; must not be null
         * @return the publish result
         * @throws ISE                  if overwrite segments are supplied, or the metadata's end
         *                              offsets differ from this sequence's end offsets
         * @throws NullPointerException if {@code commitMetadata} is null
         * @throws IOException          if submitting the task action fails
         */
        @Override
        public SegmentPublishResult publishAnnotatedSegments(
                @Nullable Set<DataSegment> mustBeNullOrEmptySegments,
                Set<DataSegment> segmentsToPush,
                @Nullable Object commitMetadata
        ) throws IOException {
            if (mustBeNullOrEmptySegments != null && !mustBeNullOrEmptySegments.isEmpty()) {
                throw new ISE("WTH? stream ingestion tasks are overwriting segments[%s]", mustBeNullOrEmptySegments);
            }

            final Map<?, ?> commitMetaMap = (Map<?, ?>) Preconditions.checkNotNull(commitMetadata, "commitMetadata");
            final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> finalPartitions =
                    runner.deserializePartitionsFromMetadata(
                            toolbox.getJsonMapper(),
                            commitMetaMap.get("publishPartitions")
                    );

            // Sanity check: only metadata matching this sequence's end offsets may be published.
            if (!SequenceMetadata.this.getEndOffsets().equals(finalPartitions.getPartitionSequenceNumberMap())) {
                throw new ISE(
                        "WTF?! Driver for sequence [%s], attempted to publish invalid metadata[%s].",
                        SequenceMetadata.this.toString(),
                        commitMetadata
                );
            }

            final SegmentTransactionalInsertAction action;
            if (segmentsToPush.isEmpty()) {
                final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> startPartitions =
                        new SeekableStreamStartSequenceNumbers<>(
                                finalPartitions.getStream(),
                                SequenceMetadata.this.getStartOffsets(),
                                SequenceMetadata.this.exclusiveStartPartitions
                        );
                if (SequenceMetadata.this.isMetadataUnchanged(startPartitions, finalPartitions)) {
                    // No segments and no offset progress: nothing to commit.
                    log.info("With empty segment set, start offsets [%s] and end offsets [%s] are the same, skipping metadata commit.", startPartitions, finalPartitions);
                    return SegmentPublishResult.ok(segmentsToPush);
                }
                log.info("With empty segment set, start offsets [%s] and end offsets [%s] changed, committing new metadata.", startPartitions, finalPartitions);
                action = SegmentTransactionalInsertAction.commitMetadataOnlyAction(
                        runner.getAppenderator().getDataSource(),
                        runner.createDataSourceMetadata(startPartitions),
                        runner.createDataSourceMetadata(finalPartitions)
                );
            } else if (useTransaction) {
                final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> startPartitions =
                        new SeekableStreamStartSequenceNumbers<>(
                                finalPartitions.getStream(),
                                SequenceMetadata.this.getStartOffsets(),
                                SequenceMetadata.this.exclusiveStartPartitions
                        );
                action = SegmentTransactionalInsertAction.appendAction(
                        segmentsToPush,
                        runner.createDataSourceMetadata(startPartitions),
                        runner.createDataSourceMetadata(finalPartitions)
                );
            } else {
                action = SegmentTransactionalInsertAction.appendAction(segmentsToPush, null, null);
            }
            return toolbox.getTaskActionClient().submit(action);
        }

        /** This publisher accepts an empty segment set (see {@link #publishAnnotatedSegments}). */
        @Override
        public boolean supportsEmptyPublish() {
            return true;
        }
    }
}

