Commit bda3a3b
chore: Implement ColumnCompat pipeline stage
Signed-off-by: Christian Haudum <[email protected]>
1 parent 6a34a9d commit bda3a3b

File tree: 6 files changed, +875 −13 lines changed

Lines changed: 230 additions & 0 deletions

@@ -0,0 +1,230 @@
package executor

import (
	"cmp"
	"context"
	"slices"

	_ "github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"

	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
)

func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipeline) Pipeline {
	const extracted = "_extracted"

	return newGenericPipeline(Local, func(ctx context.Context, inputs []Pipeline) state {
		input := inputs[0]
		batch, err := input.Read(ctx)
		if err != nil {
			return failureState(err)
		}

		// Return early if the batch has zero rows, even if column names would collide.
		if batch.NumRows() == 0 {
			return successState(batch)
		}

		// First, find all fields in the schema that have colliding names,
		// based on the collision column type and the source column type.
		var (
			collisionFieldIndices []int
			collisionFieldNames   []string
			sourceFieldIndices    []int
			sourceFieldNames      []string
		)

		schema := batch.Schema()
		for idx := range schema.NumFields() {
			ident, err := semconv.ParseFQN(schema.Field(idx).Name)
			if err != nil {
				return failureState(err)
			}
			switch ident.ColumnType() {
			case compat.Collision:
				collisionFieldIndices = append(collisionFieldIndices, idx)
				collisionFieldNames = append(collisionFieldNames, ident.ShortName())
			case compat.Source:
				sourceFieldIndices = append(sourceFieldIndices, idx)
				sourceFieldNames = append(sourceFieldNames, ident.ShortName())
			}
		}

		duplicates := findDuplicates(collisionFieldNames, sourceFieldNames)

		// Return early if there are no colliding column names.
		if len(duplicates) == 0 {
			return successState(batch)
		}

		// Release batch, since it is not passed to the caller.
		defer batch.Release()

		// Next, update the schema with the new columns that have the _extracted suffix.
		newSchema := batch.Schema()
		duplicateCols := make([]duplicateColumn, 0, len(duplicates))
		r := int(batch.NumCols())
		for i, duplicate := range duplicates {
			collisionFieldIdx := collisionFieldIndices[duplicate.s1Idx]
			sourceFieldIdx := sourceFieldIndices[duplicate.s2Idx]

			sourceField := newSchema.Field(sourceFieldIdx)
			sourceIdent, err := semconv.ParseFQN(sourceField.Name)
			if err != nil {
				return failureState(err)
			}

			destinationIdent := semconv.NewIdentifier(sourceIdent.ShortName()+extracted, compat.Destination, sourceIdent.DataType())
			newSchema, err = newSchema.AddField(len(newSchema.Fields()), semconv.FieldFromIdent(destinationIdent, true))
			if err != nil {
				return failureState(err)
			}

			duplicateCols = append(duplicateCols, duplicateColumn{
				name:           duplicate.value,
				collisionIdx:   collisionFieldIdx,
				sourceIdx:      sourceFieldIdx,
				destinationIdx: r + i,
			})
		}

		// Create a new builder with the updated schema.
		builder := array.NewRecordBuilder(memory.DefaultAllocator, newSchema)
		builder.Reserve(int(batch.NumRows()))
		defer builder.Release()

		// Now, go through all fields of the old schema and append the rows to the new builder.
		for idx := range schema.NumFields() {
			col := batch.Column(idx)
			fieldBuilder := builder.Field(idx)

			duplicateIdx := slices.IndexFunc(duplicateCols, func(d duplicateColumn) bool { return d.sourceIdx == idx })

			// If not a colliding column, just copy over the column data of the original record.
			// I could not find a "batch copy" function, which I guess would be much more efficient.
			//
			// TODO(chaudum): Simplify
			if duplicateIdx < 0 {
				switch b := fieldBuilder.(type) {
				case *array.StringBuilder:
					for i := range int(batch.NumRows()) {
						if col.IsNull(i) || !col.IsValid(i) {
							b.AppendNull()
						} else {
							v := col.(*array.String).Value(i)
							b.Append(v)
						}
					}
				case *array.TimestampBuilder:
					for i := range int(batch.NumRows()) {
						if col.IsNull(i) || !col.IsValid(i) {
							b.AppendNull()
						} else {
							v := col.(*array.Timestamp).Value(i)
							b.Append(v)
						}
					}
				case *array.Float64Builder:
					for i := range int(batch.NumRows()) {
						if col.IsNull(i) || !col.IsValid(i) {
							b.AppendNull()
						} else {
							v := col.(*array.Float64).Value(i)
							b.Append(v)
						}
					}
				case *array.Int64Builder:
					for i := range int(batch.NumRows()) {
						if col.IsNull(i) || !col.IsValid(i) {
							b.AppendNull()
						} else {
							v := col.(*array.Int64).Value(i)
							b.Append(v)
						}
					}
				}
				continue
			}

			// If the currently processed column is the source field for a colliding column,
			// then write non-null values from the source column into the destination column.
			// Also, "clear" the original column value by writing a NULL instead of the original value.
			duplicate := duplicateCols[duplicateIdx]
			collisionCol := batch.Column(duplicate.collisionIdx)

			switch b := fieldBuilder.(type) {
			case *array.StringBuilder:
				newFieldBuilder := builder.Field(duplicate.destinationIdx).(*array.StringBuilder)
				for i := range int(batch.NumRows()) {
					if (col.IsNull(i) || !col.IsValid(i)) || (collisionCol.IsNull(i) || !collisionCol.IsValid(i)) {
						b.AppendNull()               // append NULL to original column
						newFieldBuilder.AppendNull() // append NULL to _extracted column
					} else {
						b.AppendNull() // append NULL to original column
						v := col.(*array.String).Value(i)
						newFieldBuilder.Append(v) // append value to _extracted column
					}
				}
			default:
				panic("invalid column type: only string columns can be checked for collisions")
			}
		}

		return successState(builder.NewRecord())
	}, input)
}
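
// For intuition (an illustration added for this write-up, not part of the
// commit): assuming a batch where a collision-type column and a source-type
// column share the short name "level", a single row is rewritten as follows:
//
//	before: label.level = "info", parsed.level = "debug"
//	after:  label.level = "info", parsed.level = NULL, parsed.level_extracted = "debug"
//
// The FQN spellings above are placeholders; the real names come from semconv.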

// duplicate holds the indexes of a value that appears in both slices.
type duplicate struct {
	value        string
	s1Idx, s2Idx int
}

// findDuplicates finds strings that appear in both slices and returns
// their indexes in each slice.
// The function assumes that the elements within each slice are unique.
func findDuplicates(s1, s2 []string) []duplicate {
	if len(s1) == 0 || len(s2) == 0 {
		return nil
	}

	set1 := make(map[string]int)
	for i, v := range s1 {
		set1[v] = i
	}

	set2 := make(map[string]int)
	for i, v := range s2 {
		set2[v] = i
	}

	// Find values that exist in both slices.
	var duplicates []duplicate
	for value, s1Idx := range set1 {
		if s2Idx, exists := set2[value]; exists {
			duplicates = append(duplicates, duplicate{
				value: value,
				s1Idx: s1Idx,
				s2Idx: s2Idx,
			})
		}
	}

	// Sort for deterministic output, since map iteration order is random.
	slices.SortStableFunc(duplicates, func(a, b duplicate) int { return cmp.Compare(a.value, b.value) })
	return duplicates
}
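
// Illustrative usage (added for this write-up, not part of the commit):
//
//	dups := findDuplicates(
//		[]string{"level", "pod"},    // collision-side short names (s1)
//		[]string{"caller", "level"}, // source-side short names (s2)
//	)
//	// dups == [{value: "level", s1Idx: 0, s2Idx: 1}]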

// duplicateColumn holds indexes to fields/columns in an [*arrow.Schema].
type duplicateColumn struct {
	// name is the duplicate column name
	name string
	// collisionIdx is the index of the collision column
	collisionIdx int
	// sourceIdx is the index of the source column
	sourceIdx int
	// destinationIdx is the index of the destination column
	destinationIdx int
}
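
For context, a minimal sketch of how this stage might be constructed from its physical plan node. The ColumnCompat fields match the ones the function reads above; the column-type constants and their package are assumptions for illustration, not taken from this commit.

    // Hypothetical wiring; the column-type constants are assumed, not from this diff.
    compat := &physical.ColumnCompat{
    	Collision:   types.ColumnTypeLabel,  // columns that keep their name on collision
    	Source:      types.ColumnTypeParsed, // columns whose values are moved away
    	Destination: types.ColumnTypeParsed, // column type of the new *_extracted columns
    }
    pipeline := newColumnCompatibilityPipeline(compat, input)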
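
And a self-contained sketch of the core record rewrite, using the same arrow-go APIs the stage relies on (Schema.AddField, array.NewRecordBuilder). The column names are placeholders standing in for semconv FQNs; this only demonstrates the move-and-null mechanics, not the real stage.

    package main

    import (
    	"fmt"

    	"github.com/apache/arrow-go/v18/arrow"
    	"github.com/apache/arrow-go/v18/arrow/array"
    	"github.com/apache/arrow-go/v18/arrow/memory"
    )

    func main() {
    	// A schema where a parsed field collides with a label
    	// (names are placeholders, not real semconv FQNs).
    	in := arrow.NewSchema([]arrow.Field{
    		{Name: "label.level", Type: arrow.BinaryTypes.String, Nullable: true},
    		{Name: "parsed.level", Type: arrow.BinaryTypes.String, Nullable: true},
    	}, nil)

    	// Extend the schema with the destination column, as the stage does.
    	out, err := in.AddField(in.NumFields(), arrow.Field{
    		Name: "parsed.level_extracted", Type: arrow.BinaryTypes.String, Nullable: true,
    	})
    	if err != nil {
    		panic(err)
    	}

    	b := array.NewRecordBuilder(memory.DefaultAllocator, out)
    	defer b.Release()

    	b.Field(0).(*array.StringBuilder).Append("info")  // collision column is copied as-is
    	b.Field(1).(*array.StringBuilder).AppendNull()    // source cell is cleared to NULL
    	b.Field(2).(*array.StringBuilder).Append("debug") // value moves to the _extracted column

    	rec := b.NewRecord()
    	defer rec.Release()
    	fmt.Println(rec)
    }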
