Skip to content

Commit 894b194

Browse files
committed
feat: add logical projection node
1 parent b4b3b0d commit 894b194

File tree

18 files changed

+439
-429
lines changed

18 files changed

+439
-429
lines changed

pkg/engine/internal/executor/unwrap.go renamed to pkg/engine/internal/executor/cast.go

Lines changed: 31 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -10,34 +10,41 @@ import (
1010
"github.com/apache/arrow-go/v18/arrow/memory"
1111
"github.com/dustin/go-humanize"
1212

13+
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
1314
"github.com/grafana/loki/v3/pkg/engine/internal/types"
1415
)
1516

16-
func unwrapFn(operation types.UnaryOp) UnaryFunction {
17-
//TODO: change signature of Evaluate to add this alloc parameter
18-
alloc := memory.DefaultAllocator
19-
return UnaryFunc(func(input ColumnVector) (ColumnVector, error) {
20-
sourceCol, ok := input.ToArray().(*array.String)
17+
func castFn(operation types.UnaryOp) UnaryFunction {
18+
return UnaryFunc(func(input ColumnVector, allocator memory.Allocator) (ColumnVector, error) {
19+
arr := input.ToArray()
20+
defer arr.Release()
21+
22+
sourceCol, ok := arr.(*array.String)
2123
if !ok {
22-
return nil, fmt.Errorf("expected column to be of type string, got %T", input.ToArray())
24+
return nil, fmt.Errorf("expected column to be of type string, got %T", arr)
2325
}
2426

2527
// Get conversion function and process values
2628
conversionFn := getConversionFunction(operation)
27-
unwrappedCol, errTracker := convertValues(sourceCol, conversionFn, alloc)
29+
castCol, errTracker := castValues(sourceCol, conversionFn, allocator)
30+
defer castCol.Release()
2831

2932
// Build error columns if needed
3033
errorCol, errorDetailsCol := errTracker.buildArrays()
3134
defer errTracker.releaseBuilders()
35+
if errTracker.hasErrors {
36+
defer errorCol.Release()
37+
defer errorDetailsCol.Release()
38+
}
3239

3340
// Build output schema and record
3441
fields := buildOutputFields(errTracker.hasErrors)
35-
arr, err := buildResult(unwrappedCol, errorCol, errorDetailsCol, fields)
42+
result, err := buildResult(castCol, errorCol, errorDetailsCol, fields)
3643
if err != nil {
3744
return nil, err
3845
}
3946
return &ArrayStruct{
40-
array: arr,
47+
array: result,
4148
ct: types.ColumnTypeGenerated,
4249
rows: input.Len(),
4350
}, nil
@@ -48,43 +55,43 @@ type conversionFn func(value string) (float64, error)
4855

4956
func getConversionFunction(operation types.UnaryOp) conversionFn {
5057
switch operation {
51-
case types.UnaryOpUnwrapBytes:
58+
case types.UnaryOpCastBytes:
5259
return convertBytes
53-
case types.UnaryOpUnwrapDuration:
60+
case types.UnaryOpCastDuration:
5461
return convertDuration
5562
default:
5663
return convertFloat
5764
}
5865
}
5966

60-
func convertValues(
67+
func castValues(
6168
sourceCol *array.String,
6269
conversionFn conversionFn,
6370
allocator memory.Allocator,
6471
) (arrow.Array, *errorTracker) {
65-
unwrappedBuilder := array.NewFloat64Builder(allocator)
66-
defer unwrappedBuilder.Release()
72+
castBuilder := array.NewFloat64Builder(allocator)
73+
defer castBuilder.Release()
6774

6875
tracker := newErrorTracker(allocator)
6976

7077
for i := 0; i < sourceCol.Len(); i++ {
7178
if sourceCol.IsNull(i) {
72-
unwrappedBuilder.AppendNull()
79+
castBuilder.AppendNull()
7380
tracker.recordSuccess()
7481
} else {
7582
valueStr := sourceCol.Value(i)
7683
if val, err := conversionFn(valueStr); err == nil {
77-
unwrappedBuilder.Append(val)
84+
castBuilder.Append(val)
7885
tracker.recordSuccess()
7986
} else {
8087
// Use 0.0 as default for errors, for backwards compatibility with old engine
81-
unwrappedBuilder.Append(0.0)
88+
castBuilder.Append(0.0)
8289
tracker.recordError(i, err)
8390
}
8491
}
8592
}
8693

87-
return unwrappedBuilder.NewArray(), tracker
94+
return castBuilder.NewArray(), tracker
8895
}
8996

9097
func buildOutputFields(
@@ -93,45 +100,21 @@ func buildOutputFields(
93100
fields := make([]arrow.Field, 0, 3)
94101

95102
// Add value field
96-
fields = append(fields, arrow.Field{
97-
Name: types.ColumnNameGeneratedValue,
98-
Type: arrow.PrimitiveTypes.Float64,
99-
Metadata: types.ColumnMetadata(
100-
types.ColumnTypeGenerated,
101-
types.Loki.Float,
102-
),
103-
Nullable: true,
104-
})
103+
fields = append(fields, semconv.FieldFromIdent(semconv.ColumnIdentValue, false))
105104

106105
// Add error fields if needed
107106
if hasErrors {
108107
fields = append(fields,
109-
arrow.Field{
110-
Name: types.ColumnNameError,
111-
Type: arrow.BinaryTypes.String,
112-
Metadata: types.ColumnMetadata(
113-
types.ColumnTypeParsed,
114-
types.Loki.String,
115-
),
116-
Nullable: true,
117-
},
118-
arrow.Field{
119-
Name: types.ColumnNameErrorDetails,
120-
Type: arrow.BinaryTypes.String,
121-
Metadata: types.ColumnMetadata(
122-
types.ColumnTypeParsed,
123-
types.Loki.String,
124-
),
125-
Nullable: true,
126-
},
108+
semconv.FieldFromIdent(semconv.ColumnIdentError, true),
109+
semconv.FieldFromIdent(semconv.ColumnIdentErrorDetails, true),
127110
)
128111
}
129112

130113
return fields
131114
}
132115

133116
func buildResult(
134-
unwrappedCol, errorCol, errorDetailsCol arrow.Array,
117+
castCol, errorCol, errorDetailsCol arrow.Array,
135118
fields []arrow.Field,
136119
) (*array.Struct, error) {
137120
hasErrors := errorCol != nil
@@ -144,23 +127,14 @@ func buildResult(
144127
columns := make([]arrow.Array, totalCols)
145128

146129
// Add new columns - these are newly created so don't need extra retain
147-
columns[0] = unwrappedCol
130+
columns[0] = castCol
148131
if hasErrors {
149132
columns[1] = errorCol
150133
columns[2] = errorDetailsCol
151134
}
152135

153136
// NewStructArrayWithFields will retain all columns
154-
result, err := array.NewStructArrayWithFields(columns, fields)
155-
156-
// Release our references to newly created columns (result now owns them)
157-
unwrappedCol.Release()
158-
if hasErrors {
159-
errorCol.Release()
160-
errorDetailsCol.Release()
161-
}
162-
163-
return result, err
137+
return array.NewStructArrayWithFields(columns, fields)
164138
}
165139

166140
func convertFloat(v string) (float64, error) {

pkg/engine/internal/executor/expressions.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
134134
if err != nil {
135135
return nil, fmt.Errorf("failed to lookup unary function: %w", err)
136136
}
137-
return fn.Evaluate(lhr)
137+
return fn.Evaluate(lhr, e.allocator)
138138

139139
case *physical.BinaryExpr:
140140
lhs, err := e.eval(expr.Left, input)
@@ -339,6 +339,7 @@ func NewArrayStruct(arr *array.Struct, ct types.ColumnType) *ArrayStruct {
339339
// ToArray implements ColumnVector.
340340
// Returns the underlying struct array.
341341
func (a *ArrayStruct) ToArray() arrow.Array {
342+
a.array.Retain()
342343
return a.array
343344
}
344345

@@ -402,6 +403,11 @@ func (a *ArrayStruct) Len() int64 {
402403
return a.rows
403404
}
404405

406+
// Release decreases the reference count by 1 on underlying Arrow array
407+
func (a *ArrayStruct) Release() {
408+
a.array.Release()
409+
}
410+
405411
// CoalesceVector represents multiple columns with the same name but different [types.ColumnType]
406412
// Vectors are ordered by precedence (highest precedence first).
407413
type CoalesceVector struct {

pkg/engine/internal/executor/expressions_test.go

Lines changed: 51 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,12 @@ label_2
397397
})
398398
}
399399

400-
func TestEvaluateUnwrapExpression(t *testing.T) {
400+
func TestEvaluateUnaryCastExpression(t *testing.T) {
401+
colMsg := semconv.ColumnIdentMessage
402+
colStatusCode := semconv.NewIdentifier("status_code", types.ColumnTypeMetadata, types.Loki.String)
403+
colTimeout := semconv.NewIdentifier("timeout", types.ColumnTypeParsed, types.Loki.String)
404+
colMixedValues := semconv.NewIdentifier("mixed_values", types.ColumnTypeMetadata, types.Loki.String)
405+
401406
t.Run("unknown column", func(t *testing.T) {
402407
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
403408
defer alloc.AssertSize(t, 0) // Assert empty on test exit
@@ -409,13 +414,14 @@ func TestEvaluateUnwrapExpression(t *testing.T) {
409414
Type: types.ColumnTypeAmbiguous,
410415
},
411416
},
412-
Op: types.UnaryOpUnwrap,
417+
Op: types.UnaryOpCastFloat,
413418
}
414419

415420
n := len(words)
416421
rec := batch(n, time.Now())
417422
colVec, err := e.eval(expr, rec)
418423
require.NoError(t, err)
424+
defer colVec.Release()
419425

420426
id := colVec.Type().ArrowType().ID()
421427
require.Equal(t, arrow.STRUCT, id)
@@ -442,37 +448,41 @@ func TestEvaluateUnwrapExpression(t *testing.T) {
442448
}
443449
})
444450

445-
t.Run("unwrap column generates a value", func(t *testing.T) {
451+
t.Run("cast column generates a value", func(t *testing.T) {
446452
expr := &physical.UnaryExpr{
447453
Left: &physical.ColumnExpr{
448454
Ref: types.ColumnRef{
449455
Column: "status_code",
450456
Type: types.ColumnTypeAmbiguous,
451457
},
452458
},
453-
Op: types.UnaryOpUnwrap,
459+
Op: types.UnaryOpCastBytes,
454460
}
455461

456462
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
457463
defer alloc.AssertSize(t, 0) // Assert empty on test exit
458464
e := newExpressionEvaluator(alloc)
459465

460-
schema := arrow.NewSchema([]arrow.Field{
461-
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.String)},
462-
{Name: "status_code", Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadata(types.ColumnTypeMetadata, types.Loki.String)},
463-
{Name: "timeout", Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadata(types.ColumnTypeParsed, types.Loki.String)},
464-
}, nil)
466+
schema := arrow.NewSchema(
467+
[]arrow.Field{
468+
semconv.FieldFromIdent(colMsg, false),
469+
semconv.FieldFromIdent(colStatusCode, true),
470+
semconv.FieldFromIdent(colTimeout, true),
471+
},
472+
nil,
473+
)
465474
rows := arrowtest.Rows{
466-
{"message": "timeout set", "status_code": "200", "timeout": "2m"},
467-
{"message": "short timeout", "status_code": "204", "timeout": "10s"},
468-
{"message": "long timeout", "status_code": "404", "timeout": "1h"},
475+
{"utf8.builtin.message": "timeout set", "utf8.metadata.status_code": "200", "utf8.parsed.timeout": "2m"},
476+
{"utf8.builtin.message": "short timeout", "utf8.metadata.status_code": "204", "utf8.parsed.timeout": "10s"},
477+
{"utf8.builtin.message": "long timeout", "utf8.metadata.status_code": "404", "utf8.parsed.timeout": "1h"},
469478
}
470479

471480
record := rows.Record(alloc, schema)
472481
defer record.Release()
473482

474483
colVec, err := e.eval(expr, record)
475484
require.NoError(t, err)
485+
defer colVec.Release()
476486
id := colVec.Type().ArrowType().ID()
477487
require.Equal(t, arrow.STRUCT, id)
478488

@@ -490,37 +500,41 @@ func TestEvaluateUnwrapExpression(t *testing.T) {
490500
require.Equal(t, 404.0, value.Value(2))
491501
})
492502

493-
t.Run("unwrap column generates a value from a parsed column", func(t *testing.T) {
503+
t.Run("cast column generates a value from a parsed column", func(t *testing.T) {
494504
expr := &physical.UnaryExpr{
495505
Left: &physical.ColumnExpr{
496506
Ref: types.ColumnRef{
497507
Column: "timeout",
498508
Type: types.ColumnTypeAmbiguous,
499509
},
500510
},
501-
Op: types.UnaryOpUnwrapDuration,
511+
Op: types.UnaryOpCastDuration,
502512
}
503513

504514
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
505515
defer alloc.AssertSize(t, 0) // Assert empty on test exit
506516
e := newExpressionEvaluator(alloc)
507517

508-
schema := arrow.NewSchema([]arrow.Field{
509-
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.String)},
510-
{Name: "status_code", Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadata(types.ColumnTypeMetadata, types.Loki.String)},
511-
{Name: "timeout", Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadata(types.ColumnTypeParsed, types.Loki.String)},
512-
}, nil)
518+
schema := arrow.NewSchema(
519+
[]arrow.Field{
520+
semconv.FieldFromIdent(colMsg, false),
521+
semconv.FieldFromIdent(colStatusCode, true),
522+
semconv.FieldFromIdent(colTimeout, true),
523+
},
524+
nil,
525+
)
513526
rows := arrowtest.Rows{
514-
{"message": "timeout set", "status_code": "200", "timeout": "2m"},
515-
{"message": "short timeout", "status_code": "204", "timeout": "10s"},
516-
{"message": "long timeout", "status_code": "404", "timeout": "1h"},
527+
{"utf8.builtin.message": "timeout set", "utf8.metadata.status_code": "200", "utf8.parsed.timeout": "2m"},
528+
{"utf8.builtin.message": "short timeout", "utf8.metadata.status_code": "204", "utf8.parsed.timeout": "10s"},
529+
{"utf8.builtin.message": "long timeout", "utf8.metadata.status_code": "404", "utf8.parsed.timeout": "1h"},
517530
}
518531

519532
record := rows.Record(alloc, schema)
520533
defer record.Release()
521534

522535
colVec, err := e.eval(expr, record)
523536
require.NoError(t, err)
537+
defer colVec.Release()
524538
id := colVec.Type().ArrowType().ID()
525539
require.Equal(t, arrow.STRUCT, id)
526540

@@ -537,38 +551,42 @@ func TestEvaluateUnwrapExpression(t *testing.T) {
537551
require.Equal(t, 3600.0, value.Value(2))
538552
})
539553

540-
t.Run("unwrap tracks errors", func(t *testing.T) {
554+
t.Run("cast operation tracks errors", func(t *testing.T) {
541555
colExpr := &physical.UnaryExpr{
542556
Left: &physical.ColumnExpr{
543557
Ref: types.ColumnRef{
544558
Column: "mixed_values",
545559
Type: types.ColumnTypeAmbiguous,
546560
},
547561
},
548-
Op: types.UnaryOpUnwrap,
562+
Op: types.UnaryOpCastFloat,
549563
}
550564

551565
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
552566
defer alloc.AssertSize(t, 0) // Assert empty on test exit
553567
e := newExpressionEvaluator(alloc)
554568

555-
schema := arrow.NewSchema([]arrow.Field{
556-
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.String)},
557-
{Name: "mixed_values", Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadata(types.ColumnTypeMetadata, types.Loki.String)},
558-
}, nil)
569+
schema := arrow.NewSchema(
570+
[]arrow.Field{
571+
semconv.FieldFromIdent(colMsg, false),
572+
semconv.FieldFromIdent(colMixedValues, true),
573+
},
574+
nil,
575+
)
559576
rows := arrowtest.Rows{
560-
{"message": "valid numeric", "mixed_values": "42.5"},
561-
{"message": "invalid numeric", "mixed_values": "not_a_number"},
562-
{"message": "valid bytes", "mixed_values": "1KB"},
563-
{"message": "invalid bytes", "mixed_values": "invalid_bytes"},
564-
{"message": "empty string", "mixed_values": ""},
577+
{"utf8.builtin.message": "valid numeric", "utf8.metadata.mixed_values": "42.5"},
578+
{"utf8.builtin.message": "invalid numeric", "utf8.metadata.mixed_values": "not_a_number"},
579+
{"utf8.builtin.message": "valid bytes", "utf8.metadata.mixed_values": "1KB"},
580+
{"utf8.builtin.message": "invalid bytes", "utf8.metadata.mixed_values": "invalid_bytes"},
581+
{"utf8.builtin.message": "empty string", "utf8.metadata.mixed_values": ""},
565582
}
566583

567584
record := rows.Record(alloc, schema)
568585
defer record.Release()
569586

570587
colVec, err := e.eval(colExpr, record)
571588
require.NoError(t, err)
589+
defer colVec.Release()
572590
id := colVec.Type().ArrowType().ID()
573591
require.Equal(t, arrow.STRUCT, id)
574592

0 commit comments

Comments
 (0)