Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
c3fe4d1
merge:
spiridonov Oct 6, 2025
605933c
math expressions
spiridonov Oct 7, 2025
3b5d6a4
merge
spiridonov Oct 7, 2025
7978ff5
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 7, 2025
e2e6e87
fix merge
spiridonov Oct 7, 2025
5a238b8
format
spiridonov Oct 7, 2025
d88a7e0
format
spiridonov Oct 7, 2025
a1bf376
memory leaks
spiridonov Oct 7, 2025
eedbfaf
lint
spiridonov Oct 7, 2025
36f3540
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 8, 2025
1ef3a95
merge
spiridonov Oct 8, 2025
49c5215
wip
spiridonov Oct 8, 2025
88892c7
revert go.mod
spiridonov Oct 8, 2025
a2147e1
revert go.mod
spiridonov Oct 8, 2025
a10228a
revert go.mod
spiridonov Oct 8, 2025
14265a2
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 9, 2025
ffce0da
merge
spiridonov Oct 9, 2025
357450a
release
spiridonov Oct 9, 2025
1ce7c78
comment
spiridonov Oct 9, 2025
75d4ab4
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 9, 2025
0ae79aa
Merge branch 'main' into spiridonov-rate-aggregate
spiridonov Oct 9, 2025
7c556fb
do.mod
spiridonov Oct 9, 2025
206ac82
Merge branch 'spiridonov-rate-aggregate' of github.com:grafana/loki i…
spiridonov Oct 9, 2025
ef1e804
do.mod
spiridonov Oct 9, 2025
a7181c2
semconv
spiridonov Oct 9, 2025
1f03fba
Merge branch 'main' into spiridonov-rate-aggregate
spiridonov Oct 9, 2025
dd74932
Merge branch 'main' into spiridonov-rate-aggregate
spiridonov Oct 9, 2025
fbad3d0
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 10, 2025
0c1453f
Merge branch 'spiridonov-rate-aggregate' of github.com:grafana/loki i…
spiridonov Oct 10, 2025
2fede5f
Merge branch 'main' into spiridonov-rate-aggregate
spiridonov Oct 10, 2025
de1851c
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 14, 2025
42219bb
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 14, 2025
efcb3e8
removed rate
spiridonov Oct 14, 2025
51e100e
Merge branch 'spiridonov-rate-aggregate' of github.com:grafana/loki i…
spiridonov Oct 14, 2025
f78d9f1
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 15, 2025
7cae847
pr comments
spiridonov Oct 15, 2025
a8b5c3c
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 15, 2025
87c1ee0
format
spiridonov Oct 15, 2025
3cce17d
Merge branch 'main' into spiridonov-rate-aggregate
spiridonov Oct 15, 2025
7216e04
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 16, 2025
f57498f
input columns
spiridonov Oct 16, 2025
32bcbc6
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 16, 2025
beebaba
Merge branch 'spiridonov-rate-aggregate' of github.com:grafana/loki i…
spiridonov Oct 16, 2025
bba1e87
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 17, 2025
23f20c8
separate join and math expressions
spiridonov Oct 17, 2025
dec254e
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 17, 2025
16552bd
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 20, 2025
d8ccef6
merge
spiridonov Oct 21, 2025
260a9c8
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 21, 2025
d87a2d8
projection
spiridonov Oct 21, 2025
64f4263
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 21, 2025
77578ea
lint
spiridonov Oct 21, 2025
b15eb75
error handling
spiridonov Oct 21, 2025
294dc34
lint
spiridonov Oct 21, 2025
814c318
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 22, 2025
61eaef0
clean
spiridonov Oct 22, 2025
6b463db
comments
spiridonov Oct 23, 2025
650c531
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 23, 2025
25077f5
comments
spiridonov Oct 23, 2025
1bdaeab
Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a…
spiridonov Oct 24, 2025
ce7acdb
merge fix
spiridonov Oct 24, 2025
e73d10b
unused code
spiridonov Oct 24, 2025
935472a
rename
spiridonov Oct 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions pkg/engine/internal/executor/expressions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,7 @@ func collectBooleanArray(arr *array.Boolean) []bool {
var words = []string{"one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"}

func batch(n int, now time.Time) arrow.Record {
// 1. Create a memory allocator
mem := memory.NewGoAllocator()

// 2. Define the schema
// Define the schema
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(semconv.ColumnIdentMessage, false),
Expand All @@ -252,12 +249,11 @@ func batch(n int, now time.Time) arrow.Record {
nil, // No metadata
)

// 3. Create builders for each column
logBuilder := array.NewStringBuilder(mem)

tsBuilder := array.NewTimestampBuilder(mem, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})
// Create builders for each column
logBuilder := array.NewStringBuilder(memory.DefaultAllocator)
tsBuilder := array.NewTimestampBuilder(memory.DefaultAllocator, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})

// 4. Append data to the builders
// Append data to the builders
logs := make([]string, n)
ts := make([]arrow.Timestamp, n)

Expand All @@ -269,12 +265,12 @@ func batch(n int, now time.Time) arrow.Record {
tsBuilder.AppendValues(ts, nil)
logBuilder.AppendValues(logs, nil)

// 5. Build the arrays
// Build the arrays
logArray := logBuilder.NewArray()

tsArray := tsBuilder.NewArray()

// 6. Create the record
// Create the record
columns := []arrow.Array{logArray, tsArray}
record := array.NewRecord(schema, columns, int64(n))

Expand Down
135 changes: 97 additions & 38 deletions pkg/engine/internal/executor/functions.go

Large diffs are not rendered by default.

28 changes: 26 additions & 2 deletions pkg/engine/internal/executor/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,34 @@ func TestBinaryFunctionRegistry_GetForSignature(t *testing.T) {
dataType: arrow.BinaryTypes.String,
expectError: false,
},
{
name: "valid div operation",
op: types.BinaryOpDiv,
dataType: arrow.PrimitiveTypes.Float64,
expectError: false,
},
{
name: "valid add operation",
op: types.BinaryOpAdd,
dataType: arrow.PrimitiveTypes.Float64,
expectError: false,
},
{
name: "valid Mul operation",
op: types.BinaryOpMul,
dataType: arrow.PrimitiveTypes.Float64,
expectError: false,
},
{
name: "valid sub operation",
op: types.BinaryOpSub,
dataType: arrow.PrimitiveTypes.Float64,
expectError: false,
},
{
name: "invalid operation",
op: types.BinaryOpAdd, // Not registered
dataType: arrow.PrimitiveTypes.Int64,
op: types.BinaryOpAnd, // Not registered
dataType: arrow.FixedWidthTypes.Boolean,
expectError: true,
},
{
Expand Down
70 changes: 44 additions & 26 deletions pkg/engine/internal/executor/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,25 @@ func NewProjectPipeline(input Pipeline, proj *physical.Projection, evaluator *ex

// Get the column names from the projection expressions
colRefs := make([]types.ColumnRef, 0, len(proj.Expressions))
unaryExprs := make([]physical.UnaryExpression, 0, len(proj.Expressions))
mathExprs := make([]physical.Expression, 0, len(proj.Expressions))

for i, expr := range proj.Expressions {
switch expr := expr.(type) {
case *physical.ColumnExpr:
colRefs = append(colRefs, expr.Ref)
case *physical.UnaryExpr:
unaryExprs = append(unaryExprs, expr)
mathExprs = append(mathExprs, expr)
case *physical.BinaryExpr:
mathExprs = append(mathExprs, expr)
default:
return nil, fmt.Errorf("projection expression %d is unsupported", i)
}
}

if len(mathExprs) > 1 {
return nil, fmt.Errorf("there might be only one math expression for `value` column at a time")
}

// Create KEEP projection pipeline:
// Drop all columns except the ones referenced in proj.Expressions.
if !proj.All && !proj.Drop && !proj.Expand {
Expand Down Expand Up @@ -66,9 +72,9 @@ func NewProjectPipeline(input Pipeline, proj *physical.Projection, evaluator *ex

// Create EXPAND projection pipeline:
// Keep all columns and expand the ones referenced in proj.Expressions.
// TODO: as implemented, expanding and keeping/dropping cannot happen in the same projection. Is this desired?
if proj.All && proj.Expand {
return newExpandPipeline(unaryExprs, evaluator, input)
// TODO: as implemented, epanding and keeping/dropping cannot happen in the same projection. Is this desired?
if proj.All && proj.Expand && len(mathExprs) > 0 {
return newExpandPipeline(mathExprs[0], evaluator, input)
}

return nil, errNotImplemented
Expand Down Expand Up @@ -105,7 +111,7 @@ func newKeepPipeline(colRefs []types.ColumnRef, keepFunc func([]types.ColumnRef,
}, input), nil
}

func newExpandPipeline(expressions []physical.UnaryExpression, evaluator *expressionEvaluator, input Pipeline) (*GenericPipeline, error) {
func newExpandPipeline(expr physical.Expression, evaluator *expressionEvaluator, input Pipeline) (*GenericPipeline, error) {
return newGenericPipeline(func(ctx context.Context, inputs []Pipeline) (arrow.Record, error) {
if len(inputs) != 1 {
return nil, fmt.Errorf("expected 1 input, got %d", len(inputs))
Expand All @@ -116,34 +122,46 @@ func newExpandPipeline(expressions []physical.UnaryExpression, evaluator *expres
return nil, err
}

columns := []arrow.Array{}
fields := []arrow.Field{}
outputFields := make([]arrow.Field, 0)
outputCols := make([]arrow.Array, 0)
schema := batch.Schema()

// move all columns into the output except `value`
for i, field := range batch.Schema().Fields() {
columns = append(columns, batch.Column(i))
fields = append(fields, field)
}

for _, expr := range expressions {
vec, err := evaluator.eval(expr, batch)
ident, err := semconv.ParseFQN(schema.Field(i).Name)
if err != nil {
return nil, err
}
if arrStruct, ok := vec.(*array.Struct); ok {
structSchema, ok := arrStruct.DataType().(*arrow.StructType)
if !ok {
return nil, fmt.Errorf("unexpected type returned from evaluation, expected *arrow.StructType, got %T", arrStruct.DataType())
}
if !ident.Equal(semconv.ColumnIdentValue) {
outputCols = append(outputCols, batch.Column(i))
outputFields = append(outputFields, field)
}
}

for i := range arrStruct.NumField() {
columns = append(columns, arrStruct.Field(i))
fields = append(fields, structSchema.Field(i))
}
vec, err := evaluator.eval(expr, batch)
if err != nil {
return nil, err
}

switch arrCasted := vec.(type) {
case *array.Struct:
structSchema, ok := arrCasted.DataType().(*arrow.StructType)
if !ok {
return nil, fmt.Errorf("unexpected type returned from evaluation, expected *arrow.StructType, got %T", arrCasted.DataType())
}
for i := range arrCasted.NumField() {
outputCols = append(outputCols, arrCasted.Field(i))
outputFields = append(outputFields, structSchema.Field(i))
}
case *array.Float64:
outputFields = append(outputFields, semconv.FieldFromIdent(semconv.ColumnIdentValue, false))
outputCols = append(outputCols, arrCasted)
default:
return nil, fmt.Errorf("unexpected type returned from evaluation %T", arrCasted.DataType())
}

metadata := batch.Schema().Metadata()
schema := arrow.NewSchema(fields, &metadata)
return array.NewRecord(schema, columns, batch.NumRows()), nil
metadata := schema.Metadata()
outputSchema := arrow.NewSchema(outputFields, &metadata)
return array.NewRecord(outputSchema, outputCols, batch.NumRows()), nil
}, input), nil
}
Loading