Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ make test-integration # run integration tests
go test ./... # run all tests with Go directly
go test -v ./pkg/logql/... # run tests in specific package
go test -run TestName ./pkg/path # run a specific test
BUILD_IN_CONTAINER=true NONINTERACTIVE=true make lint # run all linters (use in CI-like environment)
make lint # run all linters (use in CI-like environment)
make format # format code (gofmt and goimports)
```

**Note:** Use `BUILD_IN_CONTAINER=true NONINTERACTIVE=true make lint` to run linting in a container environment that matches CI exactly. The `NONINTERACTIVE=true` flag prevents Docker from trying to allocate a TTY, which is necessary in non-interactive environments like claude-code. The Makefile automatically detects and handles git worktrees by using `git rev-parse --git-dir` and `git rev-parse --git-common-dir` to mount the appropriate git directories, ensuring git commands work correctly in the container regardless of worktree structure.

### Building the Frontend

The Loki UI/frontend (different from the query-frontend) is located in pkg/ui/frontend and is built with [Vite](https://vitejs.dev/).
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (b *streamsResultBuilder) collectRow(rec arrow.Record, i int) (labels.Label
// TODO: keep errors if --strict is set
// These are reserved column names used to track parsing errors. We are dropping them until
// we add support for --strict parsing.
if shortName == types.ColumnNameParsedError || shortName == types.ColumnNameParsedErrorDetails {
if shortName == types.ColumnNameError || shortName == types.ColumnNameErrorDetails {
continue
}

Expand Down
206 changes: 206 additions & 0 deletions pkg/engine/internal/executor/cast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package executor

import (
"fmt"
"strconv"
"time"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/dustin/go-humanize"

"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)

func castFn(operation types.UnaryOp) UnaryFunction {
return UnaryFunc(func(input ColumnVector, allocator memory.Allocator) (ColumnVector, error) {
arr := input.ToArray()
defer arr.Release()

sourceCol, ok := arr.(*array.String)
if !ok {
return nil, fmt.Errorf("expected column to be of type string, got %T", arr)
}

// Get conversion function and process values
conversionFn := getConversionFunction(operation)
castCol, errTracker := castValues(sourceCol, conversionFn, allocator)
defer castCol.Release()

// Build error columns if needed
errorCol, errorDetailsCol := errTracker.buildArrays()
defer errTracker.releaseBuilders()
if errTracker.hasErrors {
defer errorCol.Release()
defer errorDetailsCol.Release()
}
Comment on lines +35 to +38
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove this since that's already handled by the defer to releaseBuilders on the line above


// Build output schema and record
fields := buildValueAndErrorFields(errTracker.hasErrors)
result, err := buildValueAndErrorStruct(castCol, errorCol, errorDetailsCol, fields)
if err != nil {
return nil, err
}
return &ArrayStruct{
array: result,
ct: types.ColumnTypeGenerated,
rows: input.Len(),
}, nil
})
}

type conversionFn func(value string) (float64, error)

func getConversionFunction(operation types.UnaryOp) conversionFn {
switch operation {
case types.UnaryOpCastBytes:
return convertBytes
case types.UnaryOpCastDuration:
return convertDuration
default:
return convertFloat
}
}

func castValues(
sourceCol *array.String,
conversionFn conversionFn,
allocator memory.Allocator,
) (arrow.Array, *errorTracker) {
castBuilder := array.NewFloat64Builder(allocator)
defer castBuilder.Release()

tracker := newErrorTracker(allocator)

for i := 0; i < sourceCol.Len(); i++ {
if sourceCol.IsNull(i) {
castBuilder.AppendNull()
tracker.recordSuccess()
} else {
valueStr := sourceCol.Value(i)
if val, err := conversionFn(valueStr); err == nil {
castBuilder.Append(val)
tracker.recordSuccess()
} else {
// Use 0.0 as default for errors, for backwards compatibility with old engine
castBuilder.Append(0.0)
tracker.recordError(i, err)
}
}
}

return castBuilder.NewArray(), tracker
}

func buildValueAndErrorFields(
hasErrors bool,
) []arrow.Field {
fields := make([]arrow.Field, 0, 3)

// Add value field. Not nullable in practice since we use 0.0 when conversion fails, but as of
// writing all coumns are marked as nullable, even Timestamp and Message, so staying consistent
fields = append(fields, semconv.FieldFromIdent(semconv.ColumnIdentValue, true))

// Add error fields if needed
if hasErrors {
fields = append(fields,
semconv.FieldFromIdent(semconv.ColumnIdentError, true),
semconv.FieldFromIdent(semconv.ColumnIdentErrorDetails, true),
)
}

return fields
}

func buildValueAndErrorStruct(
valVol, errorCol, errorDetailsCol arrow.Array,
fields []arrow.Field,
) (*array.Struct, error) {
hasErrors := errorCol != nil

totalCols := 1
if hasErrors {
totalCols += 2
}

columns := make([]arrow.Array, totalCols)

// Add new columns - these are newly created so don't need extra retain
columns[0] = valVol
if hasErrors {
columns[1] = errorCol
columns[2] = errorDetailsCol
}

// NewStructArrayWithFields will retain all columns
return array.NewStructArrayWithFields(columns, fields)
}

func convertFloat(v string) (float64, error) {
return strconv.ParseFloat(v, 64)
}

func convertDuration(v string) (float64, error) {
d, err := time.ParseDuration(v)
if err != nil {
return 0, err
}
return d.Seconds(), nil
}

func convertBytes(v string) (float64, error) {
b, err := humanize.ParseBytes(v)
if err != nil {
return 0, err
}
return float64(b), nil
}

type errorTracker struct {
hasErrors bool
errorBuilder *array.StringBuilder
detailsBuilder *array.StringBuilder
allocator memory.Allocator
}

func newErrorTracker(allocator memory.Allocator) *errorTracker {
return &errorTracker{allocator: allocator}
}

func (et *errorTracker) recordError(rowIndex int, err error) {
if !et.hasErrors {
et.errorBuilder = array.NewStringBuilder(et.allocator)
et.detailsBuilder = array.NewStringBuilder(et.allocator)
// Backfill nulls for previous rows
for range rowIndex {
et.errorBuilder.AppendNull()
et.detailsBuilder.AppendNull()
}
et.hasErrors = true
}
et.errorBuilder.Append(types.SampleExtractionErrorType)
et.detailsBuilder.Append(err.Error())
}

func (et *errorTracker) recordSuccess() {
if et.hasErrors {
et.errorBuilder.AppendNull()
et.detailsBuilder.AppendNull()
}
}

func (et *errorTracker) buildArrays() (arrow.Array, arrow.Array) {
if !et.hasErrors {
return nil, nil
}
return et.errorBuilder.NewArray(), et.detailsBuilder.NewArray()
}

func (et *errorTracker) releaseBuilders() {
if et.hasErrors {
et.errorBuilder.Release()
et.detailsBuilder.Release()
}
}
2 changes: 1 addition & 1 deletion pkg/engine/internal/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (c *Context) executeProjection(ctx context.Context, proj *physical.Projecti
}

if len(proj.Expressions) == 0 {
return errorPipeline(ctx, fmt.Errorf("projection expects at least one column, got 0"))
return errorPipeline(ctx, fmt.Errorf("projection expects at least one expression, got 0"))
}

p, err := NewProjectPipeline(inputs[0], proj, &c.evaluator)
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/internal/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestExecutor_Projection(t *testing.T) {
c := &Context{}
pipeline := c.executeProjection(ctx, &physical.Projection{Expressions: cols}, []Pipeline{emptyPipeline()})
_, err := pipeline.Read(ctx)
require.ErrorContains(t, err, "projection expects at least one column, got 0")
require.ErrorContains(t, err, "projection expects at least one expression, got 0")
})

t.Run("multiple inputs result in error", func(t *testing.T) {
Expand Down
107 changes: 105 additions & 2 deletions pkg/engine/internal/executor/expressions.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,19 @@ import (
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)

type expressionEvaluator struct{}
type expressionEvaluator struct {
allocator memory.Allocator
}

func newExpressionEvaluator(allocator memory.Allocator) expressionEvaluator {
if allocator == nil {
allocator = memory.DefaultAllocator
}

return expressionEvaluator{
allocator: allocator,
}
}

func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record) (ColumnVector, error) {
switch expr := expr.(type) {
Expand Down Expand Up @@ -122,7 +134,7 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
if err != nil {
return nil, fmt.Errorf("failed to lookup unary function: %w", err)
}
return fn.Evaluate(lhr)
return fn.Evaluate(lhr, e.allocator)

case *physical.BinaryExpr:
lhs, err := e.eval(expr.Left, input)
Expand Down Expand Up @@ -305,6 +317,97 @@ func (a *Array) Release() {
a.array.Release()
}

// ArrayStruct represents multiple columns of data, stored as an [array.Struct].
// It implements the ColumnVector interface while preserving access to individual fields.
type ArrayStruct struct {
array *array.Struct
ct types.ColumnType
rows int64
}

var _ ColumnVector = (*ArrayStruct)(nil)

// NewArrayStruct creates a new ArrayStruct from an array.Struct
func NewArrayStruct(arr *array.Struct, ct types.ColumnType) *ArrayStruct {
return &ArrayStruct{
array: arr,
ct: ct,
rows: int64(arr.Len()),
}
}

// ToArray implements ColumnVector.
// Returns the underlying struct array.
func (a *ArrayStruct) ToArray() arrow.Array {
a.array.Retain()
return a.array
}

// Value implements ColumnVector.
// Returns a map of field names to values at the specified index.
func (a *ArrayStruct) Value(i int) any {
if a.array.IsNull(i) || !a.array.IsValid(i) {
return nil
}

// Return a map representing the struct's fields at this index
result := make(map[string]any)
structType := a.array.DataType().(*arrow.StructType)

for fieldIdx := 0; fieldIdx < a.array.NumField(); fieldIdx++ {
field := a.array.Field(fieldIdx)
if field.IsNull(i) {
continue
}
fieldMeta := structType.Field(fieldIdx)

// Extract value from the field array at index i
var value any
switch arr := field.(type) {
case *array.Boolean:
value = arr.Value(i)
case *array.String:
value = arr.Value(i)
case *array.Int64:
value = arr.Value(i)
case *array.Uint64:
value = arr.Value(i)
case *array.Float64:
value = arr.Value(i)
default:
value = nil
}
result[fieldMeta.Name] = value
}

return result
}

// Type implements ColumnVector.
func (a *ArrayStruct) Type() types.DataType {
dt := a.array.DataType()
st, ok := dt.(*arrow.StructType)
if !ok {
return nil
}
return types.NewStructType(st)
}

// ColumnType implements ColumnVector.
func (a *ArrayStruct) ColumnType() types.ColumnType {
return a.ct
}

// Len implements ColumnVector.
func (a *ArrayStruct) Len() int64 {
return a.rows
}

// Release decreases the reference count by 1 on underlying Arrow array
func (a *ArrayStruct) Release() {
a.array.Release()
}

// CoalesceVector represents multiple columns with the same name but different [types.ColumnType]
// Vectors are ordered by precedence (highest precedence first).
type CoalesceVector struct {
Expand Down
Loading