Skip to content

Commit 5d51b1b

Browse files
[AINode] Move ainode cache to AINodeClient
1 parent fa53381 commit 5d51b1b

File tree

5 files changed

+196
-32
lines changed

5 files changed

+196
-32
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
2323
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
24+
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2425
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
2526
import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
2627
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
@@ -150,6 +151,8 @@
150151
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
151152
import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
152153
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
154+
import org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationReq;
155+
import org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationResp;
153156
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
154157
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
155158
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
@@ -648,6 +651,59 @@ public TSStatus operatePermission(final TAuthorizerReq req) {
648651
req.getNewUsername()));
649652
}
650653

654+
@Override
655+
public TGetAINodeLocationResp getAINodeLocation(final TGetAINodeLocationReq req)
656+
throws TException {
657+
final TGetAINodeLocationResp resp = new TGetAINodeLocationResp();
658+
final TSStatus status = new TSStatus();
659+
try {
660+
final java.util.List<?> registered = configManager.getNodeManager().getRegisteredAINodes();
661+
if (registered == null || registered.isEmpty()) {
662+
status.setCode(TSStatusCode.AI_NODE_INTERNAL_ERROR.getStatusCode());
663+
status.setMessage("No registered AINode found");
664+
resp.setStatus(status);
665+
return resp;
666+
}
667+
final java.util.Optional<TEndPoint> picked = pickAnyEndPointFromRegistered(registered);
668+
if (picked.isPresent()) {
669+
resp.setAiNodeAddress(picked.get());
670+
status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
671+
} else {
672+
status.setCode(TSStatusCode.AI_NODE_INTERNAL_ERROR.getStatusCode());
673+
status.setMessage("No valid AINode endpoint extracted from registry");
674+
}
675+
} catch (Exception e) {
676+
status.setCode(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
677+
status.setMessage("getAINodeLocation failed: " + e.getMessage());
678+
}
679+
resp.setStatus(status);
680+
return resp;
681+
}
682+
683+
private java.util.Optional<TEndPoint> pickAnyEndPointFromRegistered(
684+
final java.util.List<?> registered) {
685+
for (Object v : registered) {
686+
if (v instanceof TEndPoint) {
687+
return java.util.Optional.of((TEndPoint) v);
688+
}
689+
try {
690+
final Object ep = v.getClass().getMethod("getRpcEndPoint").invoke(v);
691+
if (ep instanceof TEndPoint) {
692+
return java.util.Optional.of((TEndPoint) ep);
693+
}
694+
} catch (ReflectiveOperationException ignore) {
695+
}
696+
try {
697+
final Object ep = v.getClass().getMethod("getClientRpcEndPoint").invoke(v);
698+
if (ep instanceof TEndPoint) {
699+
return java.util.Optional.of((TEndPoint) ep);
700+
}
701+
} catch (ReflectiveOperationException ignore) {
702+
}
703+
}
704+
return java.util.Optional.empty();
705+
}
706+
651707
@Override
652708
public TAuthorizerResp queryPermission(final TAuthorizerReq req) {
653709
final PermissionInfoResp dataSet =

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@
110110
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
111111
import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
112112
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
113+
import org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationReq;
114+
import org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationResp;
113115
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
114116
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
115117
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
@@ -529,6 +531,11 @@ public TAINodeRestartResp restartAINode(TAINodeRestartReq req) throws TException
529531
throw new UnsupportedOperationException(UNSUPPORTED_INVOCATION);
530532
}
531533

534+
public TGetAINodeLocationResp getAINodeLocation(final TGetAINodeLocationReq req)
535+
throws org.apache.thrift.TException {
536+
return client.getAINodeLocation(req);
537+
}
538+
532539
@Override
533540
public TSStatus removeAINode(TAINodeRemoveReq req) throws TException {
534541
return executeRemoteCallWithRetry(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -364,10 +364,6 @@
364364
import java.util.Optional;
365365
import java.util.Set;
366366
import java.util.TreeMap;
367-
import java.util.concurrent.TimeUnit;
368-
import java.util.concurrent.atomic.AtomicLong;
369-
import java.util.concurrent.atomic.AtomicReference;
370-
import java.util.concurrent.locks.ReentrantLock;
371367
import java.util.function.Predicate;
372368
import java.util.stream.Collectors;
373369

@@ -380,25 +376,11 @@
380376

381377
public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
382378

383-
private static final AtomicReference<TEndPoint> CACHED_AINODE = new AtomicReference<>();
384-
private static final ReentrantLock AINODE_REFRESH_LOCK = new ReentrantLock();
385-
private static final long AINODE_TTL_NANOS = TimeUnit.SECONDS.toNanos(3);
386-
private static final AtomicLong AINODE_LAST_REFRESH = new AtomicLong(0L);
387-
388-
/** Return cached AINode if fresh; otherwise refresh from ConfigNode using modelId. */
389-
private TEndPoint resolveAINodeEp(String modelIdOrNull) {
390-
final TEndPoint ep = CACHED_AINODE.get();
391-
if (ep != null && System.nanoTime() - AINODE_LAST_REFRESH.get() < AINODE_TTL_NANOS) {
392-
return ep;
393-
}
394-
return refreshAINodeFromConfigNode(modelIdOrNull);
395-
}
379+
// NOTE: AINode location is now maintained globally inside AINodeClient.
380+
// We only resolve via ConfigNode when needed, then publish it back to AINodeClient.
396381

397382
/** Ask ConfigNode for the latest AINode location (precise by modelId when available). */
398-
private TEndPoint refreshAINodeFromConfigNode(String modelIdOrNull) {
399-
if (!AINODE_REFRESH_LOCK.tryLock()) {
400-
return CACHED_AINODE.get();
401-
}
383+
private TEndPoint resolveViaConfigNodeAndPublish(String modelIdOrNull) {
402384
try (ConfigNodeClient cn =
403385
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
404386
TEndPoint ep = null;
@@ -407,17 +389,24 @@ private TEndPoint refreshAINodeFromConfigNode(String modelIdOrNull) {
407389
if (resp != null && resp.isSetAiNodeAddress()) {
408390
ep = resp.getAiNodeAddress();
409391
}
392+
} else {
393+
final org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationResp r =
394+
cn.getAINodeLocation(
395+
new org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationReq());
396+
if (r != null
397+
&& r.getStatus() != null
398+
&& r.getStatus().getCode()
399+
== org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS.getStatusCode()
400+
&& r.isSetAiNodeAddress()) {
401+
ep = r.getAiNodeAddress();
402+
}
410403
}
411-
// if no modelId or location return,use showAIDevices to fetch an AINode
412404
if (ep != null) {
413-
CACHED_AINODE.set(ep);
414-
AINODE_LAST_REFRESH.set(System.nanoTime());
405+
AINodeClient.updateGlobalAINodeLocation(ep);
415406
}
416-
return CACHED_AINODE.get();
407+
return ep;
417408
} catch (Exception e) {
418-
return CACHED_AINODE.get();
419-
} finally {
420-
AINODE_REFRESH_LOCK.unlock();
409+
return null;
421410
}
422411
}
423412

@@ -3658,7 +3647,7 @@ public SettableFuture<ConfigTaskResult> loadModel(
36583647
String existingModelId, List<String> deviceIdList) {
36593648
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
36603649
// 1) Try direct DataNode → AINode with cached endpoint
3661-
TEndPoint ep = resolveAINodeEp(existingModelId);
3650+
TEndPoint ep = AINodeClient.getCurrentEndpoint();
36623651
try (final AINodeClient ai = AINodeClientManager.getInstance().borrowClient(ep)) {
36633652
final TLoadModelReq req = new TLoadModelReq(existingModelId, deviceIdList);
36643653
final TSStatus result = ai.loadModel(req);
@@ -3669,7 +3658,7 @@ public SettableFuture<ConfigTaskResult> loadModel(
36693658
}
36703659
} catch (final Exception first) {
36713660
// 2) Fallback: ask ConfigNode for latest AINode location, update cache and retry once
3672-
final TEndPoint refreshed = refreshAINodeFromConfigNode(existingModelId);
3661+
final TEndPoint refreshed = resolveViaConfigNodeAndPublish(existingModelId);
36733662
if (refreshed == null || (ep != null && refreshed.equals(ep))) {
36743663
future.setException(first);
36753664
return future;
@@ -4699,4 +4688,4 @@ public void handlePipeConfigClientExit(final String clientId) {
46994688
LOGGER.warn("Failed to handlePipeConfigClientExit.", e);
47004689
}
47014690
}
4702-
}
4691+
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public class AINodeClient implements AutoCloseable, ThriftClient {
7777

7878
private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
7979

80-
private final TEndPoint endPoint;
80+
private TEndPoint endPoint;
8181

8282
private TTransport transport;
8383

@@ -105,6 +105,7 @@ public AINodeClient(
105105
this.property = property;
106106
this.clientManager = clientManager;
107107
this.endPoint = endPoint;
108+
LocationRegistry.initDefaultIfAbsent(endPoint);
108109
init();
109110
}
110111

@@ -123,6 +124,12 @@ private <R> R executeRemoteCallWithRetry(RemoteCall<R> call) throws TException {
123124
} catch (Exception ignore) {
124125
// ignore
125126
}
127+
// try switch to another candidate maintained by the registry
128+
final TEndPoint before = this.endPoint;
129+
final TEndPoint next = LocationRegistry.rotateOnFailure(before);
130+
if (next != null && (before == null || !before.equals(next))) {
131+
this.endPoint = next;
132+
}
126133
}
127134
}
128135
if (last != null) {
@@ -169,6 +176,75 @@ public TTransport getTransport() {
169176
return transport;
170177
}
171178

179+
/** Shared dynamic AINode location/candidates registry (no ConfigNode dependency). */
180+
private static final class LocationRegistry {
181+
private static final java.util.concurrent.atomic.AtomicReference<TEndPoint> CURRENT =
182+
new java.util.concurrent.atomic.AtomicReference<>();
183+
private static final java.util.concurrent.CopyOnWriteArrayList<TEndPoint> CANDIDATES =
184+
new java.util.concurrent.CopyOnWriteArrayList<>();
185+
private static final java.util.concurrent.locks.ReentrantLock SWITCH_LOCK =
186+
new java.util.concurrent.locks.ReentrantLock();
187+
188+
static void initDefaultIfAbsent(final TEndPoint initial) {
189+
if (initial != null && CURRENT.get() == null) {
190+
CURRENT.compareAndSet(null, initial);
191+
if (!CANDIDATES.contains(initial)) {
192+
CANDIDATES.add(initial);
193+
}
194+
}
195+
}
196+
197+
static TEndPoint getCurrent() {
198+
return CURRENT.get();
199+
}
200+
201+
static void setCurrent(final TEndPoint ep) {
202+
if (ep == null) {
203+
return;
204+
}
205+
CURRENT.set(ep);
206+
if (!CANDIDATES.contains(ep)) {
207+
CANDIDATES.add(ep);
208+
}
209+
}
210+
211+
static void replaceCandidates(final java.util.List<TEndPoint> eps) {
212+
CANDIDATES.clear();
213+
if (eps != null) {
214+
CANDIDATES.addAll(eps);
215+
}
216+
// keep CURRENT if still in list, otherwise reset to first
217+
final TEndPoint cur = CURRENT.get();
218+
if (cur == null || !CANDIDATES.contains(cur)) {
219+
if (!CANDIDATES.isEmpty()) {
220+
CURRENT.set(CANDIDATES.get(0));
221+
}
222+
}
223+
}
224+
225+
/** Move to the next available candidate (round-robin) on connection failure. */
226+
static TEndPoint rotateOnFailure(final TEndPoint failed) {
227+
if (!SWITCH_LOCK.tryLock()) {
228+
return CURRENT.get();
229+
}
230+
try {
231+
if (failed != null) {
232+
// push failed to the end to avoid immediate retry
233+
CANDIDATES.remove(failed);
234+
CANDIDATES.add(failed);
235+
}
236+
if (!CANDIDATES.isEmpty()) {
237+
final TEndPoint next = CANDIDATES.get(0);
238+
CURRENT.set(next);
239+
return next;
240+
}
241+
return CURRENT.get();
242+
} finally {
243+
SWITCH_LOCK.unlock();
244+
}
245+
}
246+
}
247+
172248
public TSStatus stopAINode() throws TException {
173249
try {
174250
TSStatus status = client.stopAINode();
@@ -242,6 +318,22 @@ public TShowAIDevicesResp showAIDevices() throws TException {
242318
return executeRemoteCallWithRetry(IAINodeRPCService.Client::showAIDevices);
243319
}
244320

321+
// ----------------------- static hooks for DataNode routing -----------------------
322+
/** Update the global/default AINode endpoint (e.g., from ConfigNode resolution). */
323+
public static void updateGlobalAINodeLocation(final TEndPoint ep) {
324+
LocationRegistry.setCurrent(ep);
325+
}
326+
327+
/** Replace the global candidate list; current will stick if still present, otherwise first. */
328+
public static void updateGlobalAINodeCandidates(final java.util.List<TEndPoint> eps) {
329+
LocationRegistry.replaceCandidates(eps);
330+
}
331+
332+
/** Get the current chosen endpoint (may be null before any initialization). */
333+
public static TEndPoint getCurrentEndpoint() {
334+
return LocationRegistry.getCurrent();
335+
}
336+
245337
public TInferenceResp inference(
246338
String modelId,
247339
TsBlock inputTsBlock,

iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,16 @@ struct TAINodeInfo {
659659
4: required i32 internalPort
660660
}
661661

662+
// ----------- New messages -----------
663+
struct TGetAINodeLocationReq {
664+
1: optional string deviceId
665+
}
666+
667+
struct TGetAINodeLocationResp {
668+
1: required common.TSStatus status
669+
2: optional common.TEndPoint aiNodeAddress
670+
}
671+
662672
struct TShowDataNodes4InformationSchemaResp {
663673
1: required common.TSStatus status
664674
2: optional list<TDataNodeInfo4InformationSchema> dataNodesInfoList
@@ -1361,6 +1371,16 @@ service IConfigNodeRPCService {
13611371

13621372
TAINodeConfigurationResp getAINodeConfiguration(i32 aiNodeId)
13631373

1374+
// ====================================================
1375+
// AI Node Location (no modelId)
1376+
// ====================================================
1377+
/**
1378+
* Return a reachable AINode location when modelId is not provided.
1379+
* If deviceId is provided, try to pick one that supports the device; otherwise
1380+
* return any healthy/default AINode.
1381+
*/
1382+
TGetAINodeLocationResp getAINodeLocation(TGetAINodeLocationReq req)
1383+
13641384
/**
13651385
* Get system configurations. i.e. configurations that is not associated with the DataNodeId
13661386
*/

0 commit comments

Comments
 (0)