package org.apache.flink.streaming.runtime.io.benchmark;

import java.util.concurrent.CompletableFuture;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.class */
public abstract class ReceiverThread extends CheckedThread {
    protected static final Logger LOG = LoggerFactory.getLogger(ReceiverThread.class);
    protected final int expectedRepetitionsOfExpectedRecord;
    protected int expectedRecordCounter;
    protected CompletableFuture<Long> expectedRecord = new CompletableFuture<>();
    protected CompletableFuture<?> recordsProcessed = new CompletableFuture<>();
    protected volatile boolean running;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReceiverThread(int i) {
        setName(getClass().getName());
        this.expectedRepetitionsOfExpectedRecord = i;
        this.running = true;
    }

    public synchronized CompletableFuture<?> setExpectedRecord(long j) {
        Preconditions.checkState(!this.expectedRecord.isDone());
        Preconditions.checkState(!this.recordsProcessed.isDone());
        this.expectedRecord.complete(Long.valueOf(j));
        this.expectedRecordCounter = 0;
        return this.recordsProcessed;
    }

    private synchronized CompletableFuture<Long> getExpectedRecord() {
        return this.expectedRecord;
    }

    private synchronized void finishProcessingExpectedRecords() {
        Preconditions.checkState(this.expectedRecord.isDone());
        Preconditions.checkState(!this.recordsProcessed.isDone());
        this.recordsProcessed.complete(null);
        this.expectedRecord = new CompletableFuture<>();
        this.recordsProcessed = new CompletableFuture<>();
    }

    public void go() throws Exception {
        while (this.running) {
            try {
                readRecords(getExpectedRecord().get().longValue());
                finishProcessingExpectedRecords();
            } catch (InterruptedException e) {
                if (this.running) {
                    throw e;
                }
                return;
            } catch (Exception e2) {
                e2.printStackTrace();
                return;
            }
        }
    }

    protected abstract void readRecords(long j) throws Exception;

    public void shutdown() {
        this.running = false;
        interrupt();
        this.expectedRecord.complete(0L);
    }
}
