package org.apache.flink.streaming.tests;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

/* loaded from: input_file:org/apache/flink/streaming/tests/SequenceGeneratorSource.class */
public class SequenceGeneratorSource extends RichParallelSourceFunction<Event> implements CheckpointedFunction {
    private static final long serialVersionUID = -3986989644799442178L;
    private final int payloadLength;
    private final int totalKeySpaceSize;
    private final long eventTimeClockProgressPerEvent;
    private final long maxOutOfOrder;
    private final long sleepTime;
    private final long sleepAfterElements;
    private long monotonousEventTime;
    private transient List<KeyRangeStates> keyRanges;
    private transient ListState<KeyRangeStates> snapshotKeyRanges;
    private transient ListState<Long> lastEventTimes;
    private volatile boolean running;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/tests/SequenceGeneratorSource$KeyRangeStates.class */
    public static class KeyRangeStates {
        final int startKey;
        final int endKey;
        final long[] statesPerKey;

        KeyRangeStates(int i, int i2) {
            this(i, i2, new long[i2 - i]);
        }

        KeyRangeStates(int i, int i2, long[] jArr) {
            Preconditions.checkArgument(jArr.length == i2 - i);
            this.startKey = i;
            this.endKey = i2;
            this.statesPerKey = jArr;
        }

        long incrementAndGet(int i) {
            long[] jArr = this.statesPerKey;
            int i2 = i - this.startKey;
            long j = jArr[i2] + 1;
            jArr[i2] = j;
            return j;
        }

        int getRandomKey(Random random) {
            return random.nextInt(this.endKey - this.startKey) + this.startKey;
        }

        public String toString() {
            return "KeyRangeStates{start=" + this.startKey + ", end=" + this.endKey + ", statesPerKey=" + Arrays.toString(this.statesPerKey) + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SequenceGeneratorSource(int i, int i2, long j, long j2, long j3, long j4) {
        this.totalKeySpaceSize = i;
        this.maxOutOfOrder = j;
        this.payloadLength = i2;
        this.eventTimeClockProgressPerEvent = j2;
        this.sleepTime = j3;
        this.sleepAfterElements = j3 > 0 ? j4 : 0L;
        this.running = true;
    }

    public void run(SourceFunction.SourceContext<Event> sourceContext) throws Exception {
        if (this.keyRanges.size() > 0) {
            runActive(sourceContext);
        } else {
            runIdle(sourceContext);
        }
    }

    private void runActive(SourceFunction.SourceContext<Event> sourceContext) throws Exception {
        Random random = new Random();
        long j = this.sleepAfterElements;
        while (this.running) {
            KeyRangeStates keyRangeStates = this.keyRanges.get(random.nextInt(this.keyRanges.size()));
            int randomKey = keyRangeStates.getRandomKey(random);
            long max = Math.max(0L, generateEventTimeWithOutOfOrderness(random, this.monotonousEventTime));
            this.monotonousEventTime += this.eventTimeClockProgressPerEvent;
            synchronized (sourceContext.getCheckpointLock()) {
                sourceContext.collect(new Event(randomKey, max, keyRangeStates.incrementAndGet(randomKey), StringUtils.getRandomString(random, this.payloadLength, this.payloadLength, 'A', 'z')));
            }
            if (this.sleepTime > 0) {
                if (j == 1) {
                    j = this.sleepAfterElements;
                    Thread.sleep(this.sleepTime);
                } else if (j > 1) {
                    j--;
                }
            }
        }
    }

    private void runIdle(SourceFunction.SourceContext<Event> sourceContext) {
        sourceContext.markAsTemporarilyIdle();
        Object obj = new Object();
        while (this.running) {
            try {
                synchronized (obj) {
                    obj.wait();
                }
            } catch (InterruptedException e) {
                if (!this.running) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private long generateEventTimeWithOutOfOrderness(Random random, long j) {
        return this.maxOutOfOrder > 0 ? (j - this.maxOutOfOrder) + ((random.nextLong() & Long.MAX_VALUE) % (2 * this.maxOutOfOrder)) : j;
    }

    public void cancel() {
        this.running = false;
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.snapshotKeyRanges.update(this.keyRanges);
        this.lastEventTimes.clear();
        this.lastEventTimes.add(Long.valueOf(this.monotonousEventTime));
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        RuntimeContext runtimeContext = getRuntimeContext();
        int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
        int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks();
        int maxNumberOfParallelSubtasks = runtimeContext.getMaxNumberOfParallelSubtasks();
        this.lastEventTimes = functionInitializationContext.getOperatorStateStore().getUnionListState(new ListStateDescriptor("watermarks", Long.class));
        this.snapshotKeyRanges = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("keyRanges", KeyRangeStates.class));
        this.keyRanges = new ArrayList();
        if (functionInitializationContext.isRestored()) {
            Iterator it = ((Iterable) this.snapshotKeyRanges.get()).iterator();
            while (it.hasNext()) {
                this.keyRanges.add((KeyRangeStates) it.next());
            }
            Iterator it2 = ((Iterable) this.lastEventTimes.get()).iterator();
            while (it2.hasNext()) {
                this.monotonousEventTime = Math.max(this.monotonousEventTime, ((Long) it2.next()).longValue());
            }
            return;
        }
        int i = (indexOfThisSubtask * maxNumberOfParallelSubtasks) / numberOfParallelSubtasks;
        int i2 = ((indexOfThisSubtask + 1) * maxNumberOfParallelSubtasks) / numberOfParallelSubtasks;
        for (int i3 = i; i3 < i2; i3++) {
            int i4 = (((i3 * this.totalKeySpaceSize) + maxNumberOfParallelSubtasks) - 1) / maxNumberOfParallelSubtasks;
            int i5 = 1 + ((((i3 + 1) * this.totalKeySpaceSize) - 1) / maxNumberOfParallelSubtasks);
            if (i5 - i4 > 0) {
                this.keyRanges.add(new KeyRangeStates(i4, i5));
            }
        }
        this.monotonousEventTime = 0L;
    }
}
