Skip to content

Commit 5437a88

Browse files
committed
Fix testDirectTrinoClientLongQuery
Record heartbeats more frequently to prevent query timeouts during large result sets or when the query state changes more often than the heartbeat interval.
1 parent b0738cb commit 5437a88

File tree

1 file changed

+8
-3
lines changed

1 file changed

+8
-3
lines changed

core/trino-main/src/main/java/io/trino/client/direct/DirectTrinoClient.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ public DispatchQuery execute(SessionContext sessionContext, @Language("SQL") Str
110110
!(dispatchQuery.getState() == FINISHING && dispatchQuery.getFullQueryInfo().getStages().isEmpty());
111111
state = queryManager.getQueryState(queryId)) {
112112
for (Slice serializedPage = exchangeClient.pollPage(); serializedPage != null; serializedPage = exchangeClient.pollPage()) {
113+
// record heartbeat for each page to avoid query timeout during large result sets
114+
dispatchQuery.recordHeartbeat();
113115
Page page = pageDeserializer.deserialize(serializedPage);
114116
queryResultsListener.consumeOutputPage(page);
115117
}
@@ -148,9 +150,7 @@ private void getQueryFutureWithHeartbeats(ListenableFuture<Object> anyCompleteFu
148150
anyCompleteFuture.get(heartBeatIntervalMillis, TimeUnit.MILLISECONDS);
149151
}
150152
catch (TimeoutException e) {
151-
// continue waiting until the query state changes or the exchange client is blocked.
152-
// we need to periodically record the heartbeat to prevent the query from being canceled
153-
dispatchQuery.recordHeartbeat();
153+
// ignore, heartbeat will be recorded in finally block
154154
}
155155
catch (InterruptedException e) {
156156
Thread.currentThread().interrupt();
@@ -159,6 +159,11 @@ private void getQueryFutureWithHeartbeats(ListenableFuture<Object> anyCompleteFu
159159
catch (ExecutionException e) {
160160
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Error processing query", e.getCause());
161161
}
162+
finally {
163+
// continue waiting until the query state changes or the exchange client is blocked.
164+
// we need to periodically record the heartbeat to prevent the query from being canceled
165+
dispatchQuery.recordHeartbeat();
166+
}
162167
}
163168
}
164169

0 commit comments

Comments
 (0)