package org.apache.flink.table.planner.plan.abilities.source;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import scala.Option;

@JsonTypeName("WatermarkPushDown")
/* loaded from: input_file:org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.class */
public class WatermarkPushDownSpec extends SourceAbilitySpecBase {
    public static final String FIELD_NAME_WATERMARK_EXPR = "watermarkExpr";
    public static final String FIELD_NAME_IDLE_TIMEOUT_MILLIS = "idleTimeoutMillis";

    @JsonProperty("watermarkExpr")
    private final RexNode watermarkExpr;

    @JsonProperty(FIELD_NAME_IDLE_TIMEOUT_MILLIS)
    private final long idleTimeoutMillis;

    /* loaded from: input_file:org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec$DefaultWatermarkGeneratorSupplier.class */
    public static class DefaultWatermarkGeneratorSupplier implements WatermarkGeneratorSupplier<RowData> {
        private static final long serialVersionUID = 1;
        private final Configuration configuration;
        private final GeneratedWatermarkGenerator generatedWatermarkGenerator;

        /* loaded from: input_file:org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec$DefaultWatermarkGeneratorSupplier$DefaultWatermarkGenerator.class */
        public static class DefaultWatermarkGenerator implements WatermarkGenerator<RowData> {
            private static final long serialVersionUID = 1;
            private final org.apache.flink.table.runtime.generated.WatermarkGenerator innerWatermarkGenerator;
            private Long currentWatermark = Long.MIN_VALUE;

            public DefaultWatermarkGenerator(org.apache.flink.table.runtime.generated.WatermarkGenerator watermarkGenerator) {
                this.innerWatermarkGenerator = watermarkGenerator;
            }

            public void onEvent(RowData rowData, long j, WatermarkOutput watermarkOutput) {
                try {
                    Long currentWatermark = this.innerWatermarkGenerator.currentWatermark(rowData);
                    if (currentWatermark != null) {
                        this.currentWatermark = currentWatermark;
                    }
                } catch (Exception e) {
                    throw new RuntimeException(String.format("Generated WatermarkGenerator fails to generate for row: %s.", rowData), e);
                }
            }

            public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                watermarkOutput.emitWatermark(new Watermark(this.currentWatermark.longValue()));
            }
        }

        public DefaultWatermarkGeneratorSupplier(Configuration configuration, GeneratedWatermarkGenerator generatedWatermarkGenerator) {
            this.configuration = configuration;
            this.generatedWatermarkGenerator = generatedWatermarkGenerator;
        }

        public WatermarkGenerator<RowData> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            ArrayList arrayList = new ArrayList(Arrays.asList(this.generatedWatermarkGenerator.getReferences()));
            arrayList.add(context);
            org.apache.flink.table.runtime.generated.WatermarkGenerator newInstance = new GeneratedWatermarkGenerator(this.generatedWatermarkGenerator.getClassName(), this.generatedWatermarkGenerator.getCode(), arrayList.toArray(), this.configuration).newInstance(Thread.currentThread().getContextClassLoader());
            try {
                newInstance.open(this.configuration);
                return new DefaultWatermarkGenerator(newInstance);
            } catch (Exception e) {
                throw new RuntimeException("Fail to instantiate generated watermark generator.", e);
            }
        }
    }

    @JsonCreator
    public WatermarkPushDownSpec(@JsonProperty("watermarkExpr") RexNode rexNode, @JsonProperty("idleTimeoutMillis") long j, @JsonProperty("producedType") RowType rowType) {
        super(rowType);
        this.watermarkExpr = (RexNode) Preconditions.checkNotNull(rexNode);
        this.idleTimeoutMillis = j;
    }

    @Override // org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec
    public void apply(DynamicTableSource dynamicTableSource, SourceAbilityContext sourceAbilityContext) {
        if (!(dynamicTableSource instanceof SupportsWatermarkPushDown)) {
            throw new TableException(String.format("%s does not support SupportsWatermarkPushDown.", dynamicTableSource.getClass().getName()));
        }
        WatermarkStrategy<RowData> forGenerator = WatermarkStrategy.forGenerator(new DefaultWatermarkGeneratorSupplier(sourceAbilityContext.getTableConfig().getConfiguration(), WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(sourceAbilityContext.getTableConfig(), sourceAbilityContext.getSourceRowType(), this.watermarkExpr, Option.apply("context"))));
        if (this.idleTimeoutMillis > 0) {
            forGenerator = forGenerator.withIdleness(Duration.ofMillis(this.idleTimeoutMillis));
        }
        ((SupportsWatermarkPushDown) dynamicTableSource).applyWatermark(forGenerator);
    }
}
