Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1002,12 +1002,60 @@ private Pair<List<RexNode>, List<AggCall>> aggregateWithTrimming(
Pair<List<RexNode>, List<AggCall>> reResolved =
resolveAttributesForAggregation(groupExprList, aggExprList, context);

List<String> intendedGroupKeyAliases = getGroupKeyNamesAfterAggregation(reResolved.getLeft());
context.relBuilder.aggregate(
context.relBuilder.groupKey(reResolved.getLeft()), reResolved.getRight());
// During aggregation, Calcite projects both input dependencies and output group-by fields.
// When names conflict, Calcite adds numeric suffixes (e.g., "value0").
// Apply explicit renaming to restore the intended aliases.
context.relBuilder.rename(intendedGroupKeyAliases);

return Pair.of(reResolved.getLeft(), reResolved.getRight());
}

/**
 * Derives the names the group-by keys are expected to carry after aggregation, listed in the
 * order the keys are emitted.
 *
 * <p>The ordering imitates {@code Registrar.registerExpression} of {@link RelBuilder}: keys that
 * are (possibly wrapped) input references come first and every other computed expression
 * follows. A key equal to one already collected is ignored, and within each group the encounter
 * order of {@code nodes} is preserved.
 *
 * @param nodes the resolved group-by expressions
 * @return the alias names of the reordered keys; a key for which {@code extractAliasLiteral}
 *     yields nothing contributes no name
 */
private List<String> getGroupKeyNamesAfterAggregation(List<RexNode> nodes) {
  List<RexNode> inputRefKeys = new ArrayList<>();
  List<RexNode> computedKeys = new ArrayList<>();
  for (RexNode key : nodes) {
    // A group key that was already collected is never collected a second time.
    boolean alreadySeen = inputRefKeys.contains(key) || computedKeys.contains(key);
    if (!alreadySeen) {
      List<RexNode> target = isInputRef(key) ? inputRefKeys : computedKeys;
      target.add(key);
    }
  }

  // Input references first, computed expressions afterwards.
  inputRefKeys.addAll(computedKeys);

  List<String> aliases = new ArrayList<>();
  for (RexNode key : inputRefKeys) {
    extractAliasLiteral(key).ifPresent(literal -> aliases.add(RexLiteral.stringValue(literal)));
  }
  return aliases;
}

/**
 * Tells whether a rex node is an input reference, looking through any {@code AS}, {@code
 * DESCENDING}, {@code NULLS_FIRST} or {@code NULLS_LAST} wrappers by following their first
 * operand.
 *
 * @param node the expression to inspect
 * @return {@code true} if the innermost unwrapped node is a {@link RexInputRef}
 */
private boolean isInputRef(RexNode node) {
  RexNode current = node;
  while (true) {
    switch (current.getKind()) {
      case AS:
      case DESCENDING:
      case NULLS_FIRST:
      case NULLS_LAST:
        // Wrapper call: keep unwrapping through its first operand.
        current = ((RexCall) current).operands.get(0);
        continue;
      default:
        return current instanceof RexInputRef;
    }
  }
}

/**
* Resolve attributes for aggregation.
*
Expand Down Expand Up @@ -1106,7 +1154,7 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
aggregationAttributes.getLeft().stream()
.map(this::extractAliasLiteral)
.flatMap(Optional::stream)
.map(ref -> ((RexLiteral) ref).getValueAs(String.class))
.map(ref -> ref.getValueAs(String.class))
.map(context.relBuilder::field)
.map(f -> (RexNode) f)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Fixture: creates the index time_test, switches plugins.calcite.enabled to true as a
# transient setting, then bulk-indexes four documents with refresh enabled.
setup:
- do:
indices.create:
index: time_test
- do:
query.settings:
body:
transient:
plugins.calcite.enabled : true

- do:
bulk:
refresh: true
body:
# Four documents: values 1000, 2000, 1500, 3000 with categories A, B, A, C.
- '{"index": {"_index": "time_test"}}'
- '{"category":"A","value":1000,"@timestamp":"2024-01-01T00:00:00Z"}'
- '{"index": {"_index": "time_test"}}'
- '{"category":"B","value":2000,"@timestamp":"2024-01-01T00:05:00Z"}'
- '{"index": {"_index": "time_test"}}'
- '{"category":"A","value":1500,"@timestamp":"2024-01-01T00:10:00Z"}'
- '{"index": {"_index": "time_test"}}'
- '{"category":"C","value":3000,"@timestamp":"2024-01-01T00:20:00Z"}'

---
# Sets the transient plugins.calcite.enabled setting to false, the opposite of what setup applies.
teardown:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled : false

---
# The span alias "value" is the same name as the source field it is computed from.
# Expected: three buckets (1000, 2000, 3000); the 1000 bucket holds the values 1000 and 1500.
"Test span aggregation with field name collision - basic case":
- skip:
features:
- headers
- allowed_warnings
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=time_test | stats count() by span(value, 1000) as value

- match: { total: 3 }
- match: { schema: [{"name": "count()", "type": "bigint"}, {"name": "value", "type": "bigint"}] }
- match: { datarows: [[2, 1000], [1, 2000], [1, 3000]] }

---
# Same alias collision as the basic case, but with a second aggregate, avg(value), that reads
# the field sharing the alias name. The 1000 bucket averages 1000 and 1500 to 1250.0.
"Test span aggregation with field name collision - multiple aggregations":
- skip:
features:
- headers
- allowed_warnings
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=time_test | stats count(), avg(value) by span(value, 1000) as value

- match: { total: 3 }
- match: { schema: [{"name": "count()", "type": "bigint"}, {"name": "avg(value)", "type": "double"}, {"name": "value", "type": "bigint"}] }
- match: { datarows: [[2, 1250.0, 1000], [1, 2000.0, 2000], [1, 3000.0, 3000]] }

---
# Groups by a 10-minute span of @timestamp plus the plain fields category and value.
# Every document lands in its own group, so four rows with count 1 are expected; the
# 00:00 and 00:05 timestamps both fall into the 00:00 bucket.
"Test span aggregation without name collision - multiple group-by":
- skip:
features:
- headers
- allowed_warnings
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=time_test | stats count() by span(@timestamp, 10min) as @timestamp, category, value

- match: { total: 4 }
- match: { schema: [{"name": "count()", "type": "bigint"}, {"name": "@timestamp", "type": "timestamp"}, {"name": "category", "type": "string"}, {"name": "value", "type": "bigint"}] }
- match: { datarows: [[1, "2024-01-01 00:00:00", "A", 1000], [1, "2024-01-01 00:10:00", "A", 1500], [1, "2024-01-01 00:00:00", "B", 2000], [1, "2024-01-01 00:20:00", "C", 3000]] }

---
# The group-by list names the field value twice. The expected schema carries both columns,
# the second one as "value0", and places the @timestamp span column ahead of them.
"Test span aggregation with duplicated group keys":
- skip:
features:
- headers
- allowed_warnings
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=time_test | stats count() by value, value, span(@timestamp, 10min) as @timestamp

- match: { total: 4 }
- match: { schema: [{"name": "count()", "type": "bigint"}, {"name": "@timestamp", "type": "timestamp"}, {"name": "value", "type": "bigint"}, {"name": "value0", "type": "bigint"}] }
- match: { datarows: [[1, "2024-01-01 00:00:00", 1000, 1000], [1, "2024-01-01 00:10:00", 1500, 1500], [1, "2024-01-01 00:00:00", 2000, 2000], [1, "2024-01-01 00:20:00", 3000, 3000]] }
Loading