4 changes: 2 additions & 2 deletions pkg/dataobj/consumer/logsobj/builder.go
@@ -125,7 +125,7 @@ func (cfg *BuilderConfig) Validate() error {
if cfg.DataobjSortOrder == "" {
cfg.DataobjSortOrder = sortStreamASC // default to [streamID ASC, timestamp DESC] sorting
}
-if !(cfg.DataobjSortOrder == sortStreamASC || cfg.DataobjSortOrder == sortTimestampDESC) {
+if cfg.DataobjSortOrder != sortStreamASC && cfg.DataobjSortOrder != sortTimestampDESC {
errs = append(errs, fmt.Errorf("invalid dataobj sort order. must be one of `stream-asc` or `timestamp-desc`, got: %s", cfg.DataobjSortOrder))
}

@@ -138,7 +138,7 @@ var sortOrderMapping = map[string]logs.SortOrder{
}

func parseSortOrder(s string) logs.SortOrder {
-val, _ := sortOrderMapping[s]
+val := sortOrderMapping[s]
return val
}
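
For context: indexing a Go map with an absent key yields the zero value of the value type, which is why the discarded `ok` was redundant here. A minimal standalone sketch (SortOrder below is an int-backed stand-in for logs.SortOrder, chosen for illustration):

package main

import "fmt"

type SortOrder int

var sortOrderMapping = map[string]SortOrder{
	"stream-asc":     1,
	"timestamp-desc": 2,
}

func main() {
	// `val, _ := m[k]` and `val := m[k]` behave identically:
	// a missing key returns the zero value of the value type.
	fmt.Println(sortOrderMapping["stream-asc"]) // 1
	fmt.Println(sortOrderMapping["missing"])    // 0
}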

202 changes: 202 additions & 0 deletions pkg/engine/internal/executor/compat.go
@@ -0,0 +1,202 @@
package executor

import (
"cmp"
"context"
"slices"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"

"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
)

func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipeline) Pipeline {
const extracted = "_extracted"

return newGenericPipeline(Local, func(ctx context.Context, inputs []Pipeline) state {
input := inputs[0]
batch, err := input.Read(ctx)
if err != nil {
return failureState(err)
}
defer batch.Release()

// Return early if the batch has zero rows, even if column names would collide.
if batch.NumRows() == 0 {
batch.Retain() // retain to account for deferred release after reading the batch from the input
return successState(batch)
}

// First, find all fields in the schema that have colliding names,
// based on the collision column type and the source column type.
var (
collisionFieldIndices []int
collisionFieldNames []string
sourceFieldIndices []int
sourceFieldNames []string
)

schema := batch.Schema()
for idx := range schema.NumFields() {
ident, err := semconv.ParseFQN(schema.Field(idx).Name)
if err != nil {
Contributor: release batch here?

Collaborator: since we return a new record in the happy path, can we defer batch.Release() on 22 instead? I really prefer the open/close pattern, and using defer to close right after we open in the code.
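
For context, Arrow records are reference counted, which is what the Retain/Release pairing in this function balances. A minimal sketch of those semantics using the arrow-go v18 API (illustrative only, not the pipeline code itself):

package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "v", Type: arrow.BinaryTypes.String, Nullable: true},
	}, nil)

	b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer b.Release()
	b.Field(0).(*array.StringBuilder).Append("x")

	rec := b.NewRecord() // one reference, owned by this function

	rec.Retain()               // hand a second reference to a consumer
	rec.Release()              // producer's deferred release
	fmt.Println(rec.NumRows()) // still valid: the consumer's reference remains
	rec.Release()              // last reference dropped; memory can be reclaimed
}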

return failureState(err)
}
switch ident.ColumnType() {
case compat.Collision:
collisionFieldIndices = append(collisionFieldIndices, idx)
collisionFieldNames = append(collisionFieldNames, ident.ShortName())
case compat.Source:
sourceFieldIndices = append(sourceFieldIndices, idx)
sourceFieldNames = append(sourceFieldNames, ident.ShortName())
}
}

duplicates := findDuplicates(collisionFieldNames, sourceFieldNames)

// Return early if there are no colliding column names.
if len(duplicates) == 0 {
batch.Retain() // retain to account for deferred release after reading the batch from the input
return successState(batch)
Collaborator: ahh, I was wrong, we don't always create a new record. I wonder if we want to for cleaner release semantics?

Contributor (author): We could still defer release on line 22, and explicitly retain here, as this is a short circuit.

}

// Next, update the schema with the new columns that have the _extracted suffix.
newSchema := batch.Schema()
duplicateCols := make([]duplicateColumn, 0, len(duplicates))
r := int(batch.NumCols()) // index at which the appended destination columns start
for i, duplicate := range duplicates {
collisionFieldIdx := collisionFieldIndices[duplicate.s1Idx]
sourceFieldIdx := sourceFieldIndices[duplicate.s2Idx]

sourceField := newSchema.Field(sourceFieldIdx)
sourceIdent, err := semconv.ParseFQN(sourceField.Name)
if err != nil {
return failureState(err)
}

destinationIdent := semconv.NewIdentifier(sourceIdent.ShortName()+extracted, compat.Destination, sourceIdent.DataType())
newSchema, err = newSchema.AddField(len(newSchema.Fields()), semconv.FieldFromIdent(destinationIdent, true))
if err != nil {
return failureState(err)
}

duplicateCols = append(duplicateCols, duplicateColumn{
name: duplicate.value,
collisionIdx: collisionFieldIdx,
sourceIdx: sourceFieldIdx,
destinationIdx: r + i,
})
}

// Create a new builder with the updated schema.
Member: I'm not sure we need to create a new builder here since the contents of the arrays don't change for compatibility mapping. Can we use the existing arrays and provide a new schema which handles the renaming via array.NewRecord? That would also be much faster than copying the data in the arrays.

Contributor (author): That's what I do with the columns that do not have conflicts. However, I cannot just rename the full column of a batch, since we need to do that on a row basis.

Member: I'm not sure I'm following yet. You can create a new *arrow.Schema where the field has the new resolved name, but give it the same underlying array (via array.NewRecord).

Collaborator: I second this, there should be a way to reuse the column data and just rename the field in the schema.
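
A minimal sketch of the rename-only approach being suggested, assuming the standard arrow-go v18 API: the same backing array is handed to array.NewRecord under a schema with the new field name, so no data is copied. (As the author notes below, the actual collision handling also has to rewrite values per row, which a rename alone cannot do.)

package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	b := array.NewStringBuilder(memory.DefaultAllocator)
	defer b.Release()
	b.AppendValues([]string{"a", "b"}, nil)
	col := b.NewArray()
	defer col.Release()

	// Renaming is just a new schema over the same array: zero copies.
	renamed := arrow.NewSchema([]arrow.Field{
		{Name: "app_extracted", Type: arrow.BinaryTypes.String, Nullable: true},
	}, nil)
	rec := array.NewRecord(renamed, []arrow.Array{col}, int64(col.Len()))
	defer rec.Release()

	fmt.Println(rec.Schema().Field(0).Name) // app_extracted
}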

Contributor (author): Line 147 creates the new batch using array.NewRecord(newSchema, newSchemaColumns, batch.NumRows()), where newSchemaColumns is the []arrow.Array that holds the existing unmodified columns (line 111) and the modified columns (line 137, line 141).

Member: Maybe I'm having a hard time following from the code why we need to modify the columns? It reads as if it were just copying data, but I guess you're saying it's doing more than that?

Member (@rfratto, Oct 10, 2025): After talking offline, it's clearer to me now that this is similar to a multi-in, multi-out coalesce operation. I do find the logic here hard to follow and validate: it seems like sourceFieldBuilder never has a non-NULL appended to it, but I don't think that's true? I don't have any suggestions how to make this easier to understand, and I don't want to block us having this, so I'm comfortable with it being merged and coming back to this later.

Collaborator: the need for the new column was finally made clear to me by the fact that the columns won't always conflict (either by rows in the same batch or across batches). I think this case is worth a test, at least for documentation purposes.

Contributor (author): These cases are covered by tests.

Contributor (author):
> I do find the logic here hard to follow and validate: it seems like sourceFieldBuilder never has a non-NULL appended to it, but I don't think that's true?

Right, that is incorrect behaviour and will be fixed with 5b16cfb.

// The per-field builders are only used for columns where row values are modified,
// otherwise the full column from the input record is copied into the new record.
builder := array.NewRecordBuilder(memory.DefaultAllocator, newSchema)
builder.Reserve(int(batch.NumRows()))
defer builder.Release()

newSchemaColumns := make([]arrow.Array, newSchema.NumFields())

// Now, go through all fields of the old schema, copying unmodified columns as-is and rebuilding colliding ones through the builder.
for idx := range schema.NumFields() {
col := batch.Column(idx)

duplicateIdx := slices.IndexFunc(duplicateCols, func(d duplicateColumn) bool { return d.sourceIdx == idx })

// If not a colliding column, just copy over the column data of the original record.
if duplicateIdx < 0 {
newSchemaColumns[idx] = col
continue
}

// If the currently processed column is the source field for a colliding column,
// then write non-null values from source column into destination column.
// Also, "clear" the original column value by writing a NULL instead of the original value.
duplicate := duplicateCols[duplicateIdx]
collisionCol := batch.Column(duplicate.collisionIdx)

switch sourceFieldBuilder := builder.Field(idx).(type) {
case *array.StringBuilder:
destinationFieldBuilder := builder.Field(duplicate.destinationIdx).(*array.StringBuilder)
for i := range int(batch.NumRows()) {
if col.IsNull(i) || collisionCol.IsNull(i) { // IsNull(i) is equivalent to !IsValid(i)
sourceFieldBuilder.AppendNull() // append NULL to original column
destinationFieldBuilder.AppendNull() // append NULL to _extracted column
} else {
sourceFieldBuilder.AppendNull() // append NULL to original column
v := col.(*array.String).Value(i)
destinationFieldBuilder.Append(v) // append value to _extracted column
}
}

sourceCol := sourceFieldBuilder.NewArray()
defer sourceCol.Release()
newSchemaColumns[duplicate.sourceIdx] = sourceCol

destinationCol := destinationFieldBuilder.NewArray()
defer destinationCol.Release()
newSchemaColumns[duplicate.destinationIdx] = destinationCol
default:
panic("invalid source column type: only string columns can be checked for collisions")
}
}

rec := array.NewRecord(newSchema, newSchemaColumns, batch.NumRows())
return successState(rec)
}, input)
}
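
To make the collision handling concrete, a minimal standalone sketch of the row-wise value migration the loop above performs, with assumed two-row inputs (illustrative, not the pipeline code itself):

package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	// source column (e.g. a parsed "app"): ["extra", null]
	sb := array.NewStringBuilder(memory.DefaultAllocator)
	defer sb.Release()
	sb.AppendValues([]string{"extra", ""}, []bool{true, false})
	source := sb.NewArray().(*array.String)
	defer source.Release()

	// collision column (e.g. a stream label "app"): ["nginx", "nginx"]
	cb := array.NewStringBuilder(memory.DefaultAllocator)
	defer cb.Release()
	cb.AppendValues([]string{"nginx", "nginx"}, nil)
	collision := cb.NewArray().(*array.String)
	defer collision.Release()

	newSource := array.NewStringBuilder(memory.DefaultAllocator)
	defer newSource.Release()
	dest := array.NewStringBuilder(memory.DefaultAllocator)
	defer dest.Release()

	for i := 0; i < source.Len(); i++ {
		newSource.AppendNull() // the source column is always nulled out
		if source.IsNull(i) || collision.IsNull(i) {
			dest.AppendNull()
		} else {
			dest.Append(source.Value(i)) // value moves to the _extracted column
		}
	}

	ns, d := newSource.NewArray(), dest.NewArray()
	defer ns.Release()
	defer d.Release()
	fmt.Println(ns, d) // [(null) (null)] ["extra" (null)]
}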

// duplicate holds the indexes of a duplicate value in two slices.
type duplicate struct {
value string
s1Idx, s2Idx int
}

// findDuplicates finds strings that appear in both slices and returns
// their indexes in each slice.
// The function assumes that the elements within each slice are unique.
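// For example (hypothetical values):
//
//	findDuplicates([]string{"app", "env"}, []string{"env", "pod", "app"})
//	// => [{app 0 2} {env 1 0}], sorted by value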
func findDuplicates(s1, s2 []string) []duplicate {
if len(s1) == 0 || len(s2) == 0 {
return nil
}

set1 := make(map[string]int)
for i, v := range s1 {
set1[v] = i
}

set2 := make(map[string]int)
for i, v := range s2 {
set2[v] = i
}

// Find duplicates that exist in both slices
var duplicates []duplicate
for value, s1Idx := range set1 {
if s2Idx, exists := set2[value]; exists {
duplicates = append(duplicates, duplicate{
value: value,
s1Idx: s1Idx,
s2Idx: s2Idx,
})
}
}

slices.SortStableFunc(duplicates, func(a, b duplicate) int { return cmp.Compare(a.value, b.value) })
return duplicates
}

// duplicateColumn holds indexes to fields/columns in an [*arrow.Schema].
type duplicateColumn struct {
// name is the duplicate column name
name string
// collisionIdx is the index of the collision column
collisionIdx int
// sourceIdx is the index of the source column
sourceIdx int
// destinationIdx is the index of the destination column
destinationIdx int
}