diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java index e09ccc79becb..2721fedafb1e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java @@ -21,10 +21,10 @@ import org.apache.iotdb.ainode.rpc.thrift.TAIHeartbeatReq; import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.commons.client.ClientPoolFactory; import org.apache.iotdb.commons.client.IClientManager; -import org.apache.iotdb.commons.client.ainode.AsyncAINodeServiceClient; import org.apache.iotdb.confignode.client.async.handlers.heartbeat.AINodeHeartbeatHandler; +import org.apache.iotdb.db.protocol.client.AINodeClientFactory; +import org.apache.iotdb.db.protocol.client.ainode.AsyncAINodeServiceClient; public class AsyncAINodeHeartbeatClientPool { @@ -33,8 +33,7 @@ public class AsyncAINodeHeartbeatClientPool { private AsyncAINodeHeartbeatClientPool() { clientManager = new IClientManager.Factory() - .createClientManager( - new ClientPoolFactory.AsyncAINodeHeartbeatServiceClientPoolFactory()); + .createClientManager(new AINodeClientFactory.AINodeHeartbeatClientPoolFactory()); } public void getAINodeHeartBeat( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/ShowModelPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/ShowModelPlan.java index 16bc16bc8725..eca00e8827d9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/ShowModelPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/ShowModelPlan.java @@ -19,9 +19,9 @@ package org.apache.iotdb.confignode.consensus.request.read.model; +import org.apache.iotdb.ainode.rpc.thrift.TShowModelsReq; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; -import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq; import java.util.Objects; @@ -33,7 +33,7 @@ public ShowModelPlan() { super(ConfigPhysicalPlanType.ShowModel); } - public ShowModelPlan(final TShowModelReq showModelReq) { + public ShowModelPlan(final TShowModelsReq showModelReq) { super(ConfigPhysicalPlanType.ShowModel); if (showModelReq.isSetModelId()) { this.modelName = showModelReq.getModelId(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 502937713c2e..5d4b09adfc71 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -20,7 +20,14 @@ package org.apache.iotdb.confignode.manager; import org.apache.iotdb.ainode.rpc.thrift.IDataSchema; +import org.apache.iotdb.ainode.rpc.thrift.TLoadModelReq; +import org.apache.iotdb.ainode.rpc.thrift.TShowAIDevicesResp; +import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsReq; +import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsResp; +import org.apache.iotdb.ainode.rpc.thrift.TShowModelsReq; +import org.apache.iotdb.ainode.rpc.thrift.TShowModelsResp; import org.apache.iotdb.ainode.rpc.thrift.TTrainingReq; +import org.apache.iotdb.ainode.rpc.thrift.TUnloadModelReq; import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TAINodeLocation; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; @@ -42,8 +49,6 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.auth.AuthException; import org.apache.iotdb.commons.auth.entity.PrivilegeUnion; -import org.apache.iotdb.commons.client.ainode.AINodeClient; -import org.apache.iotdb.commons.client.ainode.AINodeClientManager; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.cluster.NodeType; import org.apache.iotdb.commons.conf.CommonConfig; @@ -213,7 +218,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; -import org.apache.iotdb.confignode.rpc.thrift.TLoadModelReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; @@ -226,7 +230,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp; import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq; import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowAIDevicesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowAINodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp; import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp; @@ -235,10 +238,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodes4InformationSchemaResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp; -import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelResp; -import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp; import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; @@ -256,12 +255,13 @@ import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq; import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; -import org.apache.iotdb.confignode.rpc.thrift.TUnloadModelReq; import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq; import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClient; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager; import org.apache.iotdb.db.schemaengine.template.Template; import org.apache.iotdb.db.schemaengine.template.TemplateAlterOperationType; import org.apache.iotdb.db.schemaengine.template.alter.TemplateAlterOperationUtil; @@ -2863,19 +2863,19 @@ public TSStatus unloadModel(TUnloadModelReq req) { } @Override - public TShowModelResp showModel(TShowModelReq req) { + public TShowModelsResp showModel(TShowModelsReq req) { TSStatus status = confirmLeader(); return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() ? modelManager.showModel(req) - : new TShowModelResp(status); + : new TShowModelsResp(status); } @Override - public TShowLoadedModelResp showLoadedModel(TShowLoadedModelReq req) { + public TShowLoadedModelsResp showLoadedModel(TShowLoadedModelsReq req) { TSStatus status = confirmLeader(); return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() ? modelManager.showLoadedModel(req) - : new TShowLoadedModelResp(status, Collections.emptyMap()); + : new TShowLoadedModelsResp(status, Collections.emptyMap()); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index e15c33e04b7e..33e77db24907 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -19,6 +19,13 @@ package org.apache.iotdb.confignode.manager; +import org.apache.iotdb.ainode.rpc.thrift.TLoadModelReq; +import org.apache.iotdb.ainode.rpc.thrift.TShowAIDevicesResp; +import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsReq; +import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsResp; +import org.apache.iotdb.ainode.rpc.thrift.TShowModelsReq; +import org.apache.iotdb.ainode.rpc.thrift.TShowModelsResp; +import org.apache.iotdb.ainode.rpc.thrift.TUnloadModelReq; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; @@ -128,7 +135,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; -import org.apache.iotdb.confignode.rpc.thrift.TLoadModelReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferReq; @@ -140,7 +146,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp; import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq; import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowAIDevicesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowAINodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp; import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp; @@ -149,10 +154,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodes4InformationSchemaResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp; -import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelResp; -import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp; import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; @@ -166,7 +167,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq; -import org.apache.iotdb.confignode.rpc.thrift.TUnloadModelReq; import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq; import org.apache.iotdb.consensus.common.DataSet; @@ -893,10 +893,10 @@ TDataPartitionTableResp getOrCreateDataPartition( TSStatus unloadModel(TUnloadModelReq req); /** Return the model table. */ - TShowModelResp showModel(TShowModelReq req); + TShowModelsResp showModel(TShowModelsReq req); /** Return the loaded model instances. */ - TShowLoadedModelResp showLoadedModel(TShowLoadedModelReq req); + TShowLoadedModelsResp showLoadedModel(TShowLoadedModelsReq req); /** Return all available AI devices. */ TShowAIDevicesResp showAIDevices(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java index 4c1f94eab9e0..3efdbc222b6d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java @@ -19,14 +19,15 @@ package org.apache.iotdb.confignode.manager; +import org.apache.iotdb.ainode.rpc.thrift.TLoadModelReq; +import org.apache.iotdb.ainode.rpc.thrift.TShowAIDevicesResp; import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsReq; import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsResp; import org.apache.iotdb.ainode.rpc.thrift.TShowModelsReq; import org.apache.iotdb.ainode.rpc.thrift.TShowModelsResp; +import org.apache.iotdb.ainode.rpc.thrift.TUnloadModelReq; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.client.ainode.AINodeClient; -import org.apache.iotdb.commons.client.ainode.AINodeClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.model.ModelInformation; import org.apache.iotdb.commons.model.ModelStatus; @@ -40,15 +41,10 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq; import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoReq; import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoResp; -import org.apache.iotdb.confignode.rpc.thrift.TLoadModelReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowAIDevicesResp; -import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelResp; -import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp; -import org.apache.iotdb.confignode.rpc.thrift.TUnloadModelReq; import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq; import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClient; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; @@ -124,15 +120,16 @@ public TSStatus unloadModel(TUnloadModelReq req) { } } - public TShowModelResp showModel(final TShowModelReq req) { + public TShowModelsResp showModel(final TShowModelsReq req) { try (AINodeClient client = getAINodeClient()) { TShowModelsReq showModelsReq = new TShowModelsReq(); if (req.isSetModelId()) { showModelsReq.setModelId(req.getModelId()); } TShowModelsResp resp = client.showModels(showModelsReq); - TShowModelResp res = - new TShowModelResp().setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + TShowModelsResp res = + new TShowModelsResp() + .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); res.setModelIdList(resp.getModelIdList()); res.setModelTypeMap(resp.getModelTypeMap()); res.setCategoryMap(resp.getCategoryMap()); @@ -140,26 +137,26 @@ public TShowModelResp showModel(final TShowModelReq req) { return res; } catch (Exception e) { LOGGER.warn("Failed to show models due to", e); - return new TShowModelResp() + return new TShowModelsResp() .setStatus( new TSStatus(TSStatusCode.AI_NODE_INTERNAL_ERROR.getStatusCode()) .setMessage(e.getMessage())); } } - public TShowLoadedModelResp showLoadedModel(final TShowLoadedModelReq req) { + public TShowLoadedModelsResp showLoadedModel(final TShowLoadedModelsReq req) { try (AINodeClient client = getAINodeClient()) { TShowLoadedModelsReq showModelsReq = new TShowLoadedModelsReq().setDeviceIdList(req.getDeviceIdList()); TShowLoadedModelsResp resp = client.showLoadedModels(showModelsReq); - TShowLoadedModelResp res = - new TShowLoadedModelResp() + TShowLoadedModelsResp res = + new TShowLoadedModelsResp() .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); res.setDeviceLoadedModelsMap(resp.getDeviceLoadedModelsMap()); return res; } catch (Exception e) { LOGGER.warn("Failed to show loaded models due to", e); - return new TShowLoadedModelResp() + return new TShowLoadedModelsResp() .setStatus( new TSStatus(TSStatusCode.AI_NODE_INTERNAL_ERROR.getStatusCode()) .setMessage(e.getMessage())); @@ -235,7 +232,11 @@ private AINodeClient getAINodeClient() throws NoAvailableAINodeException, Client } TEndPoint targetAINodeEndPoint = new TEndPoint(aiNodeInfo.get(0).getInternalAddress(), aiNodeInfo.get(0).getInternalPort()); - return AINodeClientManager.getInstance().borrowClient(targetAINodeEndPoint); + try { + return AINodeClientManager.getInstance().borrowClient(targetAINodeEndPoint); + } catch (Exception e) { + throw new RuntimeException(e); + } } public List getModelDistributions(String modelName) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java index 8282608466d6..989061610213 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java @@ -21,8 +21,6 @@ import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.client.ainode.AINodeClient; -import org.apache.iotdb.commons.client.ainode.AINodeClientManager; import org.apache.iotdb.commons.exception.ainode.LoadModelException; import org.apache.iotdb.commons.model.ModelInformation; import org.apache.iotdb.commons.model.ModelStatus; @@ -36,6 +34,8 @@ import org.apache.iotdb.confignode.procedure.state.model.CreateModelState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClient; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.utils.ReadWriteIOUtils; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java index 23e02ea2e1d8..daa029e04ddf 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java @@ -19,10 +19,9 @@ package org.apache.iotdb.confignode.procedure.impl.model; +import org.apache.iotdb.ainode.rpc.thrift.TDeleteModelReq; import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.client.ainode.AINodeClient; -import org.apache.iotdb.commons.client.ainode.AINodeClientManager; import org.apache.iotdb.commons.model.exception.ModelManagementException; import org.apache.iotdb.confignode.consensus.request.write.model.DropModelPlan; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; @@ -30,6 +29,8 @@ import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure; import org.apache.iotdb.confignode.procedure.state.model.DropModelState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClient; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; @@ -115,7 +116,7 @@ private void dropModelOnAINode(ConfigNodeProcedureEnv env) { .getRegisteredAINode(nodeId) .getLocation() .getInternalEndPoint())) { - TSStatus status = client.deleteModel(modelName); + TSStatus status = client.deleteModel(new TDeleteModelReq(modelName)); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.warn( "Failed to drop model [{}] on AINode [{}], status: {}", diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java index 5f98930d0747..2cab08c28244 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java @@ -21,8 +21,6 @@ import org.apache.iotdb.common.rpc.thrift.TAINodeLocation; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.client.ainode.AINodeClient; -import org.apache.iotdb.commons.client.ainode.AINodeClientManager; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan; import org.apache.iotdb.confignode.consensus.request.write.model.DropModelInNodePlan; @@ -30,6 +28,8 @@ import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.state.RemoveAINodeState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClient; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index bcc9f5068a5b..59ce7352312f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -19,6 +19,8 @@ package org.apache.iotdb.confignode.service.thrift; +import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TAINodeLocation; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TFlushReq; @@ -119,7 +121,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateTableViewReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq; -import org.apache.iotdb.confignode.rpc.thrift.TCreateTrainingReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq; @@ -151,6 +152,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp; @@ -176,7 +178,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; -import org.apache.iotdb.confignode.rpc.thrift.TLoadModelReq; import org.apache.iotdb.confignode.rpc.thrift.TLoginReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; @@ -194,7 +195,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq; import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowAIDevicesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowAINodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp; import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp; @@ -203,10 +203,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodes4InformationSchemaResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp; -import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelResp; -import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp; import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; @@ -228,7 +224,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp; import org.apache.iotdb.confignode.rpc.thrift.TTestOperation; import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp; -import org.apache.iotdb.confignode.rpc.thrift.TUnloadModelReq; import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq; import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq; @@ -649,6 +644,34 @@ public TSStatus operatePermission(final TAuthorizerReq req) { req.getNewUsername())); } + @Override + public TGetAINodeLocationResp getAINodeLocation() throws TException { + final TGetAINodeLocationResp resp = new TGetAINodeLocationResp(); + final TSStatus status = new TSStatus(); + try { + final List registeredAINodes = + configManager.getNodeManager().getRegisteredAINodes(); + + if (registeredAINodes == null || registeredAINodes.isEmpty()) { + status.setCode(TSStatusCode.NO_REGISTERED_AI_NODE_ERROR.getStatusCode()); + status.setMessage("No registered AINode found"); + resp.setStatus(status); + return resp; + } + + final TAINodeLocation loc = registeredAINodes.get(0).getLocation(); + resp.setAiNodeLocation(loc); + status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + status.setMessage("AINode location resolved"); + + } catch (Exception e) { + status.setCode(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + status.setMessage("getAINodeLocation failed: " + e.getMessage()); + } + resp.setStatus(status); + return resp; + } + @Override public TAuthorizerResp queryPermission(final TAuthorizerReq req) { final PermissionInfoResp dataSet = @@ -1349,31 +1372,6 @@ public TSStatus dropModel(TDropModelReq req) { return configManager.dropModel(req); } - @Override - public TSStatus loadModel(TLoadModelReq req) { - return configManager.loadModel(req); - } - - @Override - public TSStatus unloadModel(TUnloadModelReq req) { - return configManager.unloadModel(req); - } - - @Override - public TShowModelResp showModel(TShowModelReq req) { - return configManager.showModel(req); - } - - @Override - public TShowLoadedModelResp showLoadedModel(TShowLoadedModelReq req) { - return configManager.showLoadedModel(req); - } - - @Override - public TShowAIDevicesResp showAIDevices() { - return configManager.showAIDevices(); - } - @Override public TGetModelInfoResp getModelInfo(TGetModelInfoReq req) { return configManager.getModelInfo(req); @@ -1384,11 +1382,6 @@ public TSStatus updateModelInfo(TUpdateModelInfoReq req) throws TException { return configManager.updateModelInfo(req); } - @Override - public TSStatus createTraining(TCreateTrainingReq req) throws TException { - return configManager.createTraining(req); - } - @Override public TSStatus setSpaceQuota(final TSetSpaceQuotaReq req) throws TException { return configManager.setSpaceQuota(req); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/AINodeClientFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/AINodeClientFactory.java new file mode 100644 index 000000000000..0d784617c090 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/AINodeClientFactory.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.protocol.client; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.client.ClientManager; +import org.apache.iotdb.commons.client.ClientManagerMetrics; +import org.apache.iotdb.commons.client.IClientPoolFactory; +import org.apache.iotdb.commons.client.factory.ThriftClientFactory; +import org.apache.iotdb.commons.client.property.ClientPoolProperty; +import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.concurrent.ThreadName; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClient; +import org.apache.iotdb.db.protocol.client.ainode.AsyncAINodeServiceClient; + +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.commons.pool2.impl.GenericKeyedObjectPool; + +import java.util.Optional; + +/** Dedicated factory for AINodeClient + AINodeClientPoolFactory. */ +public class AINodeClientFactory extends ThriftClientFactory { + + private static final int connectionTimeout = + CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS(); + + public AINodeClientFactory( + ClientManager manager, ThriftClientProperty thriftProperty) { + super(manager, thriftProperty); + } + + @Override + public PooledObject makeObject(TEndPoint endPoint) throws Exception { + return new DefaultPooledObject<>( + new AINodeClient(thriftClientProperty, endPoint, clientManager)); + } + + @Override + public void destroyObject(TEndPoint key, PooledObject pooled) throws Exception { + pooled.getObject().invalidate(); + } + + @Override + public boolean validateObject(TEndPoint key, PooledObject pooledObject) { + return Optional.ofNullable(pooledObject.getObject().getTransport()) + .map(org.apache.thrift.transport.TTransport::isOpen) + .orElse(false); + } + + /** The PoolFactory originally inside ClientPoolFactory — now moved here. */ + public static class AINodeClientPoolFactory + implements IClientPoolFactory { + + @Override + public GenericKeyedObjectPool createClientPool( + ClientManager manager) { + + // Build thrift client properties + ThriftClientProperty thriftProperty = + new ThriftClientProperty.Builder() + .setConnectionTimeoutMs(connectionTimeout) + .setRpcThriftCompressionEnabled( + CommonDescriptor.getInstance().getConfig().isRpcThriftCompressionEnabled()) + .build(); + + GenericKeyedObjectPool pool = + new GenericKeyedObjectPool<>( + new AINodeClientFactory(manager, thriftProperty), + new ClientPoolProperty.Builder() + .setMaxClientNumForEachNode( + CommonDescriptor.getInstance().getConfig().getMaxClientNumForEachNode()) + .build() + .getConfig()); + + ClientManagerMetrics.getInstance() + .registerClientManager(this.getClass().getSimpleName(), pool); + + return pool; + } + } + + public static class AINodeHeartbeatClientPoolFactory + implements IClientPoolFactory { + + @Override + public GenericKeyedObjectPool createClientPool( + ClientManager manager) { + + final CommonConfig conf = CommonDescriptor.getInstance().getConfig(); + + GenericKeyedObjectPool clientPool = + new GenericKeyedObjectPool<>( + new AsyncAINodeServiceClient.Factory( + manager, + new ThriftClientProperty.Builder() + .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) + .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) + .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setPrintLogWhenEncounterException(false) + .build(), + ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()), + new ClientPoolProperty.Builder() + .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode()) + .build() + .getConfig()); + + ClientManagerMetrics.getInstance() + .registerClientManager(this.getClass().getSimpleName(), clientPool); + + return clientPool; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index 6854a3191fad..2c037cf0f3e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -79,7 +79,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateTableViewReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq; -import org.apache.iotdb.confignode.rpc.thrift.TCreateTrainingReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq; @@ -111,6 +110,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp; @@ -136,7 +136,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; -import org.apache.iotdb.confignode.rpc.thrift.TLoadModelReq; import org.apache.iotdb.confignode.rpc.thrift.TLoginReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; @@ -154,7 +153,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq; import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowAIDevicesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowAINodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp; import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp; @@ -163,10 +161,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodes4InformationSchemaResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp; -import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelResp; -import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp; import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; @@ -188,7 +182,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp; import org.apache.iotdb.confignode.rpc.thrift.TTestOperation; import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp; -import org.apache.iotdb.confignode.rpc.thrift.TUnloadModelReq; import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq; import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq; @@ -530,6 +523,11 @@ public TAINodeRestartResp restartAINode(TAINodeRestartReq req) throws TException throw new UnsupportedOperationException(UNSUPPORTED_INVOCATION); } + @Override + public TGetAINodeLocationResp getAINodeLocation() throws TException { + return client.getAINodeLocation(); + } + @Override public TSStatus removeAINode(TAINodeRemoveReq req) throws TException { return executeRemoteCallWithRetry( @@ -1353,53 +1351,16 @@ public TSStatus dropModel(TDropModelReq req) throws TException { () -> client.dropModel(req), status -> !updateConfigNodeLeader(status)); } - @Override - public TShowModelResp showModel(TShowModelReq req) throws TException { - return executeRemoteCallWithRetry( - () -> client.showModel(req), resp -> !updateConfigNodeLeader(resp.status)); - } - - @Override - public TShowLoadedModelResp showLoadedModel(TShowLoadedModelReq req) throws TException { - return executeRemoteCallWithRetry( - () -> client.showLoadedModel(req), resp -> !updateConfigNodeLeader(resp.status)); - } - - @Override - public TShowAIDevicesResp showAIDevices() throws TException { - return executeRemoteCallWithRetry( - () -> client.showAIDevices(), resp -> !updateConfigNodeLeader(resp.status)); - } - - @Override - public TSStatus loadModel(TLoadModelReq req) throws TException { - return executeRemoteCallWithRetry( - () -> client.loadModel(req), status -> !updateConfigNodeLeader(status)); - } - - public TSStatus unloadModel(TUnloadModelReq req) throws TException { - return executeRemoteCallWithRetry( - () -> client.unloadModel(req), status -> !updateConfigNodeLeader(status)); - } - - @Override public TGetModelInfoResp getModelInfo(TGetModelInfoReq req) throws TException { return executeRemoteCallWithRetry( () -> client.getModelInfo(req), resp -> !updateConfigNodeLeader(resp.getStatus())); } - @Override public TSStatus updateModelInfo(TUpdateModelInfoReq req) throws TException { return executeRemoteCallWithRetry( () -> client.updateModelInfo(req), status -> !updateConfigNodeLeader(status)); } - @Override - public TSStatus createTraining(TCreateTrainingReq req) throws TException { - return executeRemoteCallWithRetry( - () -> client.createTraining(req), status -> !updateConfigNodeLeader(status)); - } - @Override public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) throws TException { return executeRemoteCallWithRetry( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AINodeClient.java similarity index 74% rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AINodeClient.java index 0058bc7a2fc6..54150b8f3007 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AINodeClient.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.commons.client.ainode; +package org.apache.iotdb.db.protocol.client.ainode; import org.apache.iotdb.ainode.rpc.thrift.IAINodeRPCService; import org.apache.iotdb.ainode.rpc.thrift.TConfigs; @@ -37,16 +37,23 @@ import org.apache.iotdb.ainode.rpc.thrift.TTrainingReq; import org.apache.iotdb.ainode.rpc.thrift.TUnloadModelReq; import org.apache.iotdb.ainode.rpc.thrift.TWindowParams; +import org.apache.iotdb.common.rpc.thrift.TAINodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.ClientManager; +import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.factory.ThriftClientFactory; import org.apache.iotdb.commons.client.property.ThriftClientProperty; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.consensus.ConfigRegionId; import org.apache.iotdb.commons.exception.ainode.LoadModelException; import org.apache.iotdb.commons.model.ModelInformation; +import org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationResp; +import org.apache.iotdb.db.protocol.client.ConfigNodeClient; +import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; +import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.rpc.TConfigurationConst; import org.apache.iotdb.rpc.TSStatusCode; @@ -67,6 +74,7 @@ import java.io.IOException; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.iotdb.rpc.TSStatusCode.CAN_NOT_CONNECT_AINODE; import static org.apache.iotdb.rpc.TSStatusCode.INTERNAL_SERVER_ERROR; @@ -77,7 +85,7 @@ public class AINodeClient implements AutoCloseable, ThriftClient { private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); - private final TEndPoint endPoint; + private TEndPoint endPoint; private TTransport transport; @@ -86,11 +94,90 @@ public class AINodeClient implements AutoCloseable, ThriftClient { public static final String MSG_CONNECTION_FAIL = "Fail to connect to AINode. Please check status of AINode"; + private static final int MAX_RETRY = 3; + + @FunctionalInterface + private interface RemoteCall { + R apply(IAINodeRPCService.Client c) throws TException; + } private final TsBlockSerde tsBlockSerde = new TsBlockSerde(); ClientManager clientManager; + private static final IClientManager CONFIG_NODE_CLIENT_MANAGER = + ConfigNodeClientManager.getInstance(); + + private static final AtomicReference CURRENT_LOCATION = new AtomicReference<>(); + + public static TEndPoint getCurrentEndpoint() { + TAINodeLocation loc = CURRENT_LOCATION.get(); + if (loc == null) { + loc = refreshFromConfigNode(); + } + return (loc == null) ? null : pickEndpointFrom(loc); + } + + public static void updateGlobalAINodeLocation(final TAINodeLocation loc) { + if (loc != null) { + CURRENT_LOCATION.set(loc); + } + } + + private R executeRemoteCallWithRetry(RemoteCall call) throws TException { + TException last = null; + for (int attempt = 1; attempt <= MAX_RETRY; attempt++) { + try { + if (transport == null || !transport.isOpen()) { + final TEndPoint ep = getCurrentEndpoint(); + if (ep == null) { + throw new TException("AINode endpoint unavailable"); + } + this.endPoint = ep; + init(); + } + return call.apply(client); + } catch (TException e) { + last = e; + invalidate(); + final TAINodeLocation loc = refreshFromConfigNode(); + if (loc != null) { + this.endPoint = pickEndpointFrom(loc); + } + try { + Thread.sleep(1000L * attempt); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + } + throw (last != null ? last : new TException(MSG_CONNECTION_FAIL)); + } + + private static TAINodeLocation refreshFromConfigNode() { + try (final ConfigNodeClient cn = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + final TGetAINodeLocationResp resp = cn.getAINodeLocation(); + if (resp != null && resp.isSetAiNodeLocation()) { + final TAINodeLocation loc = resp.getAiNodeLocation(); + CURRENT_LOCATION.set(loc); + return loc; + } + } catch (Exception e) { + LoggerFactory.getLogger(AINodeClient.class) + .debug("[AINodeClient] refreshFromConfigNode failed: {}", e.toString()); + } + return null; + } + + private static TEndPoint pickEndpointFrom(final TAINodeLocation loc) { + if (loc == null) return null; + if (loc.isSetInternalEndPoint() && loc.getInternalEndPoint() != null) { + return loc.getInternalEndPoint(); + } + return null; + } + public AINodeClient( ThriftClientProperty property, TEndPoint endPoint, @@ -98,6 +185,7 @@ public AINodeClient( throws TException { this.property = property; this.clientManager = clientManager; + // Instance default endpoint (pool key). Global location can override it on retries. this.endPoint = endPoint; init(); } @@ -188,76 +276,28 @@ private ModelInformation parseModelInformation( modelName, inputShape, outputShape, inputType, outputType, attributes); } - public TSStatus deleteModel(String modelId) throws TException { - try { - return client.deleteModel(new TDeleteModelReq(modelId)); - } catch (TException e) { - logger.warn( - "Failed to connect to AINode from ConfigNode when executing {}: {}", - Thread.currentThread().getStackTrace()[1].getMethodName(), - e.getMessage()); - throw new TException(MSG_CONNECTION_FAIL); - } + public TSStatus deleteModel(TDeleteModelReq req) throws TException { + return executeRemoteCallWithRetry(c -> c.deleteModel(req)); } public TSStatus loadModel(TLoadModelReq req) throws TException { - try { - return client.loadModel(req); - } catch (TException e) { - logger.warn( - "Failed to connect to AINode from ConfigNode when executing {}: {}", - Thread.currentThread().getStackTrace()[1].getMethodName(), - e.getMessage()); - throw new TException(MSG_CONNECTION_FAIL); - } + return executeRemoteCallWithRetry(c -> c.loadModel(req)); } public TSStatus unloadModel(TUnloadModelReq req) throws TException { - try { - return client.unloadModel(req); - } catch (TException e) { - logger.warn( - "Failed to connect to AINode from ConfigNode when executing {}: {}", - Thread.currentThread().getStackTrace()[1].getMethodName(), - e.getMessage()); - throw new TException(MSG_CONNECTION_FAIL); - } + return executeRemoteCallWithRetry(c -> c.unloadModel(req)); } public TShowModelsResp showModels(TShowModelsReq req) throws TException { - try { - return client.showModels(req); - } catch (TException e) { - logger.warn( - "Failed to connect to AINode from ConfigNode when executing {}: {}", - Thread.currentThread().getStackTrace()[1].getMethodName(), - e.getMessage()); - throw new TException(MSG_CONNECTION_FAIL); - } + return executeRemoteCallWithRetry(c -> c.showModels(req)); } public TShowLoadedModelsResp showLoadedModels(TShowLoadedModelsReq req) throws TException { - try { - return client.showLoadedModels(req); - } catch (TException e) { - logger.warn( - "Failed to connect to AINode from ConfigNode when executing {}: {}", - Thread.currentThread().getStackTrace()[1].getMethodName(), - e.getMessage()); - throw new TException(MSG_CONNECTION_FAIL); - } + return executeRemoteCallWithRetry(c -> c.showLoadedModels(req)); } public TShowAIDevicesResp showAIDevices() throws TException { - try { - return client.showAIDevices(); - } catch (TException e) { - logger.warn( - "Failed to connect to AINode from ConfigNode when executing {}: {}", - Thread.currentThread().getStackTrace()[1].getMethodName(), - e.getMessage()); - throw new TException(MSG_CONNECTION_FAIL); - } + return executeRemoteCallWithRetry(IAINodeRPCService.Client::showAIDevices); } public TInferenceResp inference( @@ -274,7 +314,7 @@ public TInferenceResp inference( if (inferenceAttributes != null) { inferenceReq.setInferenceAttributes(inferenceAttributes); } - return client.inference(inferenceReq); + return executeRemoteCallWithRetry(c -> c.inference(inferenceReq)); } catch (IOException e) { throw new TException("An exception occurred while serializing input data", e); } catch (TException e) { @@ -292,7 +332,7 @@ public TForecastResp forecast( TForecastReq forecastReq = new TForecastReq(modelId, tsBlockSerde.serialize(inputTsBlock), outputLength); forecastReq.setOptions(options); - return client.forecast(forecastReq); + return executeRemoteCallWithRetry(c -> c.forecast(forecastReq)); } catch (IOException e) { TSStatus tsStatus = new TSStatus(INTERNAL_SERVER_ERROR.getStatusCode()); tsStatus.setMessage(String.format("Failed to serialize input tsblock %s", e.getMessage())); @@ -308,15 +348,7 @@ public TForecastResp forecast( } public TSStatus createTrainingTask(TTrainingReq req) throws TException { - try { - return client.createTrainingTask(req); - } catch (TException e) { - logger.warn( - "Failed to connect to AINode when executing {}: {}", - Thread.currentThread().getStackTrace()[1].getMethodName(), - e.getMessage()); - throw new TException(MSG_CONNECTION_FAIL); - } + return executeRemoteCallWithRetry(c -> c.createTrainingTask(req)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AINodeClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AINodeClientManager.java new file mode 100644 index 000000000000..faef1c1ae7b6 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AINodeClientManager.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.protocol.client.ainode; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.db.protocol.client.AINodeClientFactory; + +public class AINodeClientManager { + + public static final int DEFAULT_AINODE_ID = 0; + + private static final AINodeClientManager INSTANCE = new AINodeClientManager(); + + private final IClientManager clientManager; + + private volatile TEndPoint defaultAINodeEndPoint; + + private AINodeClientManager() { + this.clientManager = + new IClientManager.Factory() + .createClientManager(new AINodeClientFactory.AINodeClientPoolFactory()); + } + + public static AINodeClientManager getInstance() { + return INSTANCE; + } + + public void updateDefaultAINodeLocation(TEndPoint endPoint) { + this.defaultAINodeEndPoint = endPoint; + } + + public AINodeClient borrowClient(TEndPoint endPoint) throws Exception { + return clientManager.borrowClient(endPoint); + } + + public AINodeClient borrowClient(int aiNodeId) throws Exception { + if (aiNodeId != DEFAULT_AINODE_ID) { + throw new IllegalArgumentException("Unsupported AINodeId: " + aiNodeId); + } + if (defaultAINodeEndPoint == null) { + defaultAINodeEndPoint = AINodeClient.getCurrentEndpoint(); + } + return clientManager.borrowClient(defaultAINodeEndPoint); + } + + public void clear(TEndPoint endPoint) { + clientManager.clear(endPoint); + } + + public void clearAll() { + clientManager.close(); + } + + public IClientManager getRawClientManager() { + return clientManager; + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AsyncAINodeServiceClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AsyncAINodeServiceClient.java similarity index 98% rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AsyncAINodeServiceClient.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AsyncAINodeServiceClient.java index ba0b1d11e70c..26130287697c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AsyncAINodeServiceClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AsyncAINodeServiceClient.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.commons.client.ainode; +package org.apache.iotdb.db.protocol.client.ainode; import org.apache.iotdb.ainode.rpc.thrift.IAINodeRPCService; import org.apache.iotdb.common.rpc.thrift.TEndPoint; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ai/InferenceOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ai/InferenceOperator.java index ccdef60aaf0b..7126af78b8b5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ai/InferenceOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ai/InferenceOperator.java @@ -21,9 +21,9 @@ import org.apache.iotdb.ainode.rpc.thrift.TInferenceResp; import org.apache.iotdb.ainode.rpc.thrift.TWindowParams; -import org.apache.iotdb.commons.client.ainode.AINodeClient; -import org.apache.iotdb.commons.client.ainode.AINodeClientManager; import org.apache.iotdb.db.exception.runtime.ModelInferenceProcessException; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClient; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java index db36048fcd05..51c8c72e8946 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java @@ -19,11 +19,14 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; +import org.apache.iotdb.ainode.rpc.thrift.TShowModelsReq; +import org.apache.iotdb.ainode.rpc.thrift.TShowModelsResp; import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TAINodeLocation; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; @@ -51,8 +54,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo; import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp; -import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; @@ -66,6 +67,8 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClient; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager; import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; @@ -789,18 +792,18 @@ private static class ModelsSupplier extends TsBlockSupplier { private ModelsSupplier(final List dataTypes) throws Exception { super(dataTypes); - try (final ConfigNodeClient client = - ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - iterator = new ModelIterator(client.showModel(new TShowModelReq())); + final TEndPoint ep = AINodeClient.getCurrentEndpoint(); + try (final AINodeClient ai = AINodeClientManager.getInstance().borrowClient(ep)) { + iterator = new ModelIterator(ai.showModels(new TShowModelsReq())); } } private static class ModelIterator implements Iterator { private int index = 0; - private final TShowModelResp resp; + private final TShowModelsResp resp; - private ModelIterator(TShowModelResp resp) { + private ModelIterator(TShowModelsResp resp) { this.resp = resp; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index aab631825e32..b617bac5f546 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -19,6 +19,14 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.executor; +import org.apache.iotdb.ainode.rpc.thrift.TLoadModelReq; +import org.apache.iotdb.ainode.rpc.thrift.TShowAIDevicesResp; +import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsReq; +import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsResp; +import org.apache.iotdb.ainode.rpc.thrift.TShowModelsReq; +import org.apache.iotdb.ainode.rpc.thrift.TShowModelsResp; +import org.apache.iotdb.ainode.rpc.thrift.TTrainingReq; +import org.apache.iotdb.ainode.rpc.thrift.TUnloadModelReq; import org.apache.iotdb.common.rpc.thrift.FunctionType; import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; @@ -93,12 +101,9 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateTableViewReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq; -import org.apache.iotdb.confignode.rpc.thrift.TCreateTrainingReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp; -import org.apache.iotdb.confignode.rpc.thrift.TDataSchemaForTable; -import org.apache.iotdb.confignode.rpc.thrift.TDataSchemaForTree; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq; @@ -129,24 +134,18 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; -import org.apache.iotdb.confignode.rpc.thrift.TLoadModelReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferReq; import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferResp; import org.apache.iotdb.confignode.rpc.thrift.TReconstructRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; import org.apache.iotdb.confignode.rpc.thrift.TRemoveRegionReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowAIDevicesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowAINodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp; import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp; import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp; -import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelResp; -import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; @@ -164,7 +163,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp; -import org.apache.iotdb.confignode.rpc.thrift.TUnloadModelReq; import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.BatchProcessException; @@ -177,6 +175,8 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.protocol.client.DataNodeClientPoolFactory; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClient; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager; import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -3606,20 +3606,19 @@ public SettableFuture dropModel(final String modelId) { @Override public SettableFuture showModels(final String modelId) { final SettableFuture future = SettableFuture.create(); - try (final ConfigNodeClient client = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final TShowModelReq req = new TShowModelReq(); + try (final AINodeClient ai = + AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID)) { + final TShowModelsReq req = new TShowModelsReq(); if (modelId != null) { req.setModelId(modelId); } - final TShowModelResp showModelResp = client.showModel(req); - if (showModelResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - future.setException(new IoTDBException(showModelResp.getStatus())); + final TShowModelsResp resp = ai.showModels(req); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.setException(new IoTDBException(resp.getStatus())); return future; } - // convert model info list and buildTsBlock - ShowModelsTask.buildTsBlock(showModelResp, future); - } catch (final ClientManagerException | TException e) { + ShowModelsTask.buildTsBlock(resp, future); + } catch (final Exception e) { future.setException(e); } return future; @@ -3628,21 +3627,17 @@ public SettableFuture showModels(final String modelId) { @Override public SettableFuture showLoadedModels(List deviceIdList) { final SettableFuture future = SettableFuture.create(); - try (final ConfigNodeClient client = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final TShowLoadedModelReq req = new TShowLoadedModelReq(); - if (deviceIdList != null) { - req.setDeviceIdList(deviceIdList); - } else { - req.setDeviceIdList(new ArrayList<>()); - } - final TShowLoadedModelResp resp = client.showLoadedModel(req); + try (final AINodeClient ai = + AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID)) { + final TShowLoadedModelsReq req = new TShowLoadedModelsReq(); + req.setDeviceIdList(deviceIdList != null ? deviceIdList : new ArrayList<>()); + final TShowLoadedModelsResp resp = ai.showLoadedModels(req); if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { future.setException(new IoTDBException(resp.getStatus())); return future; } ShowLoadedModelsTask.buildTsBlock(resp, future); - } catch (final ClientManagerException | TException e) { + } catch (final Exception e) { future.setException(e); } return future; @@ -3651,15 +3646,15 @@ public SettableFuture showLoadedModels(List deviceIdLi @Override public SettableFuture showAIDevices() { final SettableFuture future = SettableFuture.create(); - try (final ConfigNodeClient client = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final TShowAIDevicesResp resp = client.showAIDevices(); + try (final AINodeClient ai = + AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID)) { + final TShowAIDevicesResp resp = ai.showAIDevices(); if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { future.setException(new IoTDBException(resp.getStatus())); return future; } ShowAIDevicesTask.buildTsBlock(resp, future); - } catch (final ClientManagerException | TException e) { + } catch (final Exception e) { future.setException(e); } return future; @@ -3669,16 +3664,16 @@ public SettableFuture showAIDevices() { public SettableFuture loadModel( String existingModelId, List deviceIdList) { final SettableFuture future = SettableFuture.create(); - try (final ConfigNodeClient client = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + try (final AINodeClient ai = + AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID)) { final TLoadModelReq req = new TLoadModelReq(existingModelId, deviceIdList); - final TSStatus result = client.loadModel(req); + final TSStatus result = ai.loadModel(req); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) { future.setException(new IoTDBException(result)); } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (final ClientManagerException | TException e) { + } catch (final Exception e) { future.setException(e); } return future; @@ -3688,16 +3683,16 @@ public SettableFuture loadModel( public SettableFuture unloadModel( String existingModelId, List deviceIdList) { final SettableFuture future = SettableFuture.create(); - try (final ConfigNodeClient client = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + try (final AINodeClient ai = + AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID)) { final TUnloadModelReq req = new TUnloadModelReq(existingModelId, deviceIdList); - final TSStatus result = client.unloadModel(req); + final TSStatus result = ai.unloadModel(req); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) { future.setException(new IoTDBException(result)); } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (final ClientManagerException | TException e) { + } catch (final Exception e) { future.setException(e); } return future; @@ -3713,28 +3708,24 @@ public SettableFuture createTraining( @Nullable String targetSql, @Nullable List pathList) { final SettableFuture future = SettableFuture.create(); - try (final ConfigNodeClient client = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final TCreateTrainingReq req = new TCreateTrainingReq(modelId, isTableModel, existingModelId); - - if (isTableModel) { - TDataSchemaForTable dataSchemaForTable = new TDataSchemaForTable(); - dataSchemaForTable.setTargetSql(targetSql); - req.setDataSchemaForTable(dataSchemaForTable); - } else { - TDataSchemaForTree dataSchemaForTree = new TDataSchemaForTree(); - dataSchemaForTree.setPath(pathList); - req.setDataSchemaForTree(dataSchemaForTree); - } + try (final AINodeClient ai = + AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID)) { + final TTrainingReq req = new TTrainingReq(); + req.setModelId(modelId); req.setParameters(parameters); - req.setTimeRanges(timeRanges); - final TSStatus executionStatus = client.createTraining(req); - if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { - future.setException(new IoTDBException(executionStatus)); + if (existingModelId != null) { + req.setExistingModelId(existingModelId); + } + if (existingModelId != null) { + req.setExistingModelId(existingModelId); + } + final TSStatus status = ai.createTrainingTask(req); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) { + future.setException(new IoTDBException(status)); } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (final ClientManagerException | TException e) { + } catch (final Exception e) { future.setException(e); } return future; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowAIDevicesTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowAIDevicesTask.java index d2cdd9675951..690f6f9485f2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowAIDevicesTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowAIDevicesTask.java @@ -19,9 +19,9 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ai; +import org.apache.iotdb.ainode.rpc.thrift.TShowAIDevicesResp; import org.apache.iotdb.commons.schema.column.ColumnHeader; import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; -import org.apache.iotdb.confignode.rpc.thrift.TShowAIDevicesResp; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowLoadedModelsTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowLoadedModelsTask.java index 5beb8a6a5aa6..c8c6f8938f75 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowLoadedModelsTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowLoadedModelsTask.java @@ -19,9 +19,9 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ai; +import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsResp; import org.apache.iotdb.commons.schema.column.ColumnHeader; import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; -import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelResp; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; @@ -54,7 +54,7 @@ public ListenableFuture execute(IConfigTaskExecutor configTask } public static void buildTsBlock( - TShowLoadedModelResp resp, SettableFuture future) { + TShowLoadedModelsResp resp, SettableFuture future) { List outputDataTypes = ColumnHeaderConstant.showLoadedModelsColumnHeaders.stream() .map(ColumnHeader::getColumnType) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowModelsTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowModelsTask.java index 73c5b1b6f7dc..c0d7f4ef2030 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowModelsTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowModelsTask.java @@ -19,9 +19,9 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ai; +import org.apache.iotdb.ainode.rpc.thrift.TShowModelsResp; import org.apache.iotdb.commons.schema.column.ColumnHeader; import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; -import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; @@ -59,7 +59,7 @@ public ListenableFuture execute(IConfigTaskExecutor configTask return configTaskExecutor.showModels(modelName); } - public static void buildTsBlock(TShowModelResp resp, SettableFuture future) { + public static void buildTsBlock(TShowModelsResp resp, SettableFuture future) { List modelIdList = resp.getModelIdList(); Map modelTypeMap = resp.getModelTypeMap(); Map categoryMap = resp.getCategoryMap(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java index 5521062a24e1..887d7c26d305 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java @@ -21,11 +21,10 @@ import org.apache.iotdb.ainode.rpc.thrift.TForecastResp; import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.commons.client.IClientManager; -import org.apache.iotdb.commons.client.ainode.AINodeClient; -import org.apache.iotdb.commons.client.ainode.AINodeClientManager; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClient; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager; import org.apache.iotdb.db.queryengine.plan.analyze.IModelFetcher; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.model.ModelInferenceDescriptor; import org.apache.iotdb.rpc.TSStatusCode; @@ -457,8 +456,7 @@ private static Map parseOptions(String options) { private static class ForecastDataProcessor implements TableFunctionDataProcessor { private static final TsBlockSerde SERDE = new TsBlockSerde(); - private static final IClientManager CLIENT_MANAGER = - AINodeClientManager.getInstance(); + private static final AINodeClientManager CLIENT_MANAGER = AINodeClientManager.getInstance(); private final TEndPoint targetAINode; private final String modelId; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/udf/UDTFForecast.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/udf/UDTFForecast.java index 22c2bce7b5ee..e77e0641ae97 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/udf/UDTFForecast.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/udf/UDTFForecast.java @@ -21,10 +21,9 @@ import org.apache.iotdb.ainode.rpc.thrift.TForecastResp; import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.commons.client.IClientManager; -import org.apache.iotdb.commons.client.ainode.AINodeClient; -import org.apache.iotdb.commons.client.ainode.AINodeClientManager; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClient; +import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager; import org.apache.iotdb.db.queryengine.plan.analyze.IModelFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.ModelFetcher; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.model.ModelInferenceDescriptor; @@ -55,8 +54,7 @@ public class UDTFForecast implements UDTF { private static final TsBlockSerde serde = new TsBlockSerde(); - private static final IClientManager CLIENT_MANAGER = - AINodeClientManager.getInstance(); + private static final AINodeClientManager CLIENT_MANAGER = AINodeClientManager.getInstance(); private TEndPoint targetAINode = new TEndPoint("127.0.0.1", 10810); private String model_id; private int maxInputLength; diff --git a/iotdb-core/node-commons/pom.xml b/iotdb-core/node-commons/pom.xml index 9e74e999fc47..4a42198f5b57 100644 --- a/iotdb-core/node-commons/pom.xml +++ b/iotdb-core/node-commons/pom.xml @@ -179,12 +179,6 @@ com.timecho.ratis ratis-common - - org.apache.iotdb - iotdb-thrift-ainode - 2.0.6-SNAPSHOT - compile - diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index 32c6345dc27e..106d67b6279d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -20,8 +20,6 @@ package org.apache.iotdb.commons.client; import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.commons.client.ainode.AINodeClient; -import org.apache.iotdb.commons.client.ainode.AsyncAINodeServiceClient; import org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient; import org.apache.iotdb.commons.client.async.AsyncDataNodeExternalServiceClient; import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; @@ -325,56 +323,6 @@ public GenericKeyedObjectPool cre } } - public static class AsyncAINodeHeartbeatServiceClientPoolFactory - implements IClientPoolFactory { - @Override - public GenericKeyedObjectPool createClientPool( - ClientManager manager) { - GenericKeyedObjectPool clientPool = - new GenericKeyedObjectPool<>( - new AsyncAINodeServiceClient.Factory( - manager, - new ThriftClientProperty.Builder() - .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) - .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) - .setPrintLogWhenEncounterException(false) - .build(), - ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()), - new ClientPoolProperty.Builder() - .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode()) - .build() - .getConfig()); - ClientManagerMetrics.getInstance() - .registerClientManager(this.getClass().getSimpleName(), clientPool); - return clientPool; - } - } - - public static class AINodeClientPoolFactory - implements IClientPoolFactory { - - @Override - public GenericKeyedObjectPool createClientPool( - ClientManager manager) { - GenericKeyedObjectPool clientPool = - new GenericKeyedObjectPool<>( - new AINodeClient.Factory( - manager, - new ThriftClientProperty.Builder() - .setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS()) - .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .build()), - new ClientPoolProperty.Builder() - .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode()) - .build() - .getConfig()); - ClientManagerMetrics.getInstance() - .registerClientManager(this.getClass().getSimpleName(), clientPool); - return clientPool; - } - } - public static class SyncPipeConsensusServiceClientPoolFactory implements IClientPoolFactory { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClientManager.java deleted file mode 100644 index 3a06e478e7b5..000000000000 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClientManager.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.commons.client.ainode; - -import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.commons.client.ClientPoolFactory; -import org.apache.iotdb.commons.client.IClientManager; - -public class AINodeClientManager { - private AINodeClientManager() { - // Empty constructor - } - - private static final class AINodeClientManagerHolder { - private static final IClientManager INSTANCE = - new IClientManager.Factory() - .createClientManager(new ClientPoolFactory.AINodeClientPoolFactory()); - } - - public static IClientManager getInstance() { - return AINodeClientManagerHolder.INSTANCE; - } -} diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 3ee3ca89bdcc..d8f6318063eb 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -659,6 +659,13 @@ struct TAINodeInfo { 4: required i32 internalPort } +// ----------- New messages ----------- + +struct TGetAINodeLocationResp { + 1: required common.TSStatus status + 2: optional common.TAINodeLocation aiNodeLocation +} + struct TShowDataNodes4InformationSchemaResp { 1: required common.TSStatus status 2: optional list dataNodesInfoList @@ -1098,42 +1105,6 @@ struct TDropModelReq { 1: required string modelId } -struct TShowModelReq { - 1: optional string modelId -} - -struct TShowModelResp { - 1: required common.TSStatus status - 2: optional list modelIdList - 3: optional map modelTypeMap - 4: optional map categoryMap - 5: optional map stateMap -} - -struct TShowLoadedModelReq { - 1: required list deviceIdList -} - -struct TShowLoadedModelResp { - 1: required common.TSStatus status - 2: required map> deviceLoadedModelsMap -} - -struct TShowAIDevicesResp { - 1: required common.TSStatus status - 2: required list deviceIdList -} - -struct TLoadModelReq { - 1: required string existingModelId - 2: required list deviceIdList -} - -struct TUnloadModelReq { - 1: required string modelId - 2: required list deviceIdList -} - struct TGetModelInfoReq { 1: required string modelId } @@ -1371,6 +1342,11 @@ service IConfigNodeRPCService { TAINodeConfigurationResp getAINodeConfiguration(i32 aiNodeId) + /** + * Return a reachable AINode location. + */ + TGetAINodeLocationResp getAINodeLocation() + /** * Get system configurations. i.e. configurations that is not associated with the DataNodeId */ @@ -2049,43 +2025,12 @@ service IConfigNodeRPCService { common.TSStatus dropModel(TDropModelReq req) /** - * Return the model table - */ - TShowModelResp showModel(TShowModelReq req) - - /** - * Return the loaded model table - */ - TShowLoadedModelResp showLoadedModel(TShowLoadedModelReq req) - - /** - * Return the available ai devices - */ - TShowAIDevicesResp showAIDevices() - - /** - * Load an existing model to specific devices - * - * @return SUCCESS_STATUS if the model loading task was submitted successfully - */ - common.TSStatus loadModel(TLoadModelReq req) - - /** - * Unload an existing model to specific devices - * - * @return SUCCESS_STATUS if the model unloading task was submitted successfully - */ - common.TSStatus unloadModel(TUnloadModelReq req) - - /** - * Return the model info by model_id - */ + * Return the model info by model_id + */ TGetModelInfoResp getModelInfo(TGetModelInfoReq req) common.TSStatus updateModelInfo(TUpdateModelInfoReq req) - common.TSStatus createTraining(TCreateTrainingReq req) - // ====================================================== // Quota // ======================================================