Skip to content

Commit 9007cf8

Browse files
committed
feat: implement unwrap as a projection
1 parent 2e418b1 commit 9007cf8

25 files changed

+1215
-79
lines changed

pkg/engine/compat.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (b *streamsResultBuilder) collectRow(rec arrow.Record, i int) (labels.Label
124124
// TODO: keep errors if --strict is set
125125
// These are reserved column names used to track parsing errors. We are dropping them until
126126
// we add support for --strict parsing.
127-
if colName == types.ColumnNameParsedError || colName == types.ColumnNameParsedErrorDetails {
127+
if colName == types.ColumnNameError || colName == types.ColumnNameErrorDetails {
128128
continue
129129
}
130130

pkg/engine/internal/datatype/arrow.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ var (
1212
Timestamp DataType
1313
Duration DataType
1414
Bytes DataType
15+
Struct DataType
1516
}{
1617
Null: tNull{},
1718
Bool: tBool{},
@@ -21,6 +22,7 @@ var (
2122
Timestamp: tTimestamp{},
2223
Duration: tDuration{},
2324
Bytes: tBytes{},
25+
Struct: tStruct{},
2426
}
2527

2628
Arrow = struct {

pkg/engine/internal/datatype/types.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const (
1818
STRING = Type(arrow.STRING)
1919
INT64 = Type(arrow.INT64)
2020
FLOAT64 = Type(arrow.FLOAT64)
21+
STRUCT = Type(arrow.STRUCT)
2122
)
2223

2324
func (t Type) String() string {
@@ -32,6 +33,8 @@ func (t Type) String() string {
3233
return "INT64"
3334
case FLOAT64:
3435
return "FLOAT64"
36+
case STRUCT:
37+
return "STRUCT"
3538
default:
3639
return "INVALID"
3740
}
@@ -91,6 +94,19 @@ func (tBytes) ID() Type { return INT64 }
9194
func (tBytes) String() string { return "bytes" }
9295
func (tBytes) ArrowType() arrow.DataType { return Arrow.Integer }
9396

97+
type tStruct struct {
98+
arrowType *arrow.StructType
99+
}
100+
101+
func (t tStruct) ID() Type { return STRUCT }
102+
func (t tStruct) String() string { return "struct" }
103+
func (t tStruct) ArrowType() arrow.DataType { return t.arrowType }
104+
105+
// NewStructType creates a DataType from an Arrow StructType
106+
func NewStructType(arrowType *arrow.StructType) DataType {
107+
return tStruct{arrowType: arrowType}
108+
}
109+
94110
var (
95111
names = map[string]DataType{
96112
Loki.Null.String(): Loki.Null,
@@ -101,6 +117,7 @@ var (
101117
Loki.Timestamp.String(): Loki.Timestamp,
102118
Loki.Duration.String(): Loki.Duration,
103119
Loki.Bytes.String(): Loki.Bytes,
120+
Loki.Struct.String(): Loki.Struct,
104121
}
105122
)
106123

pkg/engine/internal/executor/expressions.go

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,19 @@ import (
1313
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
1414
)
1515

16-
type expressionEvaluator struct{}
16+
type expressionEvaluator struct {
17+
allocator memory.Allocator
18+
}
19+
20+
func newExpressionEvaluator(allocator memory.Allocator) expressionEvaluator {
21+
if allocator == nil {
22+
allocator = memory.DefaultAllocator
23+
}
24+
25+
return expressionEvaluator{
26+
allocator: allocator,
27+
}
28+
}
1729

1830
func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record) (ColumnVector, error) {
1931
switch expr := expr.(type) {
@@ -135,6 +147,17 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
135147
return nil, fmt.Errorf("failed to lookup binary function for signature %v(%v,%v): %w", expr.Op, lhs.Type().ArrowType(), rhs.Type().ArrowType(), err)
136148
}
137149
return fn.Evaluate(lhs, rhs)
150+
case *physical.UnwrapExpr:
151+
arr, err := unwrap(expr.Op, expr.Ref.Column, input, e.allocator)
152+
if err != nil {
153+
return nil, err
154+
}
155+
156+
return &ArrayStruct{
157+
array: arr,
158+
ct: types.ColumnTypeGenerated,
159+
rows: input.NumRows(),
160+
}, nil
138161
}
139162

140163
return nil, fmt.Errorf("unknown expression: %v", expr)
@@ -279,6 +302,91 @@ func (a *Array) Len() int64 {
279302
return int64(a.array.Len())
280303
}
281304

305+
// ArrayStruct represents multiple columns of data, stored as an [array.Struct].
306+
// It implements the ColumnVector interface while preserving access to individual fields.
307+
type ArrayStruct struct {
308+
array *array.Struct
309+
ct types.ColumnType
310+
rows int64
311+
}
312+
313+
var _ ColumnVector = (*ArrayStruct)(nil)
314+
315+
// NewArrayStruct creates a new ArrayStruct from an array.Struct
316+
func NewArrayStruct(arr *array.Struct, ct types.ColumnType) *ArrayStruct {
317+
return &ArrayStruct{
318+
array: arr,
319+
ct: ct,
320+
rows: int64(arr.Len()),
321+
}
322+
}
323+
324+
// ToArray implements ColumnVector.
325+
// Returns the underlying struct array.
326+
func (a *ArrayStruct) ToArray() arrow.Array {
327+
return a.array
328+
}
329+
330+
// Value implements ColumnVector.
331+
// Returns a map of field names to values at the specified index.
332+
func (a *ArrayStruct) Value(i int) any {
333+
if a.array.IsNull(i) || !a.array.IsValid(i) {
334+
return nil
335+
}
336+
337+
// Return a map representing the struct's fields at this index
338+
result := make(map[string]any)
339+
structType := a.array.DataType().(*arrow.StructType)
340+
341+
for fieldIdx := 0; fieldIdx < a.array.NumField(); fieldIdx++ {
342+
field := a.array.Field(fieldIdx)
343+
if field.IsNull(i) {
344+
continue
345+
}
346+
fieldMeta := structType.Field(fieldIdx)
347+
348+
// Extract value from the field array at index i
349+
var value any
350+
switch arr := field.(type) {
351+
case *array.Boolean:
352+
value = arr.Value(i)
353+
case *array.String:
354+
value = arr.Value(i)
355+
case *array.Int64:
356+
value = arr.Value(i)
357+
case *array.Uint64:
358+
value = arr.Value(i)
359+
case *array.Float64:
360+
value = arr.Value(i)
361+
default:
362+
value = nil
363+
}
364+
result[fieldMeta.Name] = value
365+
}
366+
367+
return result
368+
}
369+
370+
// Type implements ColumnVector.
371+
func (a *ArrayStruct) Type() datatype.DataType {
372+
dt := a.array.DataType()
373+
st, ok := dt.(*arrow.StructType)
374+
if !ok {
375+
return nil
376+
}
377+
return datatype.NewStructType(st)
378+
}
379+
380+
// ColumnType implements ColumnVector.
381+
func (a *ArrayStruct) ColumnType() types.ColumnType {
382+
return a.ct
383+
}
384+
385+
// Len implements ColumnVector.
386+
func (a *ArrayStruct) Len() int64 {
387+
return a.rows
388+
}
389+
282390
// CoalesceVector represents multiple columns with the same name but different [types.ColumnType]
283391
// Vectors are ordered by precedence (highest precedence first).
284392
type CoalesceVector struct {

0 commit comments

Comments
 (0)