|
16 | 16 |
|
17 | 17 | package com.google.cloud.firestore;
|
18 | 18 |
|
| 19 | +import static com.google.cloud.firestore.pipeline.expressions.Expr.and; |
19 | 20 | import static com.google.cloud.firestore.telemetry.TraceUtil.ATTRIBUTE_KEY_ATTEMPT;
|
20 | 21 | import static com.google.cloud.firestore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;
|
21 | 22 |
|
|
28 | 29 | import com.google.api.gax.rpc.StatusCode;
|
29 | 30 | import com.google.api.gax.rpc.StreamController;
|
30 | 31 | import com.google.cloud.Timestamp;
|
31 |
| -import com.google.cloud.firestore.pipeline.expressions.ExprWithAlias; |
| 32 | +import com.google.cloud.firestore.pipeline.expressions.AliasedAggregate; |
| 33 | +import com.google.cloud.firestore.pipeline.expressions.BooleanExpr; |
32 | 34 | import com.google.cloud.firestore.telemetry.TraceUtil;
|
33 | 35 | import com.google.cloud.firestore.telemetry.TraceUtil.Scope;
|
34 | 36 | import com.google.cloud.firestore.v1.FirestoreSettings;
|
|
49 | 51 | import java.util.Map;
|
50 | 52 | import java.util.Objects;
|
51 | 53 | import java.util.Set;
|
| 54 | +import java.util.stream.Collectors; |
52 | 55 | import javax.annotation.Nonnull;
|
53 | 56 | import javax.annotation.Nullable;
|
54 | 57 |
|
@@ -81,12 +84,27 @@ public Query getQuery() {
|
81 | 84 | @Nonnull
|
82 | 85 | @BetaApi
|
83 | 86 | public Pipeline pipeline() {
|
84 |
| - return getQuery() |
85 |
| - .pipeline() |
86 |
| - .aggregate( |
87 |
| - this.aggregateFieldList.stream() |
88 |
| - .map(PipelineUtils::toPipelineAggregatorTarget) |
89 |
| - .toArray(ExprWithAlias[]::new)); |
| 87 | + Pipeline pipeline = getQuery().pipeline(); |
| 88 | + |
| 89 | + List<BooleanExpr> existsExprs = |
| 90 | + this.aggregateFieldList.stream() |
| 91 | + .map(PipelineUtils::toPipelineExistsExpr) |
| 92 | + .filter(Objects::nonNull) |
| 93 | + .collect(Collectors.toList()); |
| 94 | + if (existsExprs.size() == 1) { |
| 95 | + pipeline = pipeline.where(existsExprs.get(0)); |
| 96 | + } else if (existsExprs.size() > 1) { |
| 97 | + pipeline = |
| 98 | + pipeline.where( |
| 99 | + and( |
| 100 | + existsExprs.get(0), |
| 101 | + existsExprs.subList(1, existsExprs.size()).toArray(new BooleanExpr[0]))); |
| 102 | + } |
| 103 | + |
| 104 | + return pipeline.aggregate( |
| 105 | + this.aggregateFieldList.stream() |
| 106 | + .map(PipelineUtils::toPipelineAggregatorTarget) |
| 107 | + .toArray(AliasedAggregate[]::new)); |
90 | 108 | }
|
91 | 109 |
|
92 | 110 | /**
|
|
0 commit comments