/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util.asyncprocessing;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.MultiInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;

public class AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
extends MultiInputStreamOperatorTestHarness<OUT> {
    private final ExecutorService executor;

    public static <K, OUT, OP extends AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>> OP create(FunctionWithException<ExecutorService, OP, Exception> constructor) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture future = new CompletableFuture();
        executor.execute(() -> {
            try {
                future.complete((AsyncKeyedMultiInputStreamOperatorTestHarness)constructor.apply((Object)executor));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        return (OP)((AsyncKeyedMultiInputStreamOperatorTestHarness)future.get());
    }

    public static <K, OUT> AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT> create(StreamOperatorFactory<OUT> operatorFactory, TypeInformation<K> keyType, List<KeySelector<?, K>> keySelectors, int maxParallelism, int numSubtasks, int subtaskIndex) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture future = new CompletableFuture();
        executor.execute(() -> {
            try {
                future.complete(new AsyncKeyedMultiInputStreamOperatorTestHarness(executor, operatorFactory, keyType, keySelectors, maxParallelism, numSubtasks, subtaskIndex));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        return (AsyncKeyedMultiInputStreamOperatorTestHarness)future.get();
    }

    private AsyncKeyedMultiInputStreamOperatorTestHarness(ExecutorService executor, StreamOperatorFactory<OUT> operatorFactory, TypeInformation<K> keyType, List<KeySelector<?, K>> keySelectors, int maxParallelism, int numSubtasks, int subtaskIndex) throws Exception {
        super(operatorFactory, maxParallelism, numSubtasks, subtaskIndex);
        this.config.setStateKeySerializer(keyType.createSerializer(this.executionConfig.getSerializerConfig()));
        this.config.serializeAllConfigs();
        for (int i = 0; i < keySelectors.size(); ++i) {
            this.setKeySelector(i, keySelectors.get(i));
        }
        this.executor = executor;
        this.getEnvironment().setExpectedExternalFailureCause(Throwable.class);
    }

    public void setKeySelector(int idx, KeySelector<?, K> keySelector) {
        ClosureCleaner.clean(keySelector, (ExecutionConfig.ClosureCleanerLevel)ExecutionConfig.ClosureCleanerLevel.RECURSIVE, (boolean)false);
        this.config.setStatePartitioner(idx, keySelector);
        this.config.serializeAllConfigs();
    }

    @Override
    public void processElement(int idx, StreamRecord<?> element) throws Exception {
        Input input = (Input)this.getCastedOperator().getInputs().get(idx);
        ThrowingConsumer inputProcessor = RecordProcessorUtils.getRecordProcessor((Input)input);
        this.executeAndGet(() -> inputProcessor.accept((Object)element));
    }

    @Override
    public void processWatermark(int idx, Watermark mark) throws Exception {
        Input input = (Input)this.getCastedOperator().getInputs().get(idx);
        this.executeAndGet(() -> input.processWatermark(mark));
    }

    @Override
    public void processWatermarkStatus(int idx, WatermarkStatus watermarkStatus) throws Exception {
        Input input = (Input)this.getCastedOperator().getInputs().get(idx);
        this.executeAndGet(() -> input.processWatermarkStatus(watermarkStatus));
    }

    @Override
    public void processRecordAttributes(int idx, RecordAttributes recordAttributes) throws Exception {
        Input input = (Input)this.getCastedOperator().getInputs().get(idx);
        this.executeAndGet(() -> input.processRecordAttributes(recordAttributes));
    }

    public void drainStateRequests() throws Exception {
        this.executeAndGet(() -> AsyncProcessingTestUtil.drain(this.operator));
    }

    @Override
    public void close() throws Exception {
        this.executeAndGet(() -> super.close());
        this.executor.shutdown();
    }

    private void executeAndGet(RunnableWithException runnable) throws Exception {
        try {
            AsyncProcessingTestUtil.execute(this.executor, () -> {
                this.checkEnvState();
                runnable.run();
            }).get();
            this.checkEnvState();
        }
        catch (Exception e) {
            AsyncProcessingTestUtil.execute(this.executor, () -> this.mockTask.cleanUp(e)).get();
            throw AsyncProcessingTestUtil.unwrapAsyncException(e);
        }
    }

    private void checkEnvState() {
        if (this.getEnvironment().getActualExternalFailureCause().isPresent()) {
            Assertions.fail((String)"There is an error on other threads", (Throwable)this.getEnvironment().getActualExternalFailureCause().get());
        }
    }
}

