package org.apache.kafka.streams.kstream.internals;

import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.class */
/**
 * Abstract {@link ProcessorSupplier} whose processors join every incoming record against the
 * records fetched from the other side's window store, optionally buffering non-joined records in
 * an outer-join store until they can be emitted.
 *
 * @param <K>      record key type
 * @param <VLeft>  left-hand value type held by {@link LeftOrRightValue} in the outer-join store
 * @param <VRight> right-hand value type held by {@link LeftOrRightValue} in the outer-join store
 * @param <VOut>   type produced by the joiner and forwarded downstream
 * @param <VThis>  value type of the records this side's processor receives
 * @param <VOther> value type of the records read from the other side's window store
 */
public abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, VThis, VOther> implements ProcessorSupplier<K, VThis, K, VOut> {
    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class);

    /** Name of the window store that is queried for join candidates. */
    private final String otherWindowName;
    /** Subtracted from the input timestamp to obtain the lower bound of the fetch range. */
    private final long joinBeforeMs;
    /** Added to the input timestamp to obtain the upper bound of the fetch range. */
    private final long joinAfterMs;
    /** Grace period taken from the join windows; added when deciding whether a window is closed. */
    private final long joinGraceMs;
    /** When true, the outer-join store (if named) is looked up and the emit interval is configured. */
    private final boolean enableSpuriousResultFix;
    /** {@code beforeMs} of the join windows; look-back used for non-left-side outer-join entries. */
    private final long windowsBeforeMs;
    /** {@code afterMs} of the join windows; look-back used for left-side outer-join entries. */
    private final long windowsAfterMs;
    /** Whether records without a match must still produce a result (joined with {@code null}). */
    private final boolean outer;
    /** Name of the outer-join store, if one is used. */
    private final Optional<String> outerJoinWindowName;
    private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner;
    /** Hands out (and removes) the per-task time tracker used by the processors. */
    private final KStreamImplJoin.TimeTrackerSupplier sharedTimeTrackerSupplier;

    /**
     * Creates the supplier.
     *
     * @param otherWindowName           name of the other side's window store
     * @param windows                   join windows; provides {@code afterMs}, {@code beforeMs}, the grace
     *                                  period and the spurious-result-fix flag
     * @param joiner                    joiner applied to (key, this value, other value)
     * @param outer                     whether non-matching records of this side must be emitted
     * @param outerJoinWindowName       optional name of the outer-join store
     * @param joinBeforeMs              offset subtracted from the input timestamp for the fetch range
     * @param joinAfterMs               offset added to the input timestamp for the fetch range
     * @param sharedTimeTrackerSupplier supplier of the per-task time tracker
     */
    public KStreamKStreamJoin(String otherWindowName,
                              JoinWindowsInternal windows,
                              ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner,
                              boolean outer,
                              Optional<String> outerJoinWindowName,
                              long joinBeforeMs,
                              long joinAfterMs,
                              KStreamImplJoin.TimeTrackerSupplier sharedTimeTrackerSupplier) {
        this.otherWindowName = otherWindowName;
        this.joinBeforeMs = joinBeforeMs;
        this.joinAfterMs = joinAfterMs;
        this.windowsAfterMs = windows.afterMs;
        this.windowsBeforeMs = windows.beforeMs;
        this.joinGraceMs = windows.gracePeriodMs();
        this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
        this.joiner = joiner;
        this.outer = outer;
        this.outerJoinWindowName = outerJoinWindowName;
        this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
    }

    /**
     * Processor that performs the join for one side. Subclasses supply the conversions between
     * this side's key/value types and the representation used by the outer-join store.
     */
    protected abstract class KStreamKStreamJoinProcessor extends ContextualProcessor<K, VThis, K, VOut> {
        private WindowStore<K, VOther> otherWindowStore;
        private Sensor droppedRecordsSensor;
        /** Stays empty unless the spurious-result fix is enabled and an outer-join store is named. */
        private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>>> outerJoinStore = Optional.empty();
        private InternalProcessorContext<K, VOut> internalProcessorContext;
        private KStreamImplJoin.TimeTracker sharedTimeTracker;

        public KStreamKStreamJoinProcessor() {
        }

        /**
         * Resolves the state stores, the dropped-records sensor and the time tracker for the task of
         * the given context.
         *
         * @param processorContext context supplied by the runtime; it is cast to
         *                         {@link InternalProcessorContext} and its metrics to {@link StreamsMetricsImpl}
         */
        @Override
        public void init(ProcessorContext<K, VOut> processorContext) {
            super.init(processorContext);
            internalProcessorContext = (InternalProcessorContext<K, VOut>) processorContext;

            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) processorContext.metrics();
            droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
                Thread.currentThread().getName(),
                processorContext.taskId().toString(),
                metrics);
            otherWindowStore = processorContext.getStateStore(otherWindowName);
            sharedTimeTracker = sharedTimeTrackerSupplier.get(processorContext.taskId());

            if (enableSpuriousResultFix) {
                outerJoinStore = outerJoinWindowName.map(processorContext::getStateStore);
                // Emit interval comes from the application config and falls back to 1000 ms.
                sharedTimeTracker.setEmitInterval(
                    StreamsConfig.InternalConfig.getLong(
                        processorContext.appConfigs(),
                        StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
                        1000L));
            }
        }

        /**
         * Joins {@code record} with every entry of the other window store whose timestamp lies in
         * {@code [timestamp - joinBeforeMs, timestamp + joinAfterMs]} (both bounds clamped at 0).
         *
         * <p>For an outer join, a record with a {@code null} key and non-null value is forwarded
         * immediately, joined with {@code null}. A record without any match is either forwarded
         * immediately (no outer-join store, or the upper fetch bound is already behind stream time)
         * or written to the outer-join store for later emission.
         *
         * @param record the input record of this side
         */
        @Override
        public void process(Record<K, VThis> record) {
            final long inputTimestamp = record.timestamp();
            sharedTimeTracker.advanceStreamTime(inputTimestamp);

            if (outer && record.key() == null && record.value() != null) {
                context().forward(record.withValue(joiner.apply(record.key(), record.value(), null)));
                return;
            }
            if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
                return;
            }

            // Only try to flush buffered non-joined records when the input timestamp equals the
            // tracker's stream time after the advance above.
            if (inputTimestamp == sharedTimeTracker.streamTime) {
                outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record));
            }

            final long timeFrom = Math.max(0L, inputTimestamp - joinBeforeMs);
            final long timeTo = Math.max(0L, inputTimestamp + joinAfterMs);
            try (WindowStoreIterator<VOther> matches = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
                // Must be evaluated before the iterator is drained.
                final boolean needOuterJoin = outer && !matches.hasNext();
                matches.forEachRemaining(otherRecord -> emitInnerJoin(record, otherRecord, inputTimestamp));

                if (needOuterJoin) {
                    if (!outerJoinStore.isPresent() || timeTo < sharedTimeTracker.streamTime) {
                        context().forward(record.withValue(joiner.apply(record.key(), record.value(), null)));
                    } else {
                        sharedTimeTracker.updatedMinTime(inputTimestamp);
                        putInOuterJoinStore(record);
                    }
                }
            }
        }

        /** Builds the outer-join store key for a record of this side. */
        protected abstract TimestampedKeyAndJoinSide<K> makeThisKey(K key, long timestamp);

        /** Wraps a value of this side for the outer-join store. */
        protected abstract LeftOrRightValue<VLeft, VRight> makeThisValue(VThis thisValue);

        /** Builds the outer-join store key for a record of the other side. */
        protected abstract TimestampedKeyAndJoinSide<K> makeOtherKey(K key, long timestamp);

        /** Extracts the value passed to the joiner as this side's value. */
        protected abstract VThis getThisValue(LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue);

        /** Extracts the value passed to the joiner as the other side's value. */
        protected abstract VOther getOtherValue(LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue);

        /**
         * Scans the outer-join store and forwards the entries whose window is no longer open,
         * removing them from the store afterwards.
         *
         * @param store  the outer-join store to scan
         * @param record the record currently being processed; used as the template for forwarded records
         */
        private void emitNonJoinedOuterRecords(KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> store,
                                               Record<K, VThis> record) {
            // Skip the scan while the smallest tracked timestamp is still within window + grace.
            // NOTE(review): minTime is reset to Long.MAX_VALUE below, so this sum can wrap around
            // while that value is still in place -- confirm whether the resulting scan is relied
            // upon before changing this comparison.
            if (sharedTimeTracker.minTime + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) {
                return;
            }
            // Skip the scan until the wall-clock time has reached the next scheduled emit time.
            if (internalProcessorContext.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) {
                return;
            }

            // Re-base the next emit time on the current system time before advancing it.
            sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs();
            sharedTimeTracker.advanceNextTimeToEmit();

            // Value that remains if the store turns out to be empty.
            sharedTimeTracker.minTime = Long.MAX_VALUE;

            // try-with-resources closes the iterator on every exit path, including failures of
            // hasNext() and of the final put after the loop.
            try (KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> entries = store.all()) {
                TimestampedKeyAndJoinSide<K> previousKey = null;
                boolean leftWindowOpen = false;
                boolean rightWindowOpen = false;

                while (entries.hasNext()) {
                    final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> entry = entries.next();
                    final TimestampedKeyAndJoinSide<K> currentKey = entry.key;
                    sharedTimeTracker.minTime = currentKey.getTimestamp();

                    // Once an open window has been seen for both sides, stop scanning.
                    if (leftWindowOpen && rightWindowOpen) {
                        break;
                    }

                    if (isOuterJoinWindowOpen(currentKey)) {
                        // Remember which side is still open and move on to the next entry.
                        if (currentKey.isLeftSide()) {
                            leftWindowOpen = true;
                        } else {
                            rightWindowOpen = true;
                        }
                        continue;
                    }

                    forwardNonJoinedOuterRecords(record, currentKey, entry.value);

                    // The previous key is removed only once a different key has been emitted
                    // (presumably because several values may be stored under one key -- confirm).
                    // A put with a null value is used for the removal.
                    if (previousKey != null && !previousKey.equals(currentKey)) {
                        store.put(previousKey, null);
                    }
                    previousKey = currentKey;
                }

                // Remove the last emitted key, which the loop above never gets to.
                if (previousKey != null) {
                    store.put(previousKey, null);
                }
            }
        }

        /**
         * Forwards one buffered non-joined entry, using the stored key and timestamp and the
         * joiner result of the entry's this-side and other-side values.
         */
        private void forwardNonJoinedOuterRecords(Record<K, VThis> record,
                                                  TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide,
                                                  LeftOrRightValue<VLeft, VRight> leftOrRightValue) {
            final K key = timestampedKeyAndJoinSide.getKey();
            final VOut joined = joiner.apply(key, getThisValue(leftOrRightValue), getOtherValue(leftOrRightValue));
            context().forward(
                record.withKey(key)
                    .withValue(joined)
                    .withTimestamp(timestampedKeyAndJoinSide.getTimestamp()));
        }

        /**
         * Returns whether {@code minTime + lookBack + grace} has not yet fallen behind stream time,
         * where the look-back depends on the join side of the given key.
         */
        private boolean isOuterJoinWindowOpen(TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide) {
            final long lookBackMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
            return sharedTimeTracker.minTime + lookBackMs + joinGraceMs >= sharedTimeTracker.streamTime;
        }

        /** Returns {@code windowsAfterMs} for left-side keys and {@code windowsBeforeMs} otherwise. */
        private long getOuterJoinLookBackTimeMs(TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide) {
            if (timestampedKeyAndJoinSide.isLeftSide()) {
                return windowsAfterMs;
            }
            return windowsBeforeMs;
        }

        /**
         * Forwards the join result of {@code thisRecord} and one matching record of the other side,
         * stamped with the larger of the two timestamps.
         *
         * @param thisRecord     the input record of this side
         * @param otherRecord    a fetched entry of the other window store (its key is used as timestamp)
         * @param inputTimestamp timestamp of {@code thisRecord}
         */
        private void emitInnerJoin(Record<K, VThis> thisRecord, KeyValue<Long, VOther> otherRecord, long inputTimestamp) {
            // Issues putIfAbsent(otherKey, null) on the outer-join store; presumably this clears a
            // pending non-joined entry of the matched record -- confirm against the store implementation.
            outerJoinStore.ifPresent(store -> store.putIfAbsent(makeOtherKey(thisRecord.key(), otherRecord.key), null));

            final VOut joined = joiner.apply(thisRecord.key(), thisRecord.value(), otherRecord.value);
            context().forward(
                thisRecord.withValue(joined)
                    .withTimestamp(Math.max(inputTimestamp, otherRecord.key)));
        }

        /** Writes the record into the outer-join store (if present) under this side's key. */
        private void putInOuterJoinStore(Record<K, VThis> record) {
            outerJoinStore.ifPresent(store ->
                store.put(makeThisKey(record.key(), record.timestamp()), makeThisValue(record.value())));
        }

        /** Removes this task's time tracker from the shared supplier. */
        @Override
        public void close() {
            sharedTimeTrackerSupplier.remove(context().taskId());
        }
    }
}
