chore(engine): Add "compatibility node" to physical plan to adhere with naming of "colliding labels" in v1 engine #19470
Conversation
Force-pushed from bda3a3b to 353a588
Signed-off-by: Christian Haudum <[email protected]>
The `ColumnCompat` node is used to guarantee compatibility with the old engine. In the new engine it is possible to have the same column name coming from different sources, such as labels or structured metadata. While the old engine suffixes colliding names with `_extracted`, the new engine returns them in both the labels and the structured metadata response. This new node keeps the old behaviour without implementing the logic directly in the engine; keeping it separate means it can easily be disabled again. Signed-off-by: Christian Haudum <[email protected]>
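For illustration, a hedged sketch of the v1 naming rule the node reproduces, reduced to plain Go maps (the real node operates on Arrow record columns; the function and its parameters are illustrative only):

// Illustrative sketch only: a structured metadata key that collides with a
// stream label is exposed under "<name>_extracted", matching the v1 engine.
func resolveCollisions(labels, metadata map[string]string) map[string]string {
	out := make(map[string]string, len(labels)+len(metadata))
	for k, v := range labels {
		out[k] = v
	}
	for k, v := range metadata {
		if _, collides := labels[k]; collides {
			out[k+"_extracted"] = v // v1 behaviour: suffix the colliding metadata key
		} else {
			out[k] = v
		}
	}
	return out
}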
Force-pushed from 0d27b2a to bddd381
lgtm
schema := batch.Schema()
for idx := range schema.NumFields() {
	ident, err := semconv.ParseFQN(schema.Field(idx).Name)
	if err != nil {
release batch here?
since we return a new record in the happy path, can we defer batch.Release() on line 22 instead? I really prefer the open/close pattern, using defer to close right after we open in the code.
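To make that concrete, a hedged sketch of the suggested open/close pattern (the receiver, the state return type, failureState, and buildCompatRecord are illustrative assumptions; successState and semconv.ParseFQN are taken from the snippets in this PR):

// Illustrative sketch only, not the PR's code: release the input record right
// after receiving it, so the error path and the happy path (which returns a
// newly built record) are both covered by a single defer.
func (c *columnCompat) process(batch arrow.Record) state {
	defer batch.Release()

	schema := batch.Schema()
	for idx := range schema.NumFields() {
		if _, err := semconv.ParseFQN(schema.Field(idx).Name); err != nil {
			return failureState(err) // no explicit batch.Release() needed on this path
		}
	}
	return successState(c.buildCompatRecord(batch)) // hypothetical helper that builds a new record
}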
Signed-off-by: Christian Haudum <[email protected]>
	slices.Reverse(groups)
}

// TODO(chaudum): Make it configurable to keep/remove this compatibility node
Is it correct for compatibility to happen this early in the pipeline? Don't we need it after parse stages and the like?
This one always needs to happen, and it needs to come before any filter node, since an ambiguous label filter evaluates on the COALESCE of the metadata and label columns.
In case of a parse stage, we need a second one, placed directly after the parse.
	slices.Reverse(groups)
}

// TODO(chaudum): Make it configurable to keep/remove this compatibility node
Should there be a compatibility node in the logical plan that is responsible for this? We want compatibility based on how LogQL is being used, so it does feel like the logical plan's responsibility for placing it.
Interesting, I hadn't thought about that, but you have a point here.
It could simply be a COMPAT operator, wdyt?
%1 = EQ label.env "prod"
%2 = MAKETABLE [selector=%1, predicates=[], shard=0_of_1]
...
%8 = LIMIT %7 [skip=0, fetch=1000]
%9 = COMPAT %8
RETURN %9
Would it be ok to add this in a separate PR?
COMPAT makes sense to me 👍 Maybe even LOGQL_COMPAT if we want to be extra verbose.
Sure, I'm fine with it being done in a separate PR.
not much to add here, but I like LOGQL_COMPAT fwiw
// TODO(chaudum): Make it configurable to keep/remove this compatibility node
compat := &ColumnCompat{
	id: "MetadataOverLabel",
This ID won't be unique for very long; supporting binary operations over vectors like sum(...) + sum(...) is likely to end up with two distinct MAKE_TABLE operations in the logical plan.
The ID here is not relevant at all.
It will be relevant very soon :) The scheduler will need unique IDs.
	})
}

// Create a new builder with the updated schema.
I'm not sure we need to create a new builder here since the contents of the arrays don't change for compatibility mapping.
Can we use the existing arrays and provide a new schema which handles the renaming via array.NewRecord? That would also be much faster than copying the data in the arrays.
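A hedged sketch of that suggestion, assuming the arrow and array packages from the Arrow Go module are imported; it only shows the zero-copy rename itself and ignores the per-row coalescing discussed below:

// Illustrative sketch only: rename a single column by rebuilding the schema
// and wrapping the existing arrays, without copying any column data.
func renameColumn(batch arrow.Record, idx int, newName string) arrow.Record {
	fields := make([]arrow.Field, batch.Schema().NumFields())
	copy(fields, batch.Schema().Fields())
	fields[idx].Name = newName // only the field metadata changes

	cols := make([]arrow.Array, batch.NumCols())
	for i := range cols {
		cols[i] = batch.Column(i) // reuse the existing arrays as-is
	}

	md := batch.Schema().Metadata()
	return array.NewRecord(arrow.NewSchema(fields, &md), cols, batch.NumRows())
}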
That's what I do with the columns that do not have conflicts.
However, I cannot just rename the full column of a batch, since we need to do that on a row basis.
I'm not sure I'm following yet. You can create a new *arrow.Schema where the field has the new resolved name, but give it the same underlying array (via array.NewRecord).
I second this, there should be a way to reuse the column data and just rename the field in the schema.
Line 147 creates the new batch using array.NewRecord(newSchema, newSchemaColumns, batch.NumRows()), where newSchemaColumns is the []arrow.Array that holds the existing unmodified columns (line 111) and the modified columns (lines 137 and 141).
Maybe I'm having a hard time following from the code why we need to modify the columns? It reads as if it were just copying data, but I guess you're saying it's doing more than that?
After talking offline, it's clearer to me now that this is similar to a multi-in, multi-out coalesce operation.
I do find the logic here hard to follow and validate: it seems like sourceFieldBuilder never has a non-NULL appended to it, but I don't think that's true? I don't have any suggestions how to make this easier to understand, and I don't want to block us having this, so I'm comfortable with it being merged and coming back to this later.
the need for the new column was finally made clear to me by the fact that the columns won't always conflict (either by rows in the same batch or across batches). I think this case is worth a test, at least for documentation purposes.
These cases are covered by tests
I do find the logic here hard to follow and validate: it seems like sourceFieldBuilder never has a non-NULL appended to it, but I don't think that's true?
Right, that is incorrect behaviour and will be fixed with 5b16cfb
not much to add, just a nit about releasing semantics and agreement with everyone else :)
looks good though!
// Return early if there are no colliding column names.
if len(duplicates) == 0 {
	return successState(batch)
ahh, I was wrong, we don't always create a new record. I wonder if we want to for cleaner release semantics?
We could still defer release on line 22, and explicitly retain here, as this is a short circuit.
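A hedged sketch of how that could look, building on the defer pattern sketched earlier (helper names are illustrative):

// Illustrative sketch only: with defer batch.Release() at the top of the
// function, the short-circuit path returns the same record to the caller and
// therefore has to retain it first.
if len(duplicates) == 0 {
	batch.Retain() // balances the deferred Release; the caller receives a live reference
	return successState(batch)
}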
name: "multiple duplicates", | ||
slice1: []string{"a", "b", "c", "d"}, | ||
slice2: []string{"c", "d", "e", "f"}, | ||
expected: []duplicate{ | ||
{ | ||
value: "c", | ||
s1Idx: 2, | ||
s2Idx: 0, | ||
}, | ||
{ | ||
value: "d", | ||
s1Idx: 3, | ||
s2Idx: 1, | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "duplicate with different positions", | ||
slice1: []string{"x", "y", "z"}, | ||
slice2: []string{"z", "y", "x"}, | ||
expected: []duplicate{ | ||
{ | ||
value: "x", | ||
s1Idx: 0, | ||
s2Idx: 2, | ||
}, | ||
{ | ||
value: "y", | ||
s1Idx: 1, | ||
s2Idx: 1, | ||
}, | ||
{ | ||
value: "z", | ||
s1Idx: 2, | ||
s2Idx: 0, | ||
}, | ||
}, | ||
}, |
I think these two cases are likely overlapping in the actual code they test.
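For reference, a hedged sketch of a helper that would produce exactly the expected values in the table above, using the standard library's slices.Index (the PR's actual implementation may differ):

// Illustrative sketch only: report values present in both slices together with
// their positions, ordered by their index in the first slice.
func findDuplicates(s1, s2 []string) []duplicate {
	var dups []duplicate
	for i, v := range s1 {
		if j := slices.Index(s2, v); j >= 0 {
			dups = append(dups, duplicate{value: v, s1Idx: i, s2Idx: j})
		}
	}
	return dups
}

If the helper looks roughly like this, both table cases exercise the same lookup path, which supports the overlap point above.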
After parsing log lines, field names need to be checked for collisions with label field names. Follow-up on #19470
The v1 engine has a mechanism to rename labels in case they have the same name but different origin, such as labels, structured metadata, or parsed fields.
1. In case a log line has a structured metadata key with the same name as a label of the stream, the metadata key is suffixed with `_extracted`, such as `service_extracted`, if `service` exists in both `labels` and `metadata`.
2. In case a parser creates a parsed field with the same name as a label of the stream, the parsed key is suffixed with `_extracted` in the same way as case 1. However, if the field name also collides with a structured metadata key, then the extracted structured metadata is replaced with the extracted parsed field.
This PR only implements the second case. This PR is a follow-up on #19470
Signed-off-by: Christian Haudum <[email protected]>
Co-authored-by: Ivan Kalita <[email protected]>
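To make the second case concrete, a hedged, map-based sketch (the engine itself works on Arrow columns; the map form, function name, and parameters are illustrative only):

// Illustrative sketch only: "resolved" stands for the result of case 1, i.e.
// stream labels plus structured metadata with colliding keys already suffixed.
func applyParsedFields(labels, resolved, parsed map[string]string) map[string]string {
	for k, v := range parsed {
		if _, isLabel := labels[k]; isLabel {
			// A parsed field colliding with a stream label gets the "_extracted"
			// suffix and overwrites an extracted metadata key of the same name.
			resolved[k+"_extracted"] = v
		} else {
			resolved[k] = v
		}
	}
	return resolved
}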
Summary
The v1 engine has a mechanism to rename labels in case they have the same name but different origin, such as labels, structured metadata, or parsed fields.
1. In case a log line has a structured metadata key with the same name as a label of the stream, the metadata key is suffixed with `_extracted`, such as `service_extracted`, if `service` exists in both `labels` and `metadata`.
2. In case a parser creates a parsed field with the same name as a label of the stream, the parsed key is suffixed with `_extracted` in the same way as case 1. However, if the field name also collides with a structured metadata key, then the extracted structured metadata is replaced with the extracted parsed field.

This PR only implements the first case. As a follow-up PR, the second case needs to be implemented as well. Additionally, the newly introduced "compatibility node" should also be made optional with a feature flag and/or per-request.
in the same way as case 1. However, if the field name also collides with a structured metadata key, then the extracted structured metadata is replaced with the extracted parsed field.This PR only implements the first case. As a follow up PR, the second case needs to be implemented as well. Additionally, the newly introduced "compatibility node" should also be made optional with a feature flag and/or per-request.