Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pkg/engine/internal/executor/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,13 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin
case *array.StringBuilder:
destinationFieldBuilder := builder.Field(duplicate.destinationIdx).(*array.StringBuilder)
for i := range int(batch.NumRows()) {
if (col.IsNull(i) || !col.IsValid(i)) || (collisionCol.IsNull(i) || !collisionCol.IsValid(i)) {
if col.IsNull(i) || !col.IsValid(i) {
sourceFieldBuilder.AppendNull() // append NULL to original column
destinationFieldBuilder.AppendNull() // append NULL to _extraced column
destinationFieldBuilder.AppendNull() // append NULL to _extracted column
} else if collisionCol.IsNull(i) || !collisionCol.IsValid(i) {
v := col.(*array.String).Value(i)
sourceFieldBuilder.Append(v) // append value to original column
destinationFieldBuilder.AppendNull() // append NULL to _extracted column
} else {
sourceFieldBuilder.AppendNull() // append NULL to original column
v := col.(*array.String).Value(i)
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/internal/executor/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func TestNewColumnCompatibilityPipeline(t *testing.T) {
}, nil),
expectedRows: []arrowtest.Rows{
{
{"utf8.label.status": nil, "utf8.metadata.status": nil, "utf8.metadata.status_extracted": nil}, // null collision -> null extraction
{"utf8.label.status": nil, "utf8.metadata.status": "200", "utf8.metadata.status_extracted": nil}, // null collision -> null extraction
{"utf8.label.status": "active", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": nil}, // null source -> null extraction
{"utf8.label.status": nil, "utf8.metadata.status": nil, "utf8.metadata.status_extracted": nil}, // both null -> null extraction
{"utf8.label.status": "active", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": "200"}, // both non-null -> extract
Expand Down
24 changes: 18 additions & 6 deletions pkg/engine/internal/planner/physical/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (p *Planner) processMakeTable(lp *logical.MakeTable, ctx *Context) ([]Node,
Destination: types.ColumnTypeMetadata,
Collision: types.ColumnTypeLabel,
}
node, err = p.wrapWithCompatibility(node, compat)
node, err = p.wrapNodeWith(node, compat)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -371,7 +371,7 @@ func (p *Planner) processVectorAggregation(lp *logical.VectorAggregation, ctx *C
// Convert [logical.Parse] into one [ParseNode] node.
// A ParseNode initially has an empty list of RequestedKeys which will be populated during optimization.
func (p *Planner) processParse(lp *logical.Parse, ctx *Context) ([]Node, error) {
node := &ParseNode{
var node Node = &ParseNode{
Kind: convertParserKind(lp.Kind),
}
p.plan.graph.Add(node)
Expand All @@ -387,15 +387,27 @@ func (p *Planner) processParse(lp *logical.Parse, ctx *Context) ([]Node, error)
}
}

if p.context.v1Compatible {
compat := &ColumnCompat{
Source: types.ColumnTypeParsed,
Destination: types.ColumnTypeParsed,
Collision: types.ColumnTypeLabel,
}
node, err = p.wrapNodeWith(node, compat)
if err != nil {
return nil, err
}
}

return []Node{node}, nil
}

func (p *Planner) wrapWithCompatibility(node Node, compat *ColumnCompat) (Node, error) {
p.plan.graph.Add(compat)
if err := p.plan.graph.AddEdge(dag.Edge[Node]{Parent: compat, Child: node}); err != nil {
func (p *Planner) wrapNodeWith(node Node, wrapper Node) (Node, error) {
p.plan.graph.Add(wrapper)
if err := p.plan.graph.AddEdge(dag.Edge[Node]{Parent: wrapper, Child: node}); err != nil {
return nil, err
}
return compat, nil
return wrapper, nil
}

// Optimize runs optimization passes over the plan, modifying it
Expand Down
9 changes: 8 additions & 1 deletion pkg/engine/internal/planner/physical/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,15 @@ func TestPlanner_Convert_WithParse(t *testing.T) {
children := physicalPlan.Children(filterNode)
require.Len(t, children, 1)

compatNode, ok := children[0].(*ColumnCompat)
require.True(t, ok, "Filter's child should be ColumnCompat")
require.Equal(t, types.ColumnTypeParsed, compatNode.Source)

children = physicalPlan.Children(compatNode)
require.Len(t, children, 1)

parseNode, ok := children[0].(*ParseNode)
require.True(t, ok, "Filter's child should be ParseNode")
require.True(t, ok, "ColumnCompat's child should be ParseNode")
require.Equal(t, ParserLogfmt, parseNode.Kind)
require.Empty(t, parseNode.RequestedKeys)

Expand Down
10 changes: 10 additions & 0 deletions pkg/logql/bench/faker.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,11 @@ var defaultApplications = []Application{
baseLogfmt += fmt.Sprintf(` error="failed to process request: %s"`, f.ErrorMessage())
}

// Add colliding field for some logs
if f.rnd.Float32() < 0.25 {
baseLogfmt += fmt.Sprintf(` service_name=loki-ingester-%d`, f.rnd.Intn(10))
}

return baseLogfmt
},
OTELResource: map[string]string{
Expand Down Expand Up @@ -820,6 +825,11 @@ var defaultApplications = []Application{
baseLogfmt += fmt.Sprintf(` error="failed to process trace: %s"`, f.ErrorMessage())
}

// Add colliding field for some logs
if f.rnd.Float32() < 0.25 {
baseLogfmt += fmt.Sprintf(` service_name=tempo-distributor-%d`, f.rnd.Intn(10))
}

return baseLogfmt
},
OTELResource: map[string]string{
Expand Down