Skip to content

Commit 91ba46f

Browse files
Karthik Krishnaswamyethanyzhang
authored andcommitted
Add semantic warning logic for non-deterministic CTE reuse
1 parent ac1f4f7 commit 91ba46f

File tree

14 files changed

+361
-104
lines changed

14 files changed

+361
-104
lines changed

presto-client/src/main/java/com/facebook/presto/client/StatementClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
package com.facebook.presto.client;
1515

1616
import com.facebook.presto.common.type.TimeZoneKey;
17+
import com.facebook.presto.spi.PrestoWarning;
1718
import com.facebook.presto.spi.security.SelectedRole;
1819
import jakarta.annotation.Nullable;
1920

2021
import java.io.Closeable;
22+
import java.util.List;
2123
import java.util.Map;
2224
import java.util.Optional;
2325
import java.util.Set;
@@ -39,6 +41,8 @@ public interface StatementClient
3941

4042
StatementStats getStats();
4143

44+
List<PrestoWarning> getWarnings();
45+
4246
QueryStatusInfo currentStatusInfo();
4347

4448
QueryData currentData();

presto-client/src/main/java/com/facebook/presto/client/StatementClientV1.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
import com.facebook.airlift.units.Duration;
1818
import com.facebook.presto.client.OkHttpUtil.NullCallback;
1919
import com.facebook.presto.common.type.TimeZoneKey;
20+
import com.facebook.presto.spi.PrestoWarning;
2021
import com.facebook.presto.spi.security.SelectedRole;
2122
import com.google.common.base.Joiner;
2223
import com.google.common.base.Splitter;
24+
import com.google.common.collect.ImmutableList;
2325
import com.google.common.collect.ImmutableMap;
2426
import com.google.common.collect.ImmutableSet;
2527
import com.google.errorprone.annotations.ThreadSafe;
@@ -97,6 +99,7 @@ class StatementClientV1
9799

98100
private final OkHttpClient httpClient;
99101
private final String query;
102+
private List<PrestoWarning> warnings = ImmutableList.of();
100103
private final AtomicReference<QueryResults> currentResults = new AtomicReference<>();
101104
private final AtomicReference<String> setCatalog = new AtomicReference<>();
102105
private final AtomicReference<String> setSchema = new AtomicReference<>();
@@ -133,7 +136,9 @@ public StatementClientV1(OkHttpClient httpClient, ClientSession session, String
133136

134137
Request request = buildQueryRequest(session, query);
135138

136-
JsonResponse<QueryResults> response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request);
139+
JsonResponse<com.facebook.presto.client.QueryResults> response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request);
140+
System.err.println("RAW RESPONSE BODY:");
141+
System.err.println(response.getResponseBody());
137142
if ((response.getStatusCode() != HTTP_OK) || !response.hasValue()) {
138143
state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);
139144
throw requestFailedException("starting query", request, response);
@@ -491,6 +496,10 @@ private void processResponse(Headers headers, QueryResults results)
491496
}
492497

493498
currentResults.set(results);
499+
results = currentResults.get();
500+
if (results != null && results.getWarnings() != null) {
501+
this.warnings = results.getWarnings();
502+
}
494503
}
495504

496505
private RuntimeException requestFailedException(String task, Request request, JsonResponse<QueryResults> response)
@@ -525,6 +534,11 @@ public void cancelLeafStage()
525534
httpDelete(uri);
526535
}
527536
}
537+
@Override
538+
public List<PrestoWarning> getWarnings()
539+
{
540+
return warnings;
541+
}
528542

529543
@Override
530544
public void close()

presto-hive/src/test/java/com/facebook/presto/hive/HiveQueryRunner.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,7 @@
4646
import java.net.URI;
4747
import java.nio.file.Path;
4848
import java.nio.file.Paths;
49-
import java.util.Collections;
50-
import java.util.HashMap;
51-
import java.util.List;
52-
import java.util.Map;
53-
import java.util.Optional;
49+
import java.util.*;
5450
import java.util.function.BiFunction;
5551

