package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.lang.reflect.InvocationTargetException;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.functions.python.PythonFunctionKind;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.expressions.PlannerNamedWindowProperty;
import org.apache.flink.table.planner.plan.logical.LogicalWindow;
import org.apache.flink.table.planner.plan.logical.SessionGroupWindow;
import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonDeserializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer;
import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.PythonUtil;
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy;
import org.apache.flink.table.planner.typeutils.DataViewUtils;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.window.assigners.CountSlidingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.CountTumblingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.SessionWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.TumblingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.operators.window.triggers.ElementTriggers;
import org.apache.flink.table.runtime.operators.window.triggers.EventTimeTriggers;
import org.apache.flink.table.runtime.operators.window.triggers.ProcessingTimeTriggers;
import org.apache.flink.table.runtime.operators.window.triggers.Trigger;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@JsonIgnoreProperties(ignoreUnknown = true)
/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.class */
public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBase {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamExecPythonGroupWindowAggregate.class);
    private static final String ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME = "org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream.StreamArrowPythonGroupWindowAggregateFunctionOperator";
    private static final String GENERAL_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME = "org.apache.flink.table.runtime.operators.python.aggregate.PythonStreamGroupWindowAggregateOperator";
    public static final String FIELD_NAME_WINDOW = "window";
    public static final String FIELD_NAME_NAMED_WINDOW_PROPERTIES = "namedWindowProperties";

    @JsonProperty(StreamExecAggregateBase.FIELD_NAME_GROUPING)
    private final int[] grouping;

    @JsonProperty("aggCalls")
    private final AggregateCall[] aggCalls;

    @JsonSerialize(using = LogicalWindowJsonSerializer.class)
    @JsonDeserialize(using = LogicalWindowJsonDeserializer.class)
    @JsonProperty("window")
    private final LogicalWindow window;

    @JsonProperty("namedWindowProperties")
    private final PlannerNamedWindowProperty[] namedWindowProperties;

    @JsonProperty(StreamExecAggregateBase.FIELD_NAME_NEED_RETRACTION)
    private final boolean needRetraction;

    @JsonProperty("generateUpdateBefore")
    private final boolean generateUpdateBefore;

    public StreamExecPythonGroupWindowAggregate(int[] iArr, AggregateCall[] aggregateCallArr, LogicalWindow logicalWindow, PlannerNamedWindowProperty[] plannerNamedWindowPropertyArr, boolean z, boolean z2, InputProperty inputProperty, RowType rowType, String str) {
        this(iArr, aggregateCallArr, logicalWindow, plannerNamedWindowPropertyArr, z, z2, getNewNodeId(), Collections.singletonList(inputProperty), rowType, str);
    }

    @JsonCreator
    public StreamExecPythonGroupWindowAggregate(@JsonProperty("grouping") int[] iArr, @JsonProperty("aggCalls") AggregateCall[] aggregateCallArr, @JsonProperty("window") LogicalWindow logicalWindow, @JsonProperty("namedWindowProperties") PlannerNamedWindowProperty[] plannerNamedWindowPropertyArr, @JsonProperty("generateUpdateBefore") boolean z, @JsonProperty("needRetraction") boolean z2, @JsonProperty("id") int i, @JsonProperty("inputProperties") List<InputProperty> list, @JsonProperty("outputType") RowType rowType, @JsonProperty("description") String str) {
        super(i, list, rowType, str);
        Preconditions.checkArgument(list.size() == 1);
        this.grouping = (int[]) Preconditions.checkNotNull(iArr);
        this.aggCalls = (AggregateCall[]) Preconditions.checkNotNull(aggregateCallArr);
        this.window = (LogicalWindow) Preconditions.checkNotNull(logicalWindow);
        this.namedWindowProperties = (PlannerNamedWindowProperty[]) Preconditions.checkNotNull(plannerNamedWindowPropertyArr);
        this.generateUpdateBefore = z;
        this.needRetraction = z2;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase) {
        int i;
        OneInputTransformation<RowData, RowData> createPandasPythonStreamWindowGroupOneInputTransformation;
        boolean hasRowIntervalType = this.window instanceof TumblingGroupWindow ? AggregateUtil.hasRowIntervalType(((TumblingGroupWindow) this.window).size()) : this.window instanceof SlidingGroupWindow ? AggregateUtil.hasRowIntervalType(((SlidingGroupWindow) this.window).size()) : false;
        TableConfig tableConfig = plannerBase.getTableConfig();
        if (hasRowIntervalType && this.grouping.length > 0 && tableConfig.getMinIdleStateRetentionTime() < 0) {
            LOGGER.warn("No state retention interval configured for a query which accumulates state. Please provide a query configuration with valid retention interval to prevent excessive state size. You may specify a retention time of 0 to not clean up the state.");
        }
        ExecEdge execEdge = getInputEdges().get(0);
        Transformation<?> translateToPlan = execEdge.translateToPlan(plannerBase);
        RowType rowType = (RowType) execEdge.getOutputType();
        RowType rowType2 = InternalTypeInfo.of(getOutputType()).toRowType();
        if (AggregateUtil.isRowtimeAttribute(this.window.timeAttribute())) {
            i = AggregateUtil.timeFieldIndex(FlinkTypeFactory.INSTANCE().buildRelNodeRowType(rowType), plannerBase.getRelBuilder(), this.window.timeAttribute());
            if (i < 0) {
                throw new TableException("Group window must defined on a time attribute, but the time attribute can't be found.\nThis should never happen. Please file an issue.");
            }
        } else {
            i = -1;
        }
        ZoneId shiftTimeZone = TimeWindowUtil.getShiftTimeZone(this.window.timeAttribute().getOutputDataType().getLogicalType(), tableConfig);
        Tuple2<WindowAssigner<?>, Trigger<?>> generateWindowAssignerAndTrigger = generateWindowAssignerAndTrigger();
        WindowAssigner<?> windowAssigner = (WindowAssigner) generateWindowAssignerAndTrigger.f0;
        Trigger<?> trigger = (Trigger) generateWindowAssignerAndTrigger.f1;
        Configuration mergedConfig = CommonPythonUtil.getMergedConfig(plannerBase.getExecEnv(), tableConfig);
        boolean anyMatch = Arrays.stream(this.aggCalls).anyMatch(aggregateCall -> {
            return PythonUtil.isPythonAggregate(aggregateCall, PythonFunctionKind.GENERAL);
        });
        WindowEmitStrategy apply = WindowEmitStrategy.apply(tableConfig, this.window);
        if (anyMatch) {
            boolean[] zArr = new boolean[this.aggCalls.length];
            Arrays.fill(zArr, this.needRetraction);
            createPandasPythonStreamWindowGroupOneInputTransformation = createGeneralPythonStreamWindowGroupOneInputTransformation(translateToPlan, rowType, rowType2, i, windowAssigner, AggregateUtil.transformToStreamAggregateInfoList(rowType, JavaScalaConversionUtil.toScala(Arrays.asList(this.aggCalls)), zArr, this.needRetraction, true, true), apply.getAllowLateness().longValue(), mergedConfig, shiftTimeZone);
        } else {
            createPandasPythonStreamWindowGroupOneInputTransformation = createPandasPythonStreamWindowGroupOneInputTransformation(translateToPlan, rowType, rowType2, i, windowAssigner, trigger, apply.getAllowLateness().longValue(), mergedConfig, shiftTimeZone);
        }
        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(mergedConfig)) {
            createPandasPythonStreamWindowGroupOneInputTransformation.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
        }
        RowDataKeySelector rowDataSelector = KeySelectorUtil.getRowDataSelector(this.grouping, InternalTypeInfo.of(rowType));
        createPandasPythonStreamWindowGroupOneInputTransformation.setStateKeySelector(rowDataSelector);
        createPandasPythonStreamWindowGroupOneInputTransformation.setStateKeyType(rowDataSelector.getProducedType());
        return createPandasPythonStreamWindowGroupOneInputTransformation;
    }

    private Tuple2<WindowAssigner<?>, Trigger<?>> generateWindowAssignerAndTrigger() {
        TumblingWindowAssigner withGap;
        ProcessingTimeTriggers.AfterEndOfWindow afterEndOfWindow;
        if (this.window instanceof TumblingGroupWindow) {
            TumblingGroupWindow tumblingGroupWindow = (TumblingGroupWindow) this.window;
            FieldReferenceExpression timeField = tumblingGroupWindow.timeField();
            ValueLiteralExpression size = tumblingGroupWindow.size();
            if (AggregateUtil.isProctimeAttribute(timeField) && AggregateUtil.hasTimeIntervalType(size)) {
                withGap = TumblingWindowAssigner.of(AggregateUtil.toDuration(size)).withProcessingTime();
                afterEndOfWindow = ProcessingTimeTriggers.afterEndOfWindow();
            } else if (AggregateUtil.isRowtimeAttribute(timeField) && AggregateUtil.hasTimeIntervalType(size)) {
                withGap = TumblingWindowAssigner.of(AggregateUtil.toDuration(size)).withEventTime();
                afterEndOfWindow = EventTimeTriggers.afterEndOfWindow();
            } else {
                if (!AggregateUtil.isProctimeAttribute(timeField) || !AggregateUtil.hasRowIntervalType(size)) {
                    throw new UnsupportedOperationException("Event-time grouping windows on row intervals are currently not supported.");
                }
                withGap = CountTumblingWindowAssigner.of(AggregateUtil.toLong(size).longValue());
                afterEndOfWindow = ElementTriggers.count(AggregateUtil.toLong(size).longValue());
            }
        } else if (this.window instanceof SlidingGroupWindow) {
            SlidingGroupWindow slidingGroupWindow = (SlidingGroupWindow) this.window;
            FieldReferenceExpression timeField2 = slidingGroupWindow.timeField();
            ValueLiteralExpression size2 = slidingGroupWindow.size();
            ValueLiteralExpression slide = slidingGroupWindow.slide();
            if (AggregateUtil.isProctimeAttribute(timeField2) && AggregateUtil.hasTimeIntervalType(size2)) {
                withGap = SlidingWindowAssigner.of(AggregateUtil.toDuration(size2), AggregateUtil.toDuration(slide)).withProcessingTime();
                afterEndOfWindow = ProcessingTimeTriggers.afterEndOfWindow();
            } else if (AggregateUtil.isRowtimeAttribute(timeField2) && AggregateUtil.hasTimeIntervalType(size2)) {
                withGap = SlidingWindowAssigner.of(AggregateUtil.toDuration(size2), AggregateUtil.toDuration(slide));
                afterEndOfWindow = EventTimeTriggers.afterEndOfWindow();
            } else {
                if (!AggregateUtil.isProctimeAttribute(timeField2) || !AggregateUtil.hasRowIntervalType(size2)) {
                    throw new UnsupportedOperationException("Event-time grouping windows on row intervals are currently not supported.");
                }
                withGap = CountSlidingWindowAssigner.of(AggregateUtil.toLong(size2).longValue(), AggregateUtil.toLong(slide).longValue());
                afterEndOfWindow = ElementTriggers.count(AggregateUtil.toLong(size2).longValue());
            }
        } else {
            if (!(this.window instanceof SessionGroupWindow)) {
                throw new TableException("Unsupported window: " + this.window.toString());
            }
            SessionGroupWindow sessionGroupWindow = (SessionGroupWindow) this.window;
            FieldReferenceExpression timeField3 = sessionGroupWindow.timeField();
            ValueLiteralExpression gap = sessionGroupWindow.gap();
            if (AggregateUtil.isProctimeAttribute(timeField3)) {
                withGap = SessionWindowAssigner.withGap(AggregateUtil.toDuration(gap));
                afterEndOfWindow = ProcessingTimeTriggers.afterEndOfWindow();
            } else {
                if (!AggregateUtil.isRowtimeAttribute(timeField3)) {
                    throw new UnsupportedOperationException("This should not happen.");
                }
                withGap = SessionWindowAssigner.withGap(AggregateUtil.toDuration(gap));
                afterEndOfWindow = EventTimeTriggers.afterEndOfWindow();
            }
        }
        return Tuple2.of(withGap, afterEndOfWindow);
    }

    private OneInputTransformation<RowData, RowData> createPandasPythonStreamWindowGroupOneInputTransformation(Transformation<RowData> transformation, RowType rowType, RowType rowType2, int i, WindowAssigner<?> windowAssigner, Trigger<?> trigger, long j, Configuration configuration, ZoneId zoneId) {
        Tuple2<int[], PythonFunctionInfo[]> extractPythonAggregateFunctionInfosFromAggregateCall = CommonPythonUtil.extractPythonAggregateFunctionInfosFromAggregateCall(this.aggCalls);
        return new OneInputTransformation<>(transformation, getDescription(), getPandasPythonStreamGroupWindowAggregateFunctionOperator(configuration, rowType, rowType2, windowAssigner, trigger, j, i, (int[]) extractPythonAggregateFunctionInfosFromAggregateCall.f0, (PythonFunctionInfo[]) extractPythonAggregateFunctionInfosFromAggregateCall.f1, zoneId), InternalTypeInfo.of(rowType2), transformation.getParallelism());
    }

    private OneInputTransformation<RowData, RowData> createGeneralPythonStreamWindowGroupOneInputTransformation(Transformation<RowData> transformation, RowType rowType, RowType rowType2, int i, WindowAssigner<?> windowAssigner, AggregateInfoList aggregateInfoList, long j, Configuration configuration, ZoneId zoneId) {
        int indexOfCountStar = aggregateInfoList.getIndexOfCountStar();
        boolean countStarInserted = aggregateInfoList.countStarInserted();
        Tuple2<PythonAggregateFunctionInfo[], DataViewUtils.DataViewSpec[][]> extractPythonAggregateFunctionInfos = CommonPythonUtil.extractPythonAggregateFunctionInfos(aggregateInfoList, this.aggCalls);
        return new OneInputTransformation<>(transformation, getDescription(), getGeneralPythonStreamGroupWindowAggregateFunctionOperator(configuration, rowType, rowType2, windowAssigner, (PythonAggregateFunctionInfo[]) extractPythonAggregateFunctionInfos.f0, (DataViewUtils.DataViewSpec[][]) extractPythonAggregateFunctionInfos.f1, i, indexOfCountStar, this.generateUpdateBefore, countStarInserted, j, zoneId), InternalTypeInfo.of(rowType2), transformation.getParallelism());
    }

    private OneInputStreamOperator<RowData, RowData> getPandasPythonStreamGroupWindowAggregateFunctionOperator(Configuration configuration, RowType rowType, RowType rowType2, WindowAssigner<?> windowAssigner, Trigger<?> trigger, long j, int i, int[] iArr, PythonFunctionInfo[] pythonFunctionInfoArr, ZoneId zoneId) {
        try {
            return (OneInputStreamOperator) CommonPythonUtil.loadClass(ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME).getConstructor(Configuration.class, PythonFunctionInfo[].class, RowType.class, RowType.class, Integer.TYPE, WindowAssigner.class, Trigger.class, Long.TYPE, PlannerNamedWindowProperty[].class, int[].class, int[].class, ZoneId.class).newInstance(configuration, pythonFunctionInfoArr, rowType, rowType2, Integer.valueOf(i), windowAssigner, trigger, Long.valueOf(j), this.namedWindowProperties, this.grouping, iArr, zoneId);
        } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new TableException("Python StreamArrowPythonGroupWindowAggregateFunctionOperator constructed failed.", e);
        }
    }

    private OneInputStreamOperator<RowData, RowData> getGeneralPythonStreamGroupWindowAggregateFunctionOperator(Configuration configuration, RowType rowType, RowType rowType2, WindowAssigner<?> windowAssigner, PythonAggregateFunctionInfo[] pythonAggregateFunctionInfoArr, DataViewUtils.DataViewSpec[][] dataViewSpecArr, int i, int i2, boolean z, boolean z2, long j, ZoneId zoneId) {
        try {
            return (OneInputStreamOperator) CommonPythonUtil.loadClass(GENERAL_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME).getConstructor(Configuration.class, RowType.class, RowType.class, PythonAggregateFunctionInfo[].class, DataViewUtils.DataViewSpec[][].class, int[].class, Integer.TYPE, Boolean.TYPE, Boolean.TYPE, Integer.TYPE, WindowAssigner.class, LogicalWindow.class, Long.TYPE, PlannerNamedWindowProperty[].class, ZoneId.class).newInstance(configuration, rowType, rowType2, pythonAggregateFunctionInfoArr, dataViewSpecArr, this.grouping, Integer.valueOf(i2), Boolean.valueOf(z), Boolean.valueOf(z2), Integer.valueOf(i), windowAssigner, this.window, Long.valueOf(j), this.namedWindowProperties, zoneId);
        } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new TableException("Python PythonStreamGroupWindowAggregateOperator constructed failed.", e);
        }
    }
}
