Add ability to ignore unknown fields in kafka messages #88
Conversation
@RyanSkraba What is your opinion of this PR? I think that it is useful and looks like it will work correctly, but it makes changes to existing public APIs (though perhaps they should not be considered public APIs, as they are intended to be used internally). What are your thoughts?

@SamoylovMD You said you have a fork of this project whose divergence you are trying to resolve by adding the functionality to this repository, a tack that I appreciated. My question to you is: how much damage would the changes here cause for you?

@podgaietska Could you, rather than changing the constructors, deprecate the current constructors and add a constructor that accepts the BigQuerySinkConfig? That way, when new options are added in the future, we do not have to change all the methods again.
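The deprecate-and-delegate approach suggested above might look roughly like the sketch below. This is a hedged illustration only: `BigQuerySinkConfig` here is a minimal stand-in (the connector's real config class extends Kafka's `AbstractConfig`), and `BigQueryWriter` and all of its members are hypothetical names, not classes taken from this repository.

```java
// Minimal stand-in for the connector's config class (hypothetical; the real
// BigQuerySinkConfig is far richer). Only the ignoreUnknownFields idea comes
// from this discussion thread.
class BigQuerySinkConfig {
    private final boolean ignoreUnknownFields;

    BigQuerySinkConfig(boolean ignoreUnknownFields) {
        this.ignoreUnknownFields = ignoreUnknownFields;
    }

    // Information-hiding accessor: callers never see how the flag is stored.
    boolean ignoreUnknownFields() {
        return ignoreUnknownFields;
    }
}

// Hypothetical writer class illustrating the suggested pattern: keep the old
// constructor but mark it deprecated, and add one that accepts the config.
class BigQueryWriter {
    private final boolean ignoreUnknownFields;

    /** @deprecated use {@link #BigQueryWriter(BigQuerySinkConfig)} instead. */
    @Deprecated
    BigQueryWriter(boolean ignoreUnknownFields) {
        this.ignoreUnknownFields = ignoreUnknownFields;
    }

    // New constructor: future options ride along on the config object, so
    // call sites do not have to change again when options are added.
    BigQueryWriter(BigQuerySinkConfig config) {
        this(config.ignoreUnknownFields());
    }

    boolean ignoresUnknownFields() {
        return ignoreUnknownFields;
    }
}
```

The payoff of the config-object constructor is that adding a second option later changes only the config class and the constructor body, not every caller's argument list.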
@Claudenw I'm fine if the behaviour is configurable. In my use case I have an SMT which decouples the source data schema from the target by explicitly constructing output records the way I need, and there is a separate job which updates the target table schema according to this SMT. The new option seems reasonable, but in my case it effectively works the same way without any tweaks to the connector plugin code. I think the new option won't affect me; thanks for letting me know anyway.
```java
        TASKS_MAX,
        "Tasks should have failed when writing record with fields not present"
            + "in target table." );
  } @Test public void testRecordWithUnknownFieldWhenIgnoreEnabled() throws InterruptedException {
```
Some odd formatting here! Can you fix this?
Good catch. Fixed. Thanks!
For Kafka connectors, it's vitally important to keep "config compatibility" (if that's even a thing)! It's tempting to ignore internal source or binary compatibility, because most users will be dropping the entire connector into a Kafka Connect cluster, not accessing the jar otherwise... I don't have a strong opinion on source or binary API compatibility internal to the connector.

But your suggestion is pretty easy: why not add more constructors instead of modifying the existing ones, just in case?

Outside of that, the changes LGTM. I don't think there are any other places (than the JsonWriter and InsertAllRequest) that need to be configured. Thanks for the tests!
The `@deprecated` tags need to come at the end of the Javadoc. If you run `mvn compile` you will see all the lines that need to be changed.
Note: when I wrote about passing the config object, I intended that calls to

```java
config.getBoolean(BigQuerySinkConfig.IGNORE_UNKNOWN_FIELDS_CONFIG);
```

would be contained within the config class itself, as something like:

```java
public boolean ignoreUnknownFields() {
    return getBoolean(BigQuerySinkConfig.IGNORE_UNKNOWN_FIELDS_CONFIG);
}
```

This helps enforce the concept of information hiding: the calling code does not need to know how we figure out whether unknown fields are ignored, just whether or not we do. The calling code then looks like:

```java
this.ignoreUnknownFields = config.ignoreUnknownFields();
```
However, since the rest of the code base uses the same style as your solution, I will approve this once the tests pass.
Also, some developers and projects will state that `ignoreUnknownFields()` should be `isIgnoreUnknownFields()`. I can go either way on this argument; sometimes `is` seems to make more sense, sometimes not.
Looks good, except that it does not compile when rebased on main. It looks like there were changes in the main branch that removed a class you need. Please rebase onto main and add the import statement you need.
Force-pushed: 847fa6a to f8b0a36
```java
    ).define(
        IGNORE_UNKNOWN_FIELDS_CONFIG,
        IGNORE_UNKNOWN_FIELDS_TYPE,
        IGNORE_UNKNOWN_FIELDS_DEFAULT,
        IGNORE_UNKNOWN_FIELDS_IMPORTANCE,
        IGNORE_UNKNOWN_FIELDS_DOC
```
Starting in v3.9.0 we are using `ExtendedConfigKey`s for new definitions. Please use `ExtendedConfigKey.builder(IGNORE_UNKNOWN_FIELDS_CONFIG)` to get a builder and set the values in the builder. In addition, please add `builder.since("2.10.0")` to indicate that this option was added in version 2.10.0; this will update the configuration documentation when the change is merged.
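As a rough illustration of the builder style requested above: only `builder(name)` and `since(version)` come from the review comment; every other member of the stand-in class below is hypothetical and is not the project's real `ExtendedConfigKey` API.

```java
// Illustrative stand-in for an ExtendedConfigKey-style builder. NOT the
// connector's real class; it exists only to show the builder(...).since(...)
// call shape mentioned in the review.
final class ExtendedConfigKeySketch {
    private final String name;
    private final String since;

    private ExtendedConfigKeySketch(String name, String since) {
        this.name = name;
        this.since = since;
    }

    static Builder builder(String name) {
        return new Builder(name);
    }

    String name() { return name; }

    String since() { return since; }

    static final class Builder {
        private final String name;
        private String since = "unknown";

        private Builder(String name) {
            this.name = name;
        }

        // Records the first connector version that shipped this option, so
        // generated configuration docs can display it.
        Builder since(String version) {
            this.since = version;
            return this;
        }

        ExtendedConfigKeySketch build() {
            return new ExtendedConfigKeySketch(name, since);
        }
    }
}
```

The fluent `since("2.10.0")` call is the piece the reviewer is asking for; the builder carries it into the key definition so documentation generation can pick it up.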
Changed. Thanks!
Force-pushed: f8b0a36 to e345758
Force-pushed: e345758 to d777f7b
Summary
This change introduces a new connector configuration option to control how unknown fields in Kafka records are handled when writing to BigQuery.
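Enabling the option from a connector configuration might then look like the fragment below. This is a guess for illustration only: the actual property key is whatever `IGNORE_UNKNOWN_FIELDS_CONFIG` resolves to in `BigQuerySinkConfig`, and `ignore.unknown.fields` is a hypothetical name not confirmed by this thread.

```properties
# Hypothetical connector config snippet; the real key name is defined by
# IGNORE_UNKNOWN_FIELDS_CONFIG and may differ.
ignore.unknown.fields=true
```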
Details