Skip to content

Commit 58f5175

Browse files
edoakesdstrodtman
authored andcommitted
[core] Remove gcs_rpc_server.h dependency from GcsTaskManager (#56001)
Splitting gRPC service interface from implementation. Stacked on: #55999 --------- Signed-off-by: Edward Oakes <[email protected]> Signed-off-by: Douglas Strodtman <[email protected]>
1 parent af58aa3 commit 58f5175

File tree

8 files changed

+99
-93
lines changed

8 files changed

+99
-93
lines changed

src/ray/gcs/gcs_server/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,14 +199,15 @@ ray_cc_library(
199199
hdrs = ["gcs_task_manager.h"],
200200
deps = [
201201
":gcs_usage_stats_client",
202+
":grpc_service_interfaces",
202203
"//src/ray/common:asio",
203204
"//src/ray/common:id",
204205
"//src/ray/common:ray_config",
205206
"//src/ray/common:status",
206207
"//src/ray/gcs:gcs_pb_util",
207208
"//src/ray/protobuf:events_event_aggregator_service_cc_proto",
208209
"//src/ray/protobuf:gcs_cc_proto",
209-
"//src/ray/rpc:gcs_server",
210+
"//src/ray/stats:stats_metric",
210211
"//src/ray/util:counter_map",
211212
"@com_google_absl//absl/container:flat_hash_map",
212213
"@com_google_absl//absl/container:flat_hash_set",

src/ray/gcs/gcs_server/gcs_server.cc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -735,10 +735,14 @@ void GcsServer::InitGcsTaskManager() {
735735
auto &io_context = io_context_provider_.GetIOContext<GcsTaskManager>();
736736
gcs_task_manager_ = std::make_unique<GcsTaskManager>(io_context);
737737
// Register service.
738-
rpc_server_.RegisterService(
739-
std::make_unique<rpc::TaskInfoGrpcService>(io_context, *gcs_task_manager_));
740-
rpc_server_.RegisterService(
741-
std::make_unique<rpc::RayEventExportGrpcService>(io_context, *gcs_task_manager_));
738+
rpc_server_.RegisterService(std::make_unique<rpc::TaskInfoGrpcService>(
739+
io_context,
740+
*gcs_task_manager_,
741+
RayConfig::instance().gcs_max_active_rpcs_per_handler()));
742+
rpc_server_.RegisterService(std::make_unique<rpc::RayEventExportGrpcService>(
743+
io_context,
744+
*gcs_task_manager_,
745+
RayConfig::instance().gcs_max_active_rpcs_per_handler()));
742746
}
743747

744748
void GcsServer::InstallEventListeners() {

src/ray/gcs/gcs_server/gcs_task_manager.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@
2424
#include "absl/container/flat_hash_map.h"
2525
#include "absl/container/flat_hash_set.h"
2626
#include "absl/synchronization/mutex.h"
27+
#include "ray/gcs/gcs_server/grpc_service_interfaces.h"
2728
#include "ray/gcs/gcs_server/usage_stats_client.h"
2829
#include "ray/gcs/pb_util.h"
29-
#include "ray/rpc/gcs/gcs_rpc_server.h"
30+
#include "ray/stats/metric_defs.h"
3031
#include "ray/util/counter_map.h"
3132
#include "src/ray/protobuf/gcs.pb.h"
3233

@@ -92,7 +93,8 @@ class FinishedTaskActorTaskGcPolicy : public TaskEventsGcPolicyInterface {
9293
///
9394
/// This class has its own io_context and io_thread, that's separate from other GCS
9495
/// services. All handling of all rpc should be posted to the single thread it owns.
95-
class GcsTaskManager : public rpc::TaskInfoHandler, public rpc::RayEventExportHandler {
96+
class GcsTaskManager : public rpc::TaskInfoGcsServiceHandler,
97+
public rpc::RayEventExportGcsServiceHandler {
9698
public:
9799
/// Create a GcsTaskManager.
98100
explicit GcsTaskManager(instrumented_io_context &io_service);

src/ray/gcs/gcs_server/grpc_service_interfaces.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,5 +206,26 @@ class InternalKVGcsServiceHandler {
206206
SendReplyCallback send_reply_callback) = 0;
207207
};
208208

209+
class TaskInfoGcsServiceHandler {
210+
public:
211+
virtual ~TaskInfoGcsServiceHandler() = default;
212+
213+
virtual void HandleAddTaskEventData(AddTaskEventDataRequest request,
214+
AddTaskEventDataReply *reply,
215+
SendReplyCallback send_reply_callback) = 0;
216+
217+
virtual void HandleGetTaskEvents(GetTaskEventsRequest request,
218+
GetTaskEventsReply *reply,
219+
SendReplyCallback send_reply_callback) = 0;
220+
};
221+
222+
class RayEventExportGcsServiceHandler {
223+
public:
224+
virtual ~RayEventExportGcsServiceHandler() = default;
225+
virtual void HandleAddEvents(events::AddEventsRequest request,
226+
events::AddEventsReply *reply,
227+
SendReplyCallback send_reply_callback) = 0;
228+
};
229+
209230
} // namespace rpc
210231
} // namespace ray

src/ray/gcs/gcs_server/grpc_services.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,5 +114,23 @@ void InternalKVGrpcService::InitServerCallFactories(
114114
InternalKVGcsService, GetInternalConfig, max_active_rpcs_per_handler_)
115115
}
116116

117+
void TaskInfoGrpcService::InitServerCallFactories(
118+
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
119+
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
120+
const ClusterID &cluster_id) {
121+
RPC_SERVICE_HANDLER(TaskInfoGcsService, AddTaskEventData, max_active_rpcs_per_handler_)
122+
RPC_SERVICE_HANDLER(TaskInfoGcsService, GetTaskEvents, max_active_rpcs_per_handler_)
123+
}
124+
125+
using events::AddEventsReply;
126+
using events::AddEventsRequest;
127+
128+
void RayEventExportGrpcService::InitServerCallFactories(
129+
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
130+
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
131+
const ClusterID &cluster_id) {
132+
RPC_SERVICE_HANDLER(RayEventExportGcsService, AddEvents, max_active_rpcs_per_handler_)
133+
}
134+
117135
} // namespace rpc
118136
} // namespace ray

src/ray/gcs/gcs_server/grpc_services.h

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,5 +196,51 @@ class InternalKVGrpcService : public GrpcService {
196196
int64_t max_active_rpcs_per_handler_;
197197
};
198198

199+
class TaskInfoGrpcService : public GrpcService {
200+
public:
201+
explicit TaskInfoGrpcService(instrumented_io_context &io_service,
202+
TaskInfoGcsServiceHandler &handler,
203+
int64_t max_active_rpcs_per_handler)
204+
: GrpcService(io_service),
205+
service_handler_(handler),
206+
max_active_rpcs_per_handler_(max_active_rpcs_per_handler){};
207+
208+
protected:
209+
grpc::Service &GetGrpcService() override { return service_; }
210+
211+
void InitServerCallFactories(
212+
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
213+
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
214+
const ClusterID &cluster_id) override;
215+
216+
private:
217+
TaskInfoGcsService::AsyncService service_;
218+
TaskInfoGcsServiceHandler &service_handler_;
219+
int64_t max_active_rpcs_per_handler_;
220+
};
221+
222+
class RayEventExportGrpcService : public GrpcService {
223+
public:
224+
explicit RayEventExportGrpcService(instrumented_io_context &io_service,
225+
RayEventExportGcsServiceHandler &handler,
226+
int64_t max_active_rpcs_per_handler)
227+
: GrpcService(io_service),
228+
service_handler_(handler),
229+
max_active_rpcs_per_handler_(max_active_rpcs_per_handler){};
230+
231+
protected:
232+
grpc::Service &GetGrpcService() override { return service_; }
233+
234+
void InitServerCallFactories(
235+
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
236+
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
237+
const ClusterID &cluster_id) override;
238+
239+
private:
240+
RayEventExportGcsService::AsyncService service_;
241+
RayEventExportGcsServiceHandler &service_handler_;
242+
int64_t max_active_rpcs_per_handler_;
243+
};
244+
199245
} // namespace rpc
200246
} // namespace ray

