Skip to content

Commit 8505028

Browse files
[AINode] Move loadmodel() method to AINodeClient
1 parent 5d51b1b commit 8505028

File tree

6 files changed

+166
-213
lines changed

6 files changed

+166
-213
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.confignode.procedure.impl.model;
2121

22+
import org.apache.iotdb.ainode.rpc.thrift.TDeleteModelReq;
2223
import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
2324
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2425
import org.apache.iotdb.commons.client.ainode.AINodeClient;
@@ -115,7 +116,7 @@ private void dropModelOnAINode(ConfigNodeProcedureEnv env) {
115116
.getRegisteredAINode(nodeId)
116117
.getLocation()
117118
.getInternalEndPoint())) {
118-
TSStatus status = client.deleteModel(modelName);
119+
TSStatus status = client.deleteModel(new TDeleteModelReq(modelName));
119120
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
120121
LOGGER.warn(
121122
"Failed to drop model [{}] on AINode [{}], status: {}",

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java

Lines changed: 24 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919

2020
package org.apache.iotdb.confignode.service.thrift;
2121

22+
import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
23+
import org.apache.iotdb.common.rpc.thrift.TAINodeLocation;
2224
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
2325
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
24-
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2526
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
2627
import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
2728
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
@@ -151,7 +152,6 @@
151152
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
152153
import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
153154
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
154-
import org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationReq;
155155
import org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationResp;
156156
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
157157
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
@@ -178,7 +178,6 @@
178178
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
179179
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
180180
import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq;
181-
import org.apache.iotdb.confignode.rpc.thrift.TLoadModelReq;
182181
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
183182
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
184183
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -652,24 +651,37 @@ public TSStatus operatePermission(final TAuthorizerReq req) {
652651
}
653652

654653
@Override
655-
public TGetAINodeLocationResp getAINodeLocation(final TGetAINodeLocationReq req)
656-
throws TException {
654+
public TGetAINodeLocationResp getAINodeLocation() throws TException {
657655
final TGetAINodeLocationResp resp = new TGetAINodeLocationResp();
658656
final TSStatus status = new TSStatus();
659657
try {
660-
final java.util.List<?> registered = configManager.getNodeManager().getRegisteredAINodes();
661-
if (registered == null || registered.isEmpty()) {
662-
status.setCode(TSStatusCode.AI_NODE_INTERNAL_ERROR.getStatusCode());
658+
final List<TAINodeConfiguration> registeredAINodes =
659+
configManager.getNodeManager().getRegisteredAINodes();
660+
661+
if (registeredAINodes == null || registeredAINodes.isEmpty()) {
662+
status.setCode(TSStatusCode.NO_REGISTERED_AI_NODE_ERROR.getStatusCode());
663663
status.setMessage("No registered AINode found");
664664
resp.setStatus(status);
665665
return resp;
666666
}
667-
final java.util.Optional<TEndPoint> picked = pickAnyEndPointFromRegistered(registered);
668-
if (picked.isPresent()) {
669-
resp.setAiNodeAddress(picked.get());
667+
668+
final TAINodeConfiguration cfg = registeredAINodes.get(0);
669+
final TAINodeLocation loc = (cfg == null) ? null : cfg.getLocation();
670+
671+
boolean hasEndpoint = false;
672+
if (loc != null) {
673+
try {
674+
hasEndpoint = (loc.isSetInternalEndPoint() && loc.getInternalEndPoint() != null);
675+
} catch (Throwable ignore) {
676+
}
677+
}
678+
679+
if (loc != null && hasEndpoint) {
680+
resp.setAiNodeLocation(loc);
670681
status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
682+
status.setMessage("AINode location resolved");
671683
} else {
672-
status.setCode(TSStatusCode.AI_NODE_INTERNAL_ERROR.getStatusCode());
684+
status.setCode(TSStatusCode.NO_REGISTERED_AI_NODE_ERROR.getStatusCode());
673685
status.setMessage("No valid AINode endpoint extracted from registry");
674686
}
675687
} catch (Exception e) {
@@ -680,30 +692,6 @@ public TGetAINodeLocationResp getAINodeLocation(final TGetAINodeLocationReq req)
680692
return resp;
681693
}
682694

683-
private java.util.Optional<TEndPoint> pickAnyEndPointFromRegistered(
684-
final java.util.List<?> registered) {
685-
for (Object v : registered) {
686-
if (v instanceof TEndPoint) {
687-
return java.util.Optional.of((TEndPoint) v);
688-
}
689-
try {
690-
final Object ep = v.getClass().getMethod("getRpcEndPoint").invoke(v);
691-
if (ep instanceof TEndPoint) {
692-
return java.util.Optional.of((TEndPoint) ep);
693-
}
694-
} catch (ReflectiveOperationException ignore) {
695-
}
696-
try {
697-
final Object ep = v.getClass().getMethod("getClientRpcEndPoint").invoke(v);
698-
if (ep instanceof TEndPoint) {
699-
return java.util.Optional.of((TEndPoint) ep);
700-
}
701-
} catch (ReflectiveOperationException ignore) {
702-
}
703-
}
704-
return java.util.Optional.empty();
705-
}
706-
707695
@Override
708696
public TAuthorizerResp queryPermission(final TAuthorizerReq req) {
709697
final PermissionInfoResp dataSet =
@@ -1399,11 +1387,6 @@ public TSStatus dropModel(TDropModelReq req) {
13991387
return configManager.dropModel(req);
14001388
}
14011389

1402-
@Override
1403-
public TSStatus loadModel(TLoadModelReq req) {
1404-
return configManager.loadModel(req);
1405-
}
1406-
14071390
@Override
14081391
public TSStatus unloadModel(TUnloadModelReq req) {
14091392
return configManager.unloadModel(req);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@
110110
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
111111
import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
112112
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
113-
import org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationReq;
114113
import org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationResp;
115114
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
116115
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
@@ -137,7 +136,6 @@
137136
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
138137
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
139138
import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq;
140-
import org.apache.iotdb.confignode.rpc.thrift.TLoadModelReq;
141139
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
142140
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
143141
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -531,9 +529,9 @@ public TAINodeRestartResp restartAINode(TAINodeRestartReq req) throws TException
531529
throw new UnsupportedOperationException(UNSUPPORTED_INVOCATION);
532530
}
533531

534-
public TGetAINodeLocationResp getAINodeLocation(final TGetAINodeLocationReq req)
535-
throws org.apache.thrift.TException {
536-
return client.getAINodeLocation(req);
532+
@Override
533+
public TGetAINodeLocationResp getAINodeLocation() throws TException {
534+
return client.getAINodeLocation();
537535
}
538536

539537
@Override
@@ -1372,11 +1370,6 @@ public TShowAIDevicesResp showAIDevices() throws TException {
13721370
}
13731371

13741372
@Override
1375-
public TSStatus loadModel(TLoadModelReq req) throws TException {
1376-
return executeRemoteCallWithRetry(
1377-
() -> client.loadModel(req), status -> !updateConfigNodeLeader(status));
1378-
}
1379-
13801373
public TSStatus unloadModel(TUnloadModelReq req) throws TException {
13811374
return executeRemoteCallWithRetry(
13821375
() -> client.unloadModel(req), status -> !updateConfigNodeLeader(status));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 86 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.ainode.rpc.thrift.TLoadModelReq;
2323
import org.apache.iotdb.common.rpc.thrift.FunctionType;
2424
import org.apache.iotdb.common.rpc.thrift.Model;
25+
import org.apache.iotdb.common.rpc.thrift.TAINodeLocation;
2526
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
2627
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
2728
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -119,10 +120,9 @@
119120
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
120121
import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
121122
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
123+
import org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationResp;
122124
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
123125
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
124-
import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoReq;
125-
import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoResp;
126126
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
127127
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
128128
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
@@ -379,37 +379,6 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
379379
// NOTE: AINode location is now maintained globally inside AINodeClient.
380380
// We only resolve via ConfigNode when needed, then publish it back to AINodeClient.
381381

382-
/** Ask ConfigNode for the latest AINode location (precise by modelId when available). */
383-
private TEndPoint resolveViaConfigNodeAndPublish(String modelIdOrNull) {
384-
try (ConfigNodeClient cn =
385-
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
386-
TEndPoint ep = null;
387-
if (modelIdOrNull != null && !modelIdOrNull.isEmpty()) {
388-
final TGetModelInfoResp resp = cn.getModelInfo(new TGetModelInfoReq(modelIdOrNull));
389-
if (resp != null && resp.isSetAiNodeAddress()) {
390-
ep = resp.getAiNodeAddress();
391-
}
392-
} else {
393-
final org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationResp r =
394-
cn.getAINodeLocation(
395-
new org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationReq());
396-
if (r != null
397-
&& r.getStatus() != null
398-
&& r.getStatus().getCode()
399-
== org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS.getStatusCode()
400-
&& r.isSetAiNodeAddress()) {
401-
ep = r.getAiNodeAddress();
402-
}
403-
}
404-
if (ep != null) {
405-
AINodeClient.updateGlobalAINodeLocation(ep);
406-
}
407-
return ep;
408-
} catch (Exception e) {
409-
return null;
410-
}
411-
}
412-
413382
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterConfigTaskExecutor.class);
414383

415384
private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER =
@@ -3646,38 +3615,106 @@ public SettableFuture<ConfigTaskResult> showAIDevices() {
36463615
public SettableFuture<ConfigTaskResult> loadModel(
36473616
String existingModelId, List<String> deviceIdList) {
36483617
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
3649-
// 1) Try direct DataNode → AINode with cached endpoint
3618+
final long t0 = System.currentTimeMillis();
3619+
LOGGER.info("[LoadModel] begin: modelId={}, devices={}", existingModelId, deviceIdList);
36503620
TEndPoint ep = AINodeClient.getCurrentEndpoint();
3621+
LOGGER.debug("[LoadModel] currentEndpoint(beforeResolve)={}", ep);
3622+
3623+
if (ep == null) {
3624+
ep = resolveAINodeEndpointOrNullWithLog("[LoadModel] initial-resolve");
3625+
LOGGER.debug("[LoadModel] endpoint(after initial resolve)={}", ep);
3626+
}
3627+
36513628
try (final AINodeClient ai = AINodeClientManager.getInstance().borrowClient(ep)) {
3629+
LOGGER.info("[LoadModel] borrowClient OK: endpoint={}", ep);
3630+
36523631
final TLoadModelReq req = new TLoadModelReq(existingModelId, deviceIdList);
36533632
final TSStatus result = ai.loadModel(req);
3633+
LOGGER.info(
3634+
"[LoadModel] RPC done: statusCode={}, message={}", result.getCode(), result.getMessage());
3635+
36543636
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
3655-
future.setException(new IoTDBException(result));
3637+
final IoTDBException ex = new IoTDBException(result);
3638+
LOGGER.warn("[LoadModel] RPC not success: {}", ex.getMessage());
3639+
future.setException(ex);
36563640
} else {
36573641
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
36583642
}
36593643
} catch (final Exception first) {
3660-
// 2) Fallback: ask ConfigNode for latest AINode location, update cache and retry once
3661-
final TEndPoint refreshed = resolveViaConfigNodeAndPublish(existingModelId);
3662-
if (refreshed == null || (ep != null && refreshed.equals(ep))) {
3663-
future.setException(first);
3664-
return future;
3665-
}
3666-
try (final AINodeClient ai = AINodeClientManager.getInstance().borrowClient(refreshed)) {
3667-
final TLoadModelReq req = new TLoadModelReq(existingModelId, deviceIdList);
3668-
final TSStatus result = ai.loadModel(req);
3669-
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
3670-
future.setException(new IoTDBException(result));
3644+
final org.apache.iotdb.common.rpc.thrift.TAINodeLocation refreshedLocation;
3645+
try (final ConfigNodeClient cn =
3646+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
3647+
final org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationResp r =
3648+
cn.getAINodeLocation();
3649+
final boolean hasLoc = (r != null && r.isSetAiNodeLocation());
3650+
3651+
if (hasLoc) {
3652+
refreshedLocation = r.getAiNodeLocation();
3653+
debugDumpLocation("[LoadModel] refreshed-location", refreshedLocation);
3654+
AINodeClient.updateGlobalAINodeLocation(refreshedLocation);
3655+
3656+
final TEndPoint epRefreshed = pickEndpointFrom(refreshedLocation);
36713657
} else {
3672-
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
3658+
future.setException(first);
3659+
return future;
36733660
}
3674-
} catch (final Exception second) {
3675-
future.setException(second);
3661+
} catch (Exception e2) {
3662+
future.setException(first);
3663+
return future;
36763664
}
3665+
future.setException(first);
36773666
}
3667+
36783668
return future;
36793669
}
36803670

3671+
private TEndPoint resolveAINodeEndpointOrNullWithLog(final String tag) {
3672+
try (final ConfigNodeClient cn =
3673+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
3674+
final TGetAINodeLocationResp resp = cn.getAINodeLocation();
3675+
final boolean ok = (resp != null && resp.isSetAiNodeLocation());
3676+
if (!ok) {
3677+
return null;
3678+
}
3679+
final TAINodeLocation loc = resp.getAiNodeLocation();
3680+
debugDumpLocation(tag + " aiNodeLocation", loc);
3681+
3682+
final TEndPoint picked = pickEndpointFrom(loc);
3683+
AINodeClient.updateGlobalAINodeLocation(loc);
3684+
3685+
return picked;
3686+
} catch (Exception e) {
3687+
return null;
3688+
}
3689+
}
3690+
3691+
private static TEndPoint pickEndpointFrom(final TAINodeLocation loc) {
3692+
if (loc == null) return null;
3693+
try {
3694+
if (loc.isSetInternalEndPoint() && loc.getInternalEndPoint() != null) {
3695+
return loc.getInternalEndPoint();
3696+
}
3697+
} catch (Throwable ignore) {
3698+
}
3699+
return null;
3700+
}
3701+
3702+
private static void debugDumpLocation(final String tag, final TAINodeLocation loc) {
3703+
if (loc == null) {
3704+
LOGGER.debug("{}: location=null", tag);
3705+
return;
3706+
}
3707+
StringBuilder sb = new StringBuilder(128);
3708+
sb.append(tag).append(": ");
3709+
try {
3710+
sb.append("internal=")
3711+
.append(loc.isSetInternalEndPoint() ? loc.getInternalEndPoint() : "null")
3712+
.append("; ");
3713+
} catch (Throwable ignore) {
3714+
}
3715+
LOGGER.debug(sb.toString());
3716+
}
3717+
36813718
@Override
36823719
public SettableFuture<ConfigTaskResult> unloadModel(
36833720
String existingModelId, List<String> deviceIdList) {

0 commit comments

Comments
 (0)