Support serializing external OpenSearch UDFs at pushdown time #4618
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Description
Previously, externally registered OpenSearch UDFs can not be serialized as they are not registered in
RelJsonSerializer. This PR collects these UDFs in aSqlOperatorTableand registers it inRelJsonSerializer.Blocker
Blocked by #2813 (or potentially other issues that restrict groupping by struct fields)
UDF is serialized, but grouping by a generated struct seems to be problematic after pushdown.
source=weblogs | where host='1.2.3.4' | eval info = geoip('my-datasource', host) | stats count() by info:{ "error": { "reason": "There was internal problem at backend", "details": "java.sql.SQLException: exception while executing query: class java.lang.String cannot be cast to class java.util.Map (java.lang.String and java.util.Map are in module java.base of loader 'bootstrap')", "type": "RuntimeException" }, "status": 500 }In this case, the result map is converted to a string when used as a group key.
Directly running the DSL with the script gives the bucket key as string
Query:
{"from":0,"size":0,"timeout":"1m","query":{"term":{"host":{"value":"1.2.3.4","boost":1.0}}},"_source":{"includes":["host"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"info":{"terms":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXNyABFqYXZhLnV0aWwuQ29sbFNlcleOq7Y6G6gRAwABSQADdGFneHAAAAADdwQAAAAGdAAHcm93VHlwZXQAknsKICAiZmllbGRzIjogWwogICAgewogICAgICAidWR0IjogIkVYUFJfSVAiLAogICAgICAidHlwZSI6ICJPVEhFUiIsCiAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICJuYW1lIjogImhvc3QiCiAgICB9CiAgXSwKICAibnVsbGFibGUiOiBmYWxzZQp9dAAEZXhwcnQCw3sKICAib3AiOiB7CiAgICAibmFtZSI6ICJHRU9JUCIsCiAgICAia2luZCI6ICJPVEhFUl9GVU5DVElPTiIsCiAgICAic3ludGF4IjogIkZVTkNUSU9OIgogIH0sCiAgIm9wZXJhbmRzIjogWwogICAgewogICAgICAibGl0ZXJhbCI6ICJteS1kYXRhc291cmNlIiwKICAgICAgInR5cGUiOiB7CiAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgIm51bGxhYmxlIjogZmFsc2UsCiAgICAgICAgInByZWNpc2lvbiI6IC0xCiAgICAgIH0KICAgIH0sCiAgICB7CiAgICAgICJpbnB1dCI6IDAsCiAgICAgICJuYW1lIjogIiQwIgogICAgfQogIF0sCiAgImNsYXNzIjogIm9yZy5vcGVuc2VhcmNoLnNxbC5leHByZXNzaW9uLmZ1bmN0aW9uLlVzZXJEZWZpbmVkRnVuY3Rpb25CdWlsZGVyJDEiLAogICJ0eXBlIjogewogICAgInR5cGUiOiAiTUFQIiwKICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgImtleSI6IHsKICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgICAicHJlY2lzaW9uIjogLTEKICAgIH0sCiAgICAidmFsdWUiOiB7CiAgICAgICJ0eXBlIjogIkFOWSIsCiAgICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgICAicHJlY2lzaW9uIjogLTEsCiAgICAgICJzY2FsZSI6IC0yMTQ3NDgzNjQ4CiAgICB9CiAgfSwKICAiZGV0ZXJtaW5pc3RpYyI6IHRydWUsCiAgImR5bmFtaWMiOiBmYWxzZQp9dAAKZmllbGRUeXBlc3NyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAAAx3CAAAABAAAAABdAAEaG9zdH5yAClvcmcub3BlbnNlYXJjaC5zcWwuZGF0YS50eXBlLkV4cHJDb3JlVHlwZQAAAAAAAAAAEgAAeHIADmphdmEubGFuZy5FbnVtAAAAAAAAAAASAAB4cHQAAklQeHg=\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp":1761646601217183000}},"missing_bucket":true,"missing_order":"first","order":"asc"}}}]}}}}Result:
{ "took": 137, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": null, "hits": [] }, "aggregations": { "composite_buckets": { "after_key": { "info": "{continent_name=Oceania, country_iso_code=AU, country_name=Australia, location=-33.4940,143.2104, time_zone=Australia/Sydney}" }, "buckets": [ { "key": { "info": "{continent_name=Oceania, country_iso_code=AU, country_name=Australia, location=-33.4940,143.2104, time_zone=Australia/Sydney}" }, "doc_count": 1 } ] } } }For reference, the result of `source=weblogs | where host='1.2.3.4' | eval info = geoip('my-datasource', host)` is:
{ "schema": [ { "name": "host", "type": "ip" }, { "name": "method", "type": "string" }, { "name": "bytes", "type": "string" }, { "name": "response", "type": "string" }, { "name": "url", "type": "string" }, { "name": "info", "type": "struct" } ], "datarows": [ [ "1.2.3.4", "GET", "1234", "200", "/history/voyager1/", { "continent_name": "Oceania", "country_iso_code": "AU", "country_name": "Australia", "location": "-33.4940,143.2104", "time_zone": "Australia/Sydney" } ] ], "total": 1, "size": 1 }Workaround
Instead of directly groupping by the struct generated by
geoip, I created tests that group results by sub-fields of result struct ofgeoipfunction. E.g. if the result objectinfo{"str": "a string", "num": 1}, I group results byinfo.str.Related Issues
Resolves #4478
Check List
--signoffor-s.By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.