

@lihaosky (Contributor) commented Aug 18, 2025

What is the purpose of the change

  • Add support for model arguments in PTFs (process table functions)
  • Add the built-in ML_PREDICT PTF definition

Brief change log

  • Add ModelSemantics to CallContext
  • Support model arguments in StaticArgument
  • Add disableSystemArguments to control whether the uid and time-attribute system arguments are added to the PTF input/output
  • Add the built-in ML_PREDICT PTF
  • Add and update tests

Verifying this change

  • Added unit tests for ML_PREDICT type inference
  • Updated tests for the ML_PREDICT function

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (JavaDocs)

@flinkbot (Collaborator) commented Aug 18, 2025

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@@ -147,7 +159,9 @@ private static void checkScalarArgsOnly(List<StaticArgument> defaultArgs) {
checkPassThroughColumns(declaredArgs);

final List<StaticArgument> newStaticArgs = new ArrayList<>(declaredArgs);
newStaticArgs.addAll(PROCESS_TABLE_FUNCTION_SYSTEM_ARGS);

@davidradl (Contributor) commented Aug 27, 2025:

I am curious what the system arguments mean. Is this something users need to be aware of? I do not see this term in the FLIP, and the Jira issue has no further information. I suggest adding a description and the motivation behind this change. It appears to be a kind of static argument that is added when the boolean flag is on, but I am not sure when it would or should be used.

@lihaosky (Contributor Author) replied:

This controls whether the uid and on_time fields are added to the PTF input. It is currently used by ML_PREDICT, which doesn't need the uid and on_time fields. It is not exposed to user-defined PTFs. Yes, I can add more description if this approach makes sense.

Contributor reply:

This is not entirely correct. A user-defined PTF can implement its own TypeInference and avoid system arguments, but that is more of a second-level API.
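A minimal sketch of the behavior discussed in this thread, assuming system arguments are simply appended after the declared ones (mirroring newStaticArgs.addAll(PROCESS_TABLE_FUNCTION_SYSTEM_ARGS) in the diff above). StaticArgsSketch, withSystemArgs, and the plain string argument names are hypothetical stand-ins, not Flink's StaticArgument API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: assembles a PTF's static arguments, optionally adding system arguments. */
final class StaticArgsSketch {

    // Hypothetical stand-ins for the implicit system arguments (time attribute and uid).
    static final List<String> SYSTEM_ARGS = List.of("on_time", "uid");

    /** Returns the declared arguments, followed by the system arguments unless they are disabled. */
    static List<String> withSystemArgs(List<String> declaredArgs, boolean disableSystemArguments) {
        final List<String> result = new ArrayList<>(declaredArgs);
        if (!disableSystemArguments) {
            // Default PTF behavior: append the implicit system arguments.
            result.addAll(SYSTEM_ARGS);
        }
        return result;
    }
}
```

With disableSystemArguments set to true (as ML_PREDICT would use), only the declared arguments remain; with false, on_time and uid are appended after them.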

@twalthr (Contributor) left a review:

Great job @lihaosky! I added some comments to improve the contribution a bit but nothing major.

@@ -651,6 +652,12 @@ public Optional<TableSemantics> getTableSemantics(int pos) {
return Optional.of(semantics);
}

@Override
public Optional<ModelSemantics> getModelSemantics(int pos) {
// TODO: Add ModelReferenceExpression checks and TableApiModelSemantics

Contributor comment:

We should not leave TODOs in the code base; sometimes they stay there forever.

Author reply:

Removed

@@ -298,6 +298,11 @@ public Builder staticArguments(StaticArgument... staticArguments) {
return this;
}

public Builder allowSystemArguments(boolean allowSystemArguments) {

Contributor comment:

Let's call this disableSystemArguments, both here and in TypeInference. That way, it can default to false.
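A minimal sketch of the suggested naming, assuming a plain builder: because the flag is phrased negatively, a default of false keeps the existing behavior of adding system arguments. TypeInferenceSketch and its methods are hypothetical; Flink's real TypeInference.Builder may differ.

```java
/** Hypothetical sketch of a type-inference object carrying the disableSystemArguments flag. */
final class TypeInferenceSketch {
    private final boolean disableSystemArguments;

    private TypeInferenceSketch(Builder builder) {
        this.disableSystemArguments = builder.disableSystemArguments;
    }

    boolean disableSystemArguments() {
        return disableSystemArguments;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        // Default false: system arguments are added, matching existing PTF behavior.
        private boolean disableSystemArguments = false;

        Builder disableSystemArguments(boolean disableSystemArguments) {
            this.disableSystemArguments = disableSystemArguments;
            return this;
        }

        TypeInferenceSketch build() {
            return new TypeInferenceSketch(this);
        }
    }
}
```

Callers that never touch the flag keep system arguments, and only functions such as ML_PREDICT need to opt out explicitly.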

public interface ModelSemantics {

/**
* Input data type expected by the passed model. Extracting type from PTF class definition is

Contributor comment:

Suggested change
* Input data type expected by the passed model. Extracting type from PTF class definition is
* Input data type expected by the passed model.

Contributor comment:

Drop the last sentence; it is rather confusing.

DataType inputDataType();

/**
* Output data type produced by the passed model. Extracting type from PTF class definition is

Contributor comment:

Suggested change
* Output data type produced by the passed model. Extracting type from PTF class definition is
* Output data type produced by the passed model.

if (tableSemantics == null) {
if (throwOnFailure) {
throw new ValidationException(
"First argument must be a table for ML_PREDICT function.");

Contributor comment:

An input type strategy is optional if static arguments have been declared; you can assume those checks have already been done. An input type strategy is only useful if you want to perform additional validation, like validateTableAndDescriptorArguments below.

import org.apache.flink.table.types.DataType;

/** Mock implementation of {@link ModelSemantics} for testing purposes. */
public class ModelSemanticsMock implements ModelSemantics {

Contributor comment:

Move this next to CallContextMock in the utils package.
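A minimal sketch of the ModelSemantics shape discussed above together with a test mock, assuming Flink's DataType is replaced by a plain String type description so the example compiles on its own. ModelSemanticsSketch and ModelSemanticsMockSketch are hypothetical names, not the classes in this PR.

```java
/** Hypothetical, simplified stand-in for ModelSemantics (String replaces DataType). */
interface ModelSemanticsSketch {
    /** Input data type expected by the passed model. */
    String inputDataType();

    /** Output data type produced by the passed model. */
    String outputDataType();
}

/** Hypothetical test mock: returns fixed types supplied at construction. */
final class ModelSemanticsMockSketch implements ModelSemanticsSketch {
    private final String inputDataType;
    private final String outputDataType;

    ModelSemanticsMockSketch(String inputDataType, String outputDataType) {
        this.inputDataType = inputDataType;
        this.outputDataType = outputDataType;
    }

    @Override
    public String inputDataType() {
        return inputDataType;
    }

    @Override
    public String outputDataType() {
        return outputDataType;
    }
}
```

A mock like this lets type-inference tests supply model input/output types directly, without registering a real model.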

@@ -62,6 +65,7 @@ public final class CallBindingCallContext extends AbstractSqlCallContext {
private final List<DataType> argumentDataTypes;
private final @Nullable DataType outputType;
private final @Nullable List<StaticArgument> staticArguments;
private final SqlValidator validator;

Contributor comment:

The validator does not really fit here; can we avoid it? SqlModelCall should already have resolved types. I think TableArgCall works the same way? We should keep the two in sync if possible.

Author reply:

Refactored

@@ -1327,7 +1326,7 @@ public List<SqlGroupedWindowFunction> getAuxiliaryFunctions() {
public static final SqlFunction SESSION = new SqlSessionTableFunction();

// MODEL TABLE FUNCTIONS
public static final SqlFunction ML_PREDICT = new SqlMLPredictTableFunction();
// public static final SqlFunction ML_PREDICT = new SqlMLPredictTableFunction();

Contributor comment:

remove?

@@ -335,6 +335,8 @@ public RelNode visit(FunctionQueryOperation functionTable) {
inputStack.add(relBuilder.build());
return tableArgCall;
}
// TODO: Check ModelReferenceExpression and construct
// RexModelArgCall

Contributor comment:

Are you planning to fix this TODO?

Author reply:

Removed

util.addTemporarySystemFunction("f", NoSystemArgsTableFunction.class);
assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM f(r => TABLE t, i => 1);"))
.satisfies(
anyCauseMatches("Disabling uid/time attributes is not supported for PTF."));

Contributor comment:

Suggested change
anyCauseMatches("Disabling uid/time attributes is not supported for PTF."));
anyCauseMatches("Disabling system arguments is not supported for user-defined PTF yet."));
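A minimal sketch of the planner-side check that this test expects, assuming the planner knows whether a function is built-in. SystemArgsValidationSketch and its isBuiltIn parameter are hypothetical, and IllegalArgumentException stands in for Flink's ValidationException.

```java
/** Hypothetical check: only built-in functions (e.g. ML_PREDICT) may disable system arguments. */
final class SystemArgsValidationSketch {

    static void validate(boolean isBuiltIn, boolean disableSystemArguments) {
        if (disableSystemArguments && !isBuiltIn) {
            // Same message as the suggested test assertion above.
            throw new IllegalArgumentException(
                    "Disabling system arguments is not supported for user-defined PTF yet.");
        }
    }
}
```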

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Sep 4, 2025
Comment on lines +234 to +262
if (tableSemantics == null) {
if (throwOnFailure) {
throw new ValidationException(
"First argument must be a table for ML_PREDICT function.");
} else {
return Optional.empty();
}
}

// Check that second argument is a model
ModelSemantics modelSemantics = callContext.getModelSemantics(1).orElse(null);
if (modelSemantics == null) {
if (throwOnFailure) {
throw new ValidationException(
"Second argument must be a model for ML_PREDICT function.");
} else {
return Optional.empty();
}
}

// Check that third argument is a descriptor with column names
Optional<ColumnList> descriptorColumns = callContext.getArgumentValue(2, ColumnList.class);
if (descriptorColumns.isEmpty()) {
if (throwOnFailure) {
throw new ValidationException(
"Third argument must be a descriptor with simple column names for ML_PREDICT function.");
} else {
return Optional.empty();
}

Author comment:

@twalthr, I added these checks back since MLPredictInputTypeStrategyTest also tests invalid arguments. Also, tableSemantics etc. are needed below. It probably doesn't hurt to do the extra checks and give meaningful error messages.
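A minimal sketch, assuming the repeated "throw if throwOnFailure, otherwise return Optional.empty()" blocks are factored into one helper. ValidationSketch, failOrEmpty, and the Object/String stand-ins for TableSemantics, ModelSemantics, and DataType are hypothetical; IllegalArgumentException replaces Flink's ValidationException.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of ML_PREDICT-style argument checks sharing one failure helper. */
final class ValidationSketch {

    /** Throws if throwOnFailure is set; otherwise signals "no match" with an empty Optional. */
    static <T> Optional<T> failOrEmpty(boolean throwOnFailure, String message) {
        if (throwOnFailure) {
            throw new IllegalArgumentException(message);
        }
        return Optional.empty();
    }

    /**
     * Hypothetical inference step: null semantics means the argument is not a table/model;
     * argTypes stands in for the inferred DataTypes.
     */
    static Optional<List<String>> inferInputTypes(
            Object tableSemantics,
            Object modelSemantics,
            List<String> argTypes,
            boolean throwOnFailure) {
        if (tableSemantics == null) {
            return failOrEmpty(
                    throwOnFailure, "First argument must be a table for ML_PREDICT function.");
        }
        if (modelSemantics == null) {
            return failOrEmpty(
                    throwOnFailure, "Second argument must be a model for ML_PREDICT function.");
        }
        return Optional.of(argTypes);
    }
}
```

Keeping the messages in one place preserves the meaningful errors while removing the nested if/else repetition.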

@lihaosky (Contributor Author) commented Sep 4, 2025

@flinkbot run azure

Labels
community-reviewed PR has been reviewed by the community.
4 participants