src/ray/gcs/gcs_server/tests/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ ray_cc_test(
149149
],
150150
tags = ["team:core"],
151151
deps = [
152-
":gcs_server_test_util",
153152
"//:ray_mock",
154153
"//src/ray/gcs/gcs_server:gcs_task_manager",
155154
"//src/ray/gcs/tests:gcs_test_util_lib",

src/ray/rpc/gcs/gcs_rpc_server.h

Lines changed: 0 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -117,16 +117,6 @@ namespace rpc {
117117
HANDLER, \
118118
RayConfig::instance().gcs_max_active_rpcs_per_handler())
119119

120-
#define TASK_INFO_SERVICE_RPC_HANDLER(HANDLER) \
121-
RPC_SERVICE_HANDLER(TaskInfoGcsService, \
122-
HANDLER, \
123-
RayConfig::instance().gcs_max_active_rpcs_per_handler())
124-
125-
#define RAY_EVENT_EXPORT_SERVICE_RPC_HANDLER(HANDLER) \
126-
RPC_SERVICE_HANDLER(RayEventExportGcsService, \
127-
HANDLER, \
128-
RayConfig::instance().gcs_max_active_rpcs_per_handler())
129-
130120
#define OBJECT_INFO_SERVICE_RPC_HANDLER(HANDLER) \
131121
RPC_SERVICE_HANDLER(ObjectInfoGcsService, \
132122
HANDLER, \
@@ -291,83 +281,8 @@ class PlacementGroupInfoGrpcService : public GrpcService {
291281
PlacementGroupInfoGcsServiceHandler &service_handler_;
292282
};
293283

294-
class TaskInfoGcsServiceHandler {
295-
public:
296-
virtual ~TaskInfoGcsServiceHandler() = default;
297-
298-
virtual void HandleAddTaskEventData(AddTaskEventDataRequest request,
299-
AddTaskEventDataReply *reply,
300-
SendReplyCallback send_reply_callback) = 0;
301-
302-
virtual void HandleGetTaskEvents(GetTaskEventsRequest request,
303-
GetTaskEventsReply *reply,
304-
SendReplyCallback send_reply_callback) = 0;
305-
};
306-
307-
/// The `GrpcService` for `TaskInfoGcsService`.
308-
class TaskInfoGrpcService : public GrpcService {
309-
public:
310-
/// Constructor.
311-
///
312-
/// \param[in] io_service IO service to run the handler.
313-
/// \param[in] handler The service handler that actually handle the requests.
314-
explicit TaskInfoGrpcService(instrumented_io_context &io_service,
315-
TaskInfoGcsServiceHandler &handler)
316-
: GrpcService(io_service), service_handler_(handler){};
317-
318-
protected:
319-
grpc::Service &GetGrpcService() override { return service_; }
320-
321-
void InitServerCallFactories(
322-
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
323-
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
324-
const ClusterID &cluster_id) override {
325-
TASK_INFO_SERVICE_RPC_HANDLER(AddTaskEventData);
326-
TASK_INFO_SERVICE_RPC_HANDLER(GetTaskEvents);
327-
}
328-
329-
private:
330-
/// The grpc async service object.
331-
TaskInfoGcsService::AsyncService service_;
332-
/// The service handler that actually handle the requests.
333-
TaskInfoGcsServiceHandler &service_handler_;
334-
};
335-
336-
class RayEventExportGcsServiceHandler {
337-
public:
338-
virtual ~RayEventExportGcsServiceHandler() = default;
339-
virtual void HandleAddEvents(AddEventsRequest request,
340-
AddEventsReply *reply,
341-
SendReplyCallback send_reply_callback) = 0;
342-
};
343-
344-
/// The `GrpcService` for `RayEventExportGcsService`.
345-
class RayEventExportGrpcService : public GrpcService {
346-
public:
347-
explicit RayEventExportGrpcService(instrumented_io_context &io_service,
348-
RayEventExportGcsServiceHandler &handler)
349-
: GrpcService(io_service), service_handler_(handler) {}
350-
351-
protected:
352-
grpc::Service &GetGrpcService() override { return service_; }
353-
void InitServerCallFactories(
354-
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
355-
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
356-
const ClusterID &cluster_id) override {
357-
RAY_EVENT_EXPORT_SERVICE_RPC_HANDLER(AddEvents);
358-
}
359-
360-
private:
361-
/// The grpc async service object.
362-
RayEventExportGcsService::AsyncService service_;
363-
/// The service handler that actually handle the requests.
364-
RayEventExportGcsServiceHandler &service_handler_;
365-
};
366-
367284
using ActorInfoHandler = ActorInfoGcsServiceHandler;
368285
using PlacementGroupInfoHandler = PlacementGroupInfoGcsServiceHandler;
369-
using TaskInfoHandler = TaskInfoGcsServiceHandler;
370-
using RayEventExportHandler = RayEventExportGcsServiceHandler;
371286

372287
} // namespace rpc
373288
} // namespace ray

0 commit comments

Comments
 (0)