Commit 7eda674

feat(engine): basic math expressions (#19407)
1 parent bfd7060 commit 7eda674

File tree

18 files changed: +1158 −282 lines

pkg/engine/internal/executor/expressions_test.go

Lines changed: 7 additions & 11 deletions
@@ -240,10 +240,7 @@ func collectBooleanArray(arr *array.Boolean) []bool {
 var words = []string{"one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"}
 
 func batch(n int, now time.Time) arrow.Record {
-    // 1. Create a memory allocator
-    mem := memory.NewGoAllocator()
-
-    // 2. Define the schema
+    // Define the schema
     schema := arrow.NewSchema(
         []arrow.Field{
             semconv.FieldFromIdent(semconv.ColumnIdentMessage, false),
@@ -252,12 +249,11 @@ func batch(n int, now time.Time) arrow.Record {
         nil, // No metadata
     )
 
-    // 3. Create builders for each column
-    logBuilder := array.NewStringBuilder(mem)
-
-    tsBuilder := array.NewTimestampBuilder(mem, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})
+    // Create builders for each column
+    logBuilder := array.NewStringBuilder(memory.DefaultAllocator)
+    tsBuilder := array.NewTimestampBuilder(memory.DefaultAllocator, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})
 
-    // 4. Append data to the builders
+    // Append data to the builders
     logs := make([]string, n)
     ts := make([]arrow.Timestamp, n)
 
@@ -269,12 +265,12 @@ func batch(n int, now time.Time) arrow.Record {
     tsBuilder.AppendValues(ts, nil)
     logBuilder.AppendValues(logs, nil)
 
-    // 5. Build the arrays
+    // Build the arrays
     logArray := logBuilder.NewArray()
 
     tsArray := tsBuilder.NewArray()
 
-    // 6. Create the record
+    // Create the record
     columns := []arrow.Array{logArray, tsArray}
     record := array.NewRecord(schema, columns, int64(n))
 
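The notable change in this helper is that the builders now share the package-level memory.DefaultAllocator instead of a memory.NewGoAllocator constructed per call. Below is a minimal, self-contained sketch of the same builder-then-record pattern, assuming the apache/arrow-go v18 import paths and a made-up single-column schema (the real helper derives its fields from semconv):

package main

import (
    "fmt"

    "github.com/apache/arrow-go/v18/arrow"
    "github.com/apache/arrow-go/v18/arrow/array"
    "github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
    // Schema with a single string column; the test helper builds its fields via semconv.
    schema := arrow.NewSchema([]arrow.Field{
        {Name: "message", Type: arrow.BinaryTypes.String},
    }, nil)

    // Builders share the package-level default allocator instead of a
    // freshly constructed allocator per call.
    b := array.NewStringBuilder(memory.DefaultAllocator)
    defer b.Release()
    b.AppendValues([]string{"one", "two", "three"}, nil)

    col := b.NewArray()
    defer col.Release()

    rec := array.NewRecord(schema, []arrow.Array{col}, int64(col.Len()))
    defer rec.Release()
    fmt.Println(rec.NumCols(), rec.NumRows()) // 1 3
}

Behaviour is unchanged; the helper simply stops allocating a fresh allocator for every test batch.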

pkg/engine/internal/executor/functions.go

Lines changed: 97 additions & 38 deletions
Large diffs are not rendered by default.
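
Going by the tests below, this file is where the element-wise arithmetic for the new binary operations (add, sub, mul, div over Float64 columns) lives. Since the diff is collapsed, here is only a rough sketch of the shape such a kernel takes, assuming the arrow array and memory packages plus fmt are imported; this is an illustration, not the code in functions.go:

// Not the code from functions.go (collapsed above); a rough sketch of an
// element-wise binary arithmetic kernel over two Float64 columns.
func divFloat64(lhs, rhs *array.Float64, mem memory.Allocator) (*array.Float64, error) {
    if lhs.Len() != rhs.Len() {
        return nil, fmt.Errorf("length mismatch: %d vs %d", lhs.Len(), rhs.Len())
    }
    b := array.NewFloat64Builder(mem)
    defer b.Release()
    for i := 0; i < lhs.Len(); i++ {
        // A null on either side yields a null result.
        if lhs.IsNull(i) || rhs.IsNull(i) {
            b.AppendNull()
            continue
        }
        // Division by zero follows float semantics (±Inf or NaN).
        b.Append(lhs.Value(i) / rhs.Value(i))
    }
    return b.NewFloat64Array(), nil
}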

pkg/engine/internal/executor/functions_test.go

Lines changed: 26 additions & 2 deletions
@@ -151,10 +151,34 @@ func TestBinaryFunctionRegistry_GetForSignature(t *testing.T) {
             dataType:    arrow.BinaryTypes.String,
             expectError: false,
         },
+        {
+            name:        "valid div operation",
+            op:          types.BinaryOpDiv,
+            dataType:    arrow.PrimitiveTypes.Float64,
+            expectError: false,
+        },
+        {
+            name:        "valid add operation",
+            op:          types.BinaryOpAdd,
+            dataType:    arrow.PrimitiveTypes.Float64,
+            expectError: false,
+        },
+        {
+            name:        "valid Mul operation",
+            op:          types.BinaryOpMul,
+            dataType:    arrow.PrimitiveTypes.Float64,
+            expectError: false,
+        },
+        {
+            name:        "valid sub operation",
+            op:          types.BinaryOpSub,
+            dataType:    arrow.PrimitiveTypes.Float64,
+            expectError: false,
+        },
         {
             name:        "invalid operation",
-            op:          types.BinaryOpAdd, // Not registered
-            dataType:    arrow.PrimitiveTypes.Int64,
+            op:          types.BinaryOpAnd, // Not registered
+            dataType:    arrow.FixedWidthTypes.Boolean,
             expectError: true,
         },
         {
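For reference, this is a hypothetical shape of the loop that consumes the table above; the registry value reg, the return values of GetForSignature, and the use of testify's require are assumptions for illustration, not taken from the diff:

for _, tt := range tests {
    t.Run(tt.name, func(t *testing.T) {
        // Look up the function registered for this operator and data type.
        fn, err := reg.GetForSignature(tt.op, tt.dataType)
        if tt.expectError {
            require.Error(t, err)
            return
        }
        require.NoError(t, err)
        require.NotNil(t, fn)
    })
}

With Add now registered for Float64, the "invalid operation" case switches to BinaryOpAnd on Boolean so the test still exercises an unregistered signature.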

pkg/engine/internal/executor/project.go

Lines changed: 44 additions & 26 deletions
@@ -21,19 +21,25 @@ func NewProjectPipeline(input Pipeline, proj *physical.Projection, evaluator *ex
 
     // Get the column names from the projection expressions
     colRefs := make([]types.ColumnRef, 0, len(proj.Expressions))
-    unaryExprs := make([]physical.UnaryExpression, 0, len(proj.Expressions))
+    mathExprs := make([]physical.Expression, 0, len(proj.Expressions))
 
     for i, expr := range proj.Expressions {
         switch expr := expr.(type) {
         case *physical.ColumnExpr:
             colRefs = append(colRefs, expr.Ref)
         case *physical.UnaryExpr:
-            unaryExprs = append(unaryExprs, expr)
+            mathExprs = append(mathExprs, expr)
+        case *physical.BinaryExpr:
+            mathExprs = append(mathExprs, expr)
         default:
             return nil, fmt.Errorf("projection expression %d is unsupported", i)
         }
     }
 
+    if len(mathExprs) > 1 {
+        return nil, fmt.Errorf("there might be only one math expression for `value` column at a time")
+    }
+
     // Create KEEP projection pipeline:
     // Drop all columns except the ones referenced in proj.Expressions.
     if !proj.All && !proj.Drop && !proj.Expand {
@@ -66,9 +72,9 @@ func NewProjectPipeline(input Pipeline, proj *physical.Projection, evaluator *ex
 
     // Create EXPAND projection pipeline:
     // Keep all columns and expand the ones referenced in proj.Expressions.
-    // TODO: as implemented, expanding and keeping/dropping cannot happen in the same projection. Is this desired?
-    if proj.All && proj.Expand {
-        return newExpandPipeline(unaryExprs, evaluator, input)
+    // TODO: as implemented, epanding and keeping/dropping cannot happen in the same projection. Is this desired?
+    if proj.All && proj.Expand && len(mathExprs) > 0 {
+        return newExpandPipeline(mathExprs[0], evaluator, input)
     }
 
     return nil, errNotImplemented
@@ -105,7 +111,7 @@ func newKeepPipeline(colRefs []types.ColumnRef, keepFunc func([]types.ColumnRef,
     }, input), nil
 }
 
-func newExpandPipeline(expressions []physical.UnaryExpression, evaluator *expressionEvaluator, input Pipeline) (*GenericPipeline, error) {
+func newExpandPipeline(expr physical.Expression, evaluator *expressionEvaluator, input Pipeline) (*GenericPipeline, error) {
     return newGenericPipeline(func(ctx context.Context, inputs []Pipeline) (arrow.Record, error) {
         if len(inputs) != 1 {
             return nil, fmt.Errorf("expected 1 input, got %d", len(inputs))
@@ -116,34 +122,46 @@ func newExpandPipeline(expressions []physical.UnaryExpression, evaluator *expres
             return nil, err
         }
 
-        columns := []arrow.Array{}
-        fields := []arrow.Field{}
+        outputFields := make([]arrow.Field, 0)
+        outputCols := make([]arrow.Array, 0)
+        schema := batch.Schema()
 
+        // move all columns into the output except `value`
         for i, field := range batch.Schema().Fields() {
-            columns = append(columns, batch.Column(i))
-            fields = append(fields, field)
-        }
-
-        for _, expr := range expressions {
-            vec, err := evaluator.eval(expr, batch)
+            ident, err := semconv.ParseFQN(schema.Field(i).Name)
             if err != nil {
                 return nil, err
             }
-            if arrStruct, ok := vec.(*array.Struct); ok {
-                structSchema, ok := arrStruct.DataType().(*arrow.StructType)
-                if !ok {
-                    return nil, fmt.Errorf("unexpected type returned from evaluation, expected *arrow.StructType, got %T", arrStruct.DataType())
-                }
+            if !ident.Equal(semconv.ColumnIdentValue) {
+                outputCols = append(outputCols, batch.Column(i))
+                outputFields = append(outputFields, field)
+            }
+        }
 
-                for i := range arrStruct.NumField() {
-                    columns = append(columns, arrStruct.Field(i))
-                    fields = append(fields, structSchema.Field(i))
-                }
+        vec, err := evaluator.eval(expr, batch)
+        if err != nil {
+            return nil, err
+        }
+
+        switch arrCasted := vec.(type) {
+        case *array.Struct:
+            structSchema, ok := arrCasted.DataType().(*arrow.StructType)
+            if !ok {
+                return nil, fmt.Errorf("unexpected type returned from evaluation, expected *arrow.StructType, got %T", arrCasted.DataType())
+            }
+            for i := range arrCasted.NumField() {
+                outputCols = append(outputCols, arrCasted.Field(i))
+                outputFields = append(outputFields, structSchema.Field(i))
             }
+        case *array.Float64:
+            outputFields = append(outputFields, semconv.FieldFromIdent(semconv.ColumnIdentValue, false))
+            outputCols = append(outputCols, arrCasted)
+        default:
+            return nil, fmt.Errorf("unexpected type returned from evaluation %T", arrCasted.DataType())
         }
 
-        metadata := batch.Schema().Metadata()
-        schema := arrow.NewSchema(fields, &metadata)
-        return array.NewRecord(schema, columns, batch.NumRows()), nil
+        metadata := schema.Metadata()
+        outputSchema := arrow.NewSchema(outputFields, &metadata)
+        return array.NewRecord(outputSchema, outputCols, batch.NumRows()), nil
     }, input), nil
 }
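To make the new expand path concrete: a single math expression (unary or binary) is evaluated against the batch, the result is an *array.Float64, and it becomes the record's value column while every other column is carried over untouched. Below is a standalone paraphrase of that record rewrite, matching the value column by a plain name rather than through semconv.ParseFQN / semconv.ColumnIdentValue and reusing the arrow-go imports from the earlier sketch; the function name is illustrative:

// A simplified version of the record rewrite done by newExpandPipeline:
// keep every column except the old value column, then append the evaluated
// Float64 result as the new value column, preserving schema metadata.
func replaceValueColumn(batch arrow.Record, computed *array.Float64, valueName string) arrow.Record {
    var (
        fields []arrow.Field
        cols   []arrow.Array
    )
    for i, field := range batch.Schema().Fields() {
        if field.Name == valueName {
            continue // the old value column is not carried over
        }
        fields = append(fields, field)
        cols = append(cols, batch.Column(i))
    }
    fields = append(fields, arrow.Field{Name: valueName, Type: arrow.PrimitiveTypes.Float64})
    cols = append(cols, computed)

    md := batch.Schema().Metadata()
    return array.NewRecord(arrow.NewSchema(fields, &md), cols, batch.NumRows())
}

The real pipeline keeps the existing *array.Struct handling alongside this, and NewProjectPipeline rejects projections that carry more than one math expression.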
