/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotFullStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousFromTimestampStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousLatestStartingScanner;
import org.apache.paimon.table.source.snapshot.EmptyResultStartingScanner;
import org.apache.paimon.table.source.snapshot.FileCreationTimeStartingScanner;
import org.apache.paimon.table.source.snapshot.FullCompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.FullStartingScanner;
import org.apache.paimon.table.source.snapshot.IncrementalDeltaStartingScanner;
import org.apache.paimon.table.source.snapshot.IncrementalDiffStartingScanner;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
import org.apache.paimon.tag.Tag;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class AbstractDataTableScan
implements DataTableScan {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractDataTableScan.class);
    private final CoreOptions options;
    protected final SnapshotReader snapshotReader;

    protected AbstractDataTableScan(CoreOptions options, SnapshotReader snapshotReader) {
        this.options = options;
        this.snapshotReader = snapshotReader;
    }

    @Override
    public AbstractDataTableScan withBucket(int bucket) {
        this.snapshotReader.withBucket(bucket);
        return this;
    }

    @Override
    public AbstractDataTableScan withBucketFilter(Filter<Integer> bucketFilter) {
        this.snapshotReader.withBucketFilter(bucketFilter);
        return this;
    }

    @Override
    public AbstractDataTableScan withPartitionFilter(Map<String, String> partitionSpec) {
        this.snapshotReader.withPartitionFilter(partitionSpec);
        return this;
    }

    @Override
    public AbstractDataTableScan withPartitionFilter(List<BinaryRow> partitions) {
        this.snapshotReader.withPartitionFilter(partitions);
        return this;
    }

    @Override
    public AbstractDataTableScan withPartitionsFilter(List<Map<String, String>> partitions) {
        this.snapshotReader.withPartitionsFilter(partitions);
        return this;
    }

    @Override
    public AbstractDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
        this.snapshotReader.withLevelFilter(levelFilter);
        return this;
    }

    @Override
    public AbstractDataTableScan withMetricsRegistry(MetricRegistry metricsRegistry) {
        this.snapshotReader.withMetricRegistry(metricsRegistry);
        return this;
    }

    @Override
    public AbstractDataTableScan dropStats() {
        this.snapshotReader.dropStats();
        return this;
    }

    public CoreOptions options() {
        return this.options;
    }

    protected StartingScanner createStartingScanner(boolean isStreaming) {
        ConsumerManager consumerManager;
        Optional<Consumer> consumer;
        SnapshotManager snapshotManager = this.snapshotReader.snapshotManager();
        ChangelogManager changelogManager = this.snapshotReader.changelogManager();
        CoreOptions.StreamScanMode type = (CoreOptions.StreamScanMode)this.options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
        switch (type) {
            case COMPACT_BUCKET_TABLE: {
                Preconditions.checkArgument((boolean)isStreaming, (Object)"Set 'streaming-compact' in batch mode. This is unexpected.");
                return new ContinuousCompactorStartingScanner(snapshotManager);
            }
            case FILE_MONITOR: {
                return new FullStartingScanner(snapshotManager);
            }
        }
        String consumerId = this.options.consumerId();
        if (isStreaming && consumerId != null && !this.options.consumerIgnoreProgress() && (consumer = (consumerManager = this.snapshotReader.consumerManager()).consumer(consumerId)).isPresent()) {
            return new ContinuousFromSnapshotStartingScanner(snapshotManager, changelogManager, consumer.get().nextSnapshot(), this.options.changelogLifecycleDecoupled());
        }
        CoreOptions.StartupMode startupMode = this.options.startupMode();
        switch (startupMode) {
            case LATEST_FULL: {
                return new FullStartingScanner(snapshotManager);
            }
            case LATEST: {
                return isStreaming ? new ContinuousLatestStartingScanner(snapshotManager) : new FullStartingScanner(snapshotManager);
            }
            case COMPACTED_FULL: {
                if (this.options.changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION || this.options.toConfiguration().contains(CoreOptions.FULL_COMPACTION_DELTA_COMMITS)) {
                    int deltaCommits = this.options.toConfiguration().getOptional(CoreOptions.FULL_COMPACTION_DELTA_COMMITS).orElse(1);
                    return new FullCompactedStartingScanner(snapshotManager, deltaCommits);
                }
                return new CompactedStartingScanner(snapshotManager);
            }
            case FROM_TIMESTAMP: {
                Long startupMillis = this.options.scanTimestampMills();
                return isStreaming ? new ContinuousFromTimestampStartingScanner(snapshotManager, changelogManager, startupMillis, this.options.changelogLifecycleDecoupled()) : new StaticFromTimestampStartingScanner(snapshotManager, startupMillis);
            }
            case FROM_FILE_CREATION_TIME: {
                Long fileCreationTimeMills = this.options.scanFileCreationTimeMills();
                return new FileCreationTimeStartingScanner(snapshotManager, fileCreationTimeMills);
            }
            case FROM_SNAPSHOT: {
                if (this.options.scanSnapshotId() != null) {
                    return isStreaming ? new ContinuousFromSnapshotStartingScanner(snapshotManager, changelogManager, this.options.scanSnapshotId(), this.options.changelogLifecycleDecoupled()) : new StaticFromSnapshotStartingScanner(snapshotManager, this.options.scanSnapshotId());
                }
                if (this.options.scanWatermark() != null) {
                    Preconditions.checkArgument((!isStreaming ? 1 : 0) != 0, (Object)"Cannot scan from watermark in streaming mode.");
                    return new StaticFromWatermarkStartingScanner(snapshotManager, this.options().scanWatermark());
                }
                if (this.options.scanTagName() != null) {
                    Preconditions.checkArgument((!isStreaming ? 1 : 0) != 0, (Object)"Cannot scan from tag in streaming mode.");
                    return new StaticFromTagStartingScanner(snapshotManager, this.options().scanTagName());
                }
                throw new UnsupportedOperationException("Unknown snapshot read mode");
            }
            case FROM_SNAPSHOT_FULL: {
                Long scanSnapshotId = this.options.scanSnapshotId();
                Preconditions.checkNotNull((Object)scanSnapshotId, (String)"scan.snapshot-id must be set when startupMode is FROM_SNAPSHOT_FULL.");
                return isStreaming ? new ContinuousFromSnapshotFullStartingScanner(snapshotManager, scanSnapshotId) : new StaticFromSnapshotStartingScanner(snapshotManager, scanSnapshotId);
            }
            case INCREMENTAL: {
                Preconditions.checkArgument((!isStreaming ? 1 : 0) != 0, (Object)"Cannot read incremental in streaming mode.");
                return this.createIncrementalStartingScanner(snapshotManager);
            }
        }
        throw new UnsupportedOperationException("Unknown startup mode " + startupMode.name());
    }

    private StartingScanner createIncrementalStartingScanner(SnapshotManager snapshotManager) {
        Options conf = this.options.toConfiguration();
        if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN)) {
            long endId;
            long startId;
            Pair incrementalBetween = this.options.incrementalBetween();
            TagManager tagManager = new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath(), snapshotManager.branch());
            Optional<Tag> startTag = tagManager.get((String)incrementalBetween.getLeft());
            Optional<Tag> endTag = tagManager.get((String)incrementalBetween.getRight());
            if (startTag.isPresent() && endTag.isPresent()) {
                return IncrementalDiffStartingScanner.betweenTags(startTag.get(), endTag.get(), snapshotManager, (Pair<String, String>)incrementalBetween);
            }
            try {
                startId = Long.parseLong((String)incrementalBetween.getLeft());
                endId = Long.parseLong((String)incrementalBetween.getRight());
            }
            catch (NumberFormatException e) {
                throw new IllegalArgumentException(String.format("Didn't find two tags for start '%s' and end '%s', and they are not two snapshot Ids. Please set two tags or two snapshot Ids.", incrementalBetween.getLeft(), incrementalBetween.getRight()));
            }
            Preconditions.checkArgument((endId >= startId ? 1 : 0) != 0, (String)"Ending snapshotId should >= starting snapshotId %s.", (Object[])new Object[]{endId, startId});
            if (snapshotManager.earliestSnapshot() == null) {
                LOG.warn("There is currently no snapshot. Waiting for snapshot generation.");
                return new EmptyResultStartingScanner(snapshotManager);
            }
            if (startId == endId) {
                return new EmptyResultStartingScanner(snapshotManager);
            }
            CoreOptions.IncrementalBetweenScanMode scanMode = this.options.incrementalBetweenScanMode();
            return scanMode == CoreOptions.IncrementalBetweenScanMode.DIFF ? IncrementalDiffStartingScanner.betweenSnapshotIds(startId, endId, snapshotManager) : IncrementalDeltaStartingScanner.betweenSnapshotIds(startId, endId, snapshotManager, this.toSnapshotScanMode(scanMode));
        }
        if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)) {
            Pair incrementalBetween = this.options.incrementalBetweenTimestamp();
            Snapshot earliestSnapshot = snapshotManager.earliestSnapshot();
            Snapshot latestSnapshot = snapshotManager.latestSnapshot();
            if (earliestSnapshot == null || latestSnapshot == null) {
                return new EmptyResultStartingScanner(snapshotManager);
            }
            long startTimestamp = (Long)incrementalBetween.getLeft();
            long endTimestamp = (Long)incrementalBetween.getRight();
            Preconditions.checkArgument((endTimestamp >= startTimestamp ? 1 : 0) != 0, (String)"Ending timestamp %s should be >= starting timestamp %s.", (Object[])new Object[]{endTimestamp, startTimestamp});
            if (startTimestamp == endTimestamp || startTimestamp > latestSnapshot.timeMillis() || endTimestamp < earliestSnapshot.timeMillis()) {
                return new EmptyResultStartingScanner(snapshotManager);
            }
            CoreOptions.IncrementalBetweenScanMode scanMode = this.options.incrementalBetweenScanMode();
            return scanMode == CoreOptions.IncrementalBetweenScanMode.DIFF ? IncrementalDiffStartingScanner.betweenTimestamps(startTimestamp, endTimestamp, snapshotManager) : IncrementalDeltaStartingScanner.betweenTimestamps(startTimestamp, endTimestamp, snapshotManager, this.toSnapshotScanMode(scanMode));
        }
        if (conf.contains(CoreOptions.INCREMENTAL_TO_AUTO_TAG)) {
            String endTag = this.options.incrementalToAutoTag();
            return IncrementalDiffStartingScanner.toEndAutoTag(snapshotManager, endTag, this.options);
        }
        throw new UnsupportedOperationException("Unknown incremental read mode.");
    }

    private ScanMode toSnapshotScanMode(CoreOptions.IncrementalBetweenScanMode scanMode) {
        switch (scanMode) {
            case AUTO: {
                return this.options.changelogProducer() == CoreOptions.ChangelogProducer.NONE ? ScanMode.DELTA : ScanMode.CHANGELOG;
            }
            case DELTA: {
                return ScanMode.DELTA;
            }
            case CHANGELOG: {
                return ScanMode.CHANGELOG;
            }
        }
        throw new UnsupportedOperationException("Unsupported incremental scan mode " + scanMode.name());
    }
}

