@@ -30,7 +30,7 @@ static int32_t tagFromRequestId(LlmRequest::RequestIdType requestId)
30
30
return ((requestId & 0xFFF ) << 8 ) | (kDATA_TAG & 0xFF );
31
31
}
32
32
33
- DataSenderImpl::DataSenderImpl (executor::kv_cache::ConnectionManager* manager,
33
+ CacheSenderImpl::CacheSenderImpl (executor::kv_cache::ConnectionManager* manager,
34
34
executor::kv_cache::CacheState selfCacheState, SizeType32 selfIndex, std::unique_ptr<BaseCacheFormatter> formatter)
35
35
: mManager {manager}
36
36
, mSelfState {std::move (selfCacheState), executor::kv_cache::CommState{manager->getCommState ()}}
@@ -41,7 +41,7 @@ DataSenderImpl::DataSenderImpl(executor::kv_cache::ConnectionManager* manager,
41
41
TLLM_CHECK (mManager ->getCommState ().getSelfIdx () == selfIndex);
42
42
}
43
43
44
- [[nodiscard]] RequestInfo DataSenderImpl ::recvRequestInfo ()
44
+ [[nodiscard]] RequestInfo CacheSenderImpl ::recvRequestInfo ()
45
45
{
46
46
using DataContext = tensorrt_llm::executor::kv_cache::DataContext;
47
47
auto * agentConnectionManager = dynamic_cast <executor::kv_cache::AgentConnectionManager*>(mManager );
@@ -93,7 +93,7 @@ DataSenderImpl::DataSenderImpl(executor::kv_cache::ConnectionManager* manager,
93
93
return info;
94
94
}
95
95
96
- void DataSenderImpl ::sendSync (LlmRequest const & llmRequest)
96
+ void CacheSenderImpl ::sendSync (LlmRequest const & llmRequest)
97
97
{
98
98
auto it = mRequestToSession .find (llmRequest.mRequestId );
99
99
TLLM_CHECK (it != mRequestToSession .end ());
@@ -102,32 +102,32 @@ void DataSenderImpl::sendSync(LlmRequest const& llmRequest)
102
102
mFormatter ->format (session);
103
103
}
104
104
105
- [[nodiscard]] executor::kv_cache::CommState const & DataSenderImpl ::getCommState () const
105
+ [[nodiscard]] executor::kv_cache::CommState const & CacheSenderImpl ::getCommState () const
106
106
{
107
107
return mSelfState .getCommState ().value ();
108
108
}
109
109
110
- void DataSenderImpl ::setCommState (executor::kv_cache::CommState commState)
110
+ void CacheSenderImpl ::setCommState (executor::kv_cache::CommState commState)
111
111
{
112
112
mSelfState .setCommState (std::move (commState));
113
113
}
114
114
115
- [[nodiscard]] size_t DataSenderImpl ::getCounterpartsCount (LlmRequest::RequestIdType requestId) const
115
+ [[nodiscard]] size_t CacheSenderImpl ::getCounterpartsCount (LlmRequest::RequestIdType requestId) const
116
116
{
117
117
auto it = mRequestToSession .find (requestId);
118
118
TLLM_CHECK (it != mRequestToSession .end ());
119
119
return it->second .getConnections ().size ();
120
120
}
121
121
122
- void DataSenderImpl ::release (LlmRequest::RequestIdType requestId)
122
+ void CacheSenderImpl ::release (LlmRequest::RequestIdType requestId)
123
123
{
124
124
auto it = mRequestToSession .find (requestId);
125
125
TLLM_CHECK (it != mRequestToSession .end ());
126
126
std::unique_lock<std::mutex> lk (mMtxForMap );
127
127
mRequestToSession .erase (it);
128
128
}
129
129
130
- DataReceiverImpl::DataReceiverImpl (executor::kv_cache::ConnectionManager* manager,
130
+ CacheReceiverImpl::CacheReceiverImpl (executor::kv_cache::ConnectionManager* manager,
131
131
executor::kv_cache::CacheState selfCacheState, SizeType32 selfIndex, std::unique_ptr<BaseCacheFormatter> formatter)
132
132
: mManager {manager}
133
133
, mSelfState {std::move (selfCacheState), executor::kv_cache::CommState{manager->getCommState ()}}
@@ -138,7 +138,7 @@ DataReceiverImpl::DataReceiverImpl(executor::kv_cache::ConnectionManager* manage
138
138
TLLM_CHECK (mFormatter );
139
139
}
140
140
141
- TransferSession DataReceiverImpl ::sendRequestInfo (LlmRequest const & llmRequest)
141
+ TransferSession CacheReceiverImpl ::sendRequestInfo (LlmRequest const & llmRequest)
142
142
{
143
143
uint64_t requestId = llmRequest.getContextPhaseParams ().value ().getReqId ();
144
144
auto const & contextState = llmRequest.getDataTransceiverState ();
@@ -204,12 +204,12 @@ TransferSession DataReceiverImpl::sendRequestInfo(LlmRequest const& llmRequest)
204
204
contextState, resource->mBufferManager , &llmRequest);
205
205
}
206
206
207
- void DataReceiverImpl ::receiveSync (TransferSession& session)
207
+ void CacheReceiverImpl ::receiveSync (TransferSession& session)
208
208
{
209
209
mFormatter ->unformat (session);
210
210
}
211
211
212
- void DataReceiverImpl ::sendRequestInfo (executor::kv_cache::Connection const * connection, RequestInfo const & info)
212
+ void CacheReceiverImpl ::sendRequestInfo (executor::kv_cache::Connection const * connection, RequestInfo const & info)
213
213
{
214
214
std::ostringstream oss;
215
215
RequestInfo::serialize (info, oss);
@@ -221,7 +221,7 @@ void DataReceiverImpl::sendRequestInfo(executor::kv_cache::Connection const* con
221
221
connection->send (executor::kv_cache::DataContext{kINFO_TAG }, serializedInfo.data (), infoSize);
222
222
}
223
223
224
- std::unique_ptr<DataReceiverImpl ::ReceiveCacheResource> const & DataReceiverImpl ::getReceiveCacheResource (
224
+ std::unique_ptr<CacheReceiverImpl ::ReceiveCacheResource> const & CacheReceiverImpl ::getReceiveCacheResource (
225
225
LlmRequest const & llmRequest)
226
226
{
227
227
std::scoped_lock<std::mutex> lock (mProcessIoResouceMutex );
0 commit comments