5652
import static com.facebook.airlift.log.Level.ERROR;
@@ -464,6 +460,8 @@ public static Session createMaterializeExchangesSession(Optional<SelectedRole> r
464460
public static void main(String[] args)
465461
throws Exception
466462
{
463+
TimeZone.setDefault(TimeZone.getTimeZone("America/Bahia_Banderas"));
464+
System.out.println("JVM timezone = " + java.util.TimeZone.getDefault().getID());
467465
// You need to add "--user user" to your CLI for your queries to work
468466
setupLogging();
469467
Optional<Path> dataDirectory;

presto-hive/src/test/java/com/facebook/presto/hive/HiveTestUtils.java

Lines changed: 268 additions & 54 deletions
Large diffs are not rendered by default.

presto-main-base/src/main/java/com/facebook/presto/execution/QueryStateMachine.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,7 @@
2929
import com.facebook.presto.metadata.Metadata;
3030
import com.facebook.presto.server.BasicQueryInfo;
3131
import com.facebook.presto.server.BasicQueryStats;
32-
import com.facebook.presto.spi.PrestoException;
33-
import com.facebook.presto.spi.QueryId;
34-
import com.facebook.presto.spi.SchemaTableName;
35-
import com.facebook.presto.spi.WarningCollector;
32+
import com.facebook.presto.spi.*;
3633
import com.facebook.presto.spi.connector.ConnectorCommitHandle;
3734
import com.facebook.presto.spi.function.SqlFunctionId;
3835
import com.facebook.presto.spi.function.SqlInvokedFunction;
@@ -293,6 +290,10 @@ public Session getSession()
293290
{
294291
return session;
295292
}
293+
public List<PrestoWarning> getWarnings()
294+
{
295+
return warningCollector.getWarnings();
296+
}
296297

297298
public long getPeakUserMemoryInBytes()
298299
{
@@ -1359,6 +1360,9 @@ private static QueryStats pruneQueryStats(QueryStats queryStats)
13591360
queryStats.getRuntimeStats());
13601361
}
13611362

1363+
public void addWarnings(List<PrestoWarning> warnings) {
1364+
}
1365+
13621366
public static class QueryOutputManager
13631367
{
13641368
private final Executor executor;

presto-main-base/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,7 @@ private PlanRoot doCreateLogicalPlanAndOptimize()
617617
SubPlan fragmentedPlan = getSession().getRuntimeStats().recordWallAndCpuTime(
618618
FRAGMENT_PLAN_TIME_NANOS,
619619
() -> planFragmenter.createSubPlans(stateMachine.getSession(), plan, false, idAllocator, variableAllocator.get(), stateMachine.getWarningCollector()));
620+
stateMachine.addWarnings(stateMachine.getWarningCollector().getWarnings());
620621

621622
// record analysis time
622623
stateMachine.endAnalysis();

presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/Analyzer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public Analysis analyzeSemantic(Statement statement, boolean isDescribe)
117117
Analysis analysis = new Analysis(rewrittenStatement, parameterLookup, isDescribe);
118118

119119
metadataExtractor.populateMetadataHandle(session, rewrittenStatement, analysis.getMetadataHandle());
120+
FeaturesConfig featuresConfig = new FeaturesConfig();
120121
StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, accessControl, session, warningCollector);
121122
analyzer.analyze(rewrittenStatement, Optional.empty());
122123
analyzeForUtilizedColumns(analysis, analysis.getStatement(), warningCollector);

presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.facebook.presto.common.predicate.TupleDomain;
2424
import com.facebook.presto.common.type.ArrayType;
2525
import com.facebook.presto.common.type.BigintType;
26+
import com.facebook.presto.spi.*;
2627
import com.facebook.presto.common.type.DoubleType;
2728
import com.facebook.presto.common.type.MapType;
2829
import com.facebook.presto.common.type.RealType;
@@ -33,15 +34,7 @@
3334
import com.facebook.presto.common.type.VarcharType;
3435
import com.facebook.presto.metadata.Metadata;
3536
import com.facebook.presto.metadata.OperatorNotFoundException;
36-
import com.facebook.presto.spi.ColumnHandle;
37-
import com.facebook.presto.spi.ColumnMetadata;
38-
import com.facebook.presto.spi.MaterializedViewDefinition;
39-
import com.facebook.presto.spi.MaterializedViewStatus;
40-
import com.facebook.presto.spi.PrestoException;
4137
import com.facebook.presto.spi.PrestoWarning;
42-
import com.facebook.presto.spi.SchemaTableName;
43-
import com.facebook.presto.spi.TableHandle;
44-
import com.facebook.presto.spi.WarningCollector;
4538
import com.facebook.presto.spi.analyzer.AccessControlInfo;
4639
import com.facebook.presto.spi.analyzer.AccessControlInfoForTable;
4740
import com.facebook.presto.spi.analyzer.MetadataResolver;
@@ -1194,26 +1187,36 @@ protected Scope visitExplain(Explain node, Optional<Scope> scope)
11941187
analysis.setUpdateType(null);
11951188
return createAndAssignScope(node, scope, Field.newUnqualified(node.getLocation(), "Query Plan", VARCHAR));
11961189
}
1197-
11981190
@Override
11991191
protected Scope visitQuery(Query node, Optional<Scope> scope)
12001192
{
12011193
Scope withScope = analyzeWith(node, scope);
12021194
Scope queryBodyScope = process(node.getQueryBody(), withScope);
1203-
List<Expression> orderByExpressions = emptyList();
1204-
if (node.getOrderBy().isPresent()) {
1205-
orderByExpressions = analyzeOrderBy(node, getSortItemsFromOrderBy(node.getOrderBy()), queryBodyScope);
1206-
if (queryBodyScope.getOuterQueryParent().isPresent() && !node.getLimit().isPresent()) {
1207-
// not the root scope and ORDER BY is ineffective
1208-
analysis.markRedundantOrderBy(node.getOrderBy().get());
1209-
warningCollector.add(new PrestoWarning(REDUNDANT_ORDER_BY, "ORDER BY in subquery may have no effect"));
1195+
1196+
1197+
// 🔔 Insert warning logic here
1198+
if (node.getWith().isPresent()) {
1199+
for (WithQuery withQuery : node.getWith().get().getQueries()) {
1200+
if (isNonDeterministic(withQuery.getQuery())) {
1201+
WarningCode code = new WarningCode(1234, "NON_DETERMINISTIC_CTE");
1202+
PrestoWarning warning = new PrestoWarning(code, "CTE reuse may lead to non-deterministic results");
1203+
warningCollector.add(warning);
1204+
}
12101205
}
12111206
}
1207+
List<Expression> orderByExpressions = node.getOrderBy()
1208+
.map(OrderBy::getSortItems)
1209+
.orElse(ImmutableList.of())
1210+
.stream()
1211+
.map(SortItem::getSortKey)
1212+
.collect(toImmutableList());
1213+
12121214
analysis.setOrderByExpressions(node, orderByExpressions);
12131215

12141216
if (node.getOffset().isPresent()) {
12151217
analyzeOffset(node.getOffset().get());
12161218
}
1219+
12171220
// Input fields == Output fields
12181221
analysis.setOutputExpressions(node, descriptorToFields(queryBodyScope));
12191222

@@ -1226,6 +1229,11 @@ protected Scope visitQuery(Query node, Optional<Scope> scope)
12261229
return queryScope;
12271230
}
12281231

1232+
private boolean isNonDeterministic(Query query) {
1233+
// Naive check — refine later if needed
1234+
return query.toString().contains("rand()") || query.toString().contains("now()");
1235+
}
1236+
12291237
@Override
12301238
protected Scope visitUnnest(Unnest node, Optional<Scope> scope)
12311239
{
@@ -3243,11 +3251,6 @@ private List<Expression> analyzeOrderBy(Node node, List<SortItem> sortItems, Sco
32433251
return orderByFields;
32443252
}
32453253

3246-
private Scope createAndAssignScope(Node node, Optional<Scope> parentScope)
3247-
{
3248-
return createAndAssignScope(node, parentScope, emptyList());
3249-
}
3250-
32513254
private Scope createAndAssignScope(Node node, Optional<Scope> parentScope, Field... fields)
32523255
{
32533256
return createAndAssignScope(node, parentScope, new RelationType(fields));

presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/AbstractAnalyzerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,9 +464,10 @@ private void inSetupTransaction(Consumer<Session> consumer)
464464
.execute(SETUP_SESSION, consumer);
465465
}
466466

467-
protected void analyze(@Language("SQL") String query)
467+
protected Analysis analyze(@Language("SQL") String query)
468468
{
469469
analyze(CLIENT_SESSION, query);
470+
return null;
470471
}
471472

472473
protected WarningCollector analyzeWithWarnings(@Language("SQL") String query)

presto-main/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@
283283
<groupId>io.projectreactor.netty</groupId>
284284
<artifactId>reactor-netty-http</artifactId>
285285
</dependency>
286-
286+
287287
<dependency>
288288
<groupId>io.micrometer</groupId>
289289
<artifactId>micrometer-core</artifactId>

0 commit comments

Comments
 (0)