/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.seekablestream;

import java.io.File;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.InputEntityIteratingReader;
import org.apache.druid.indexing.overlord.sampler.SamplerException;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

public class RecordSupplierInputSource<PartitionIdType, SequenceOffsetType>
extends AbstractInputSource {
    private final String topic;
    private final RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier;
    private final boolean useEarliestOffset;

    RecordSupplierInputSource(String topic, RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier, boolean useEarliestOffset) {
        this.topic = topic;
        this.recordSupplier = recordSupplier;
        this.useEarliestOffset = useEarliestOffset;
        try {
            this.assignAndSeek(recordSupplier);
        }
        catch (InterruptedException e) {
            throw new SamplerException(e, "Exception while seeking to partitions", new Object[0]);
        }
    }

    private void assignAndSeek(RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier) throws InterruptedException {
        Set partitions = recordSupplier.getPartitionIds(this.topic).stream().map(partitionId -> StreamPartition.of(this.topic, partitionId)).collect(Collectors.toSet());
        recordSupplier.assign(partitions);
        if (this.useEarliestOffset) {
            recordSupplier.seekToEarliest(partitions);
        } else {
            recordSupplier.seekToLatest(partitions);
        }
    }

    public boolean isSplittable() {
        return false;
    }

    public boolean needsFormat() {
        return true;
    }

    protected InputSourceReader formattableReader(InputRowSchema inputRowSchema, InputFormat inputFormat, @Nullable File temporaryDirectory) {
        return new InputEntityIteratingReader(inputRowSchema, inputFormat, this.createEntityIterator(), temporaryDirectory);
    }

    CloseableIterator<InputEntity> createEntityIterator() {
        return new CloseableIterator<InputEntity>(){
            private Iterator<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType>> recordIterator;
            private Iterator<byte[]> bytesIterator;
            private volatile boolean closed;

            private void waitNextIteratorIfNecessary() {
                while (!(this.closed || this.bytesIterator != null && this.bytesIterator.hasNext())) {
                    while (!(this.closed || this.recordIterator != null && this.recordIterator.hasNext())) {
                        this.recordIterator = RecordSupplierInputSource.this.recordSupplier.poll(100L).iterator();
                    }
                    if (this.closed) continue;
                    this.bytesIterator = this.recordIterator.next().getData().iterator();
                }
            }

            public boolean hasNext() {
                this.waitNextIteratorIfNecessary();
                return this.bytesIterator != null && this.bytesIterator.hasNext();
            }

            public InputEntity next() {
                return new ByteEntity(this.bytesIterator.next());
            }

            public void close() {
                this.closed = true;
                RecordSupplierInputSource.this.recordSupplier.close();
            }
        };
    }
}

