11
11
import io .ray .api .options .ActorLifetime ;
12
12
import io .ray .serve .common .Constants ;
13
13
import io .ray .serve .config .RayServeConfig ;
14
+ import io .ray .serve .dag .Graph ;
15
+ import io .ray .serve .deployment .Application ;
14
16
import io .ray .serve .deployment .Deployment ;
15
17
import io .ray .serve .deployment .DeploymentCreator ;
16
18
import io .ray .serve .deployment .DeploymentRoute ;
17
19
import io .ray .serve .exception .RayServeException ;
18
20
import io .ray .serve .generated .ActorNameList ;
21
+ import io .ray .serve .handle .DeploymentHandle ;
19
22
import io .ray .serve .poll .LongPollClientFactory ;
20
23
import io .ray .serve .replica .ReplicaContext ;
21
24
import io .ray .serve .util .CollectionUtil ;
22
- import io .ray .serve .util .LogUtil ;
25
+ import io .ray .serve .util .MessageFormatter ;
23
26
import io .ray .serve .util .ServeProtoUtil ;
27
+ import java .util .Arrays ;
24
28
import java .util .Collections ;
25
29
import java .util .HashMap ;
30
+ import java .util .List ;
26
31
import java .util .Map ;
27
32
import java .util .Optional ;
33
+ import org .apache .commons .lang3 .RandomStringUtils ;
34
+ import org .apache .commons .lang3 .StringUtils ;
28
35
import org .slf4j .Logger ;
29
36
import org .slf4j .LoggerFactory ;
30
37
@@ -43,25 +50,30 @@ public class Serve {
43
50
* @return
44
51
*/
45
52
public static synchronized ServeControllerClient start (Map <String , String > config ) {
46
- // Initialize ray if needed.
47
- if (!Ray .isInitialized ()) {
48
- System .setProperty ("ray.job.namespace" , Constants .SERVE_NAMESPACE );
49
- Ray .init ();
50
- }
53
+ return serveStart (config );
54
+ }
55
+
56
+ private static synchronized ServeControllerClient serveStart (Map <String , String > config ) {
51
57
52
58
try {
53
59
ServeControllerClient client = getGlobalClient (true );
54
60
LOGGER .info ("Connecting to existing Serve app in namespace {}" , Constants .SERVE_NAMESPACE );
55
61
return client ;
56
62
} catch (RayServeException | IllegalStateException e ) {
57
- LOGGER .info ("There is no instance running on this Ray cluster. A new one will be started." );
63
+ LOGGER .info (
64
+ "There is no Serve instance running on this Ray cluster. A new one will be started." );
65
+ }
66
+
67
+ // Initialize ray if needed.
68
+ if (!Ray .isInitialized ()) {
69
+ init ();
58
70
}
59
71
60
72
int httpPort =
61
73
Optional .ofNullable (config )
62
74
.map (m -> m .get (RayServeConfig .PROXY_HTTP_PORT ))
63
75
.map (Integer ::parseInt )
64
- .orElse (8000 );
76
+ .orElse (Integer . valueOf ( System . getProperty ( RayServeConfig . PROXY_HTTP_PORT , " 8000" )) );
65
77
PyActorHandle controllerAvatar =
66
78
Ray .actor (
67
79
PyActorClass .of ("ray.serve._private.controller" , "ServeControllerAvatar" ),
@@ -95,7 +107,8 @@ public static synchronized ServeControllerClient start(Map<String, String> confi
95
107
}
96
108
} catch (RayTimeoutException e ) {
97
109
String errMsg =
98
- LogUtil .format ("Proxies not available after {}s." , Constants .PROXY_TIMEOUT_S );
110
+ MessageFormatter .format (
111
+ "HTTP proxies not available after {}s." , Constants .PROXY_TIMEOUT_S );
99
112
LOGGER .error (errMsg , e );
100
113
throw new RayServeException (errMsg , e );
101
114
}
@@ -108,24 +121,6 @@ public static synchronized ServeControllerClient start(Map<String, String> confi
108
121
return client ;
109
122
}
110
123
111
- public static synchronized ServeControllerClient start (
112
- boolean detached , boolean dedicatedCpu , Map <String , String > config ) {
113
-
114
- if (!detached ) {
115
- throw new IllegalArgumentException (
116
- "`detached=false` is no longer supported. "
117
- + "In a future release, it will be removed altogether." );
118
- }
119
-
120
- if (dedicatedCpu ) {
121
- throw new IllegalArgumentException (
122
- "`dedicatedCpu=true` is no longer supported. "
123
- + "In a future release, it will be removed altogether." );
124
- }
125
-
126
- return start (config );
127
- }
128
-
129
124
/**
130
125
* Completely shut down the connected Serve instance.
131
126
*
@@ -142,7 +137,7 @@ public static void shutdown() {
142
137
}
143
138
144
139
LongPollClientFactory .stop ();
145
- client .shutdown ();
140
+ client .shutdown (null );
146
141
clearContext ();
147
142
}
148
143
@@ -181,6 +176,11 @@ public static void setInternalReplicaContext(
181
176
deploymentName , replicaTag , controllerName , servableObject , config , appName );
182
177
}
183
178
179
+ /**
180
+ * Set replica information to global context.
181
+ *
182
+ * @param replicaContext
183
+ */
184
184
public static void setInternalReplicaContext (ReplicaContext replicaContext ) {
185
185
INTERNAL_REPLICA_CONTEXT = replicaContext ;
186
186
}
@@ -206,7 +206,8 @@ public static ReplicaContext getReplicaContext() {
206
206
*
207
207
* @param healthCheckController If True, run a health check on the cached controller if it exists.
208
208
* If the check fails, try reconnecting to the controller.
209
- * @return
209
+ * @return ServeControllerClient to the running Serve controller. If there is no running
210
+ * controller and raise_if_no_controller_running is set to False, returns None.
210
211
*/
211
212
public static ServeControllerClient getGlobalClient (boolean healthCheckController ) {
212
213
try {
@@ -222,14 +223,15 @@ public static ServeControllerClient getGlobalClient(boolean healthCheckControlle
222
223
LOGGER .info ("The cached controller has died. Reconnecting." );
223
224
setGlobalClient (null );
224
225
}
225
- synchronized (ServeControllerClient .class ) {
226
- if (GLOBAL_CLIENT != null ) {
227
- return GLOBAL_CLIENT ;
228
- }
229
- return connect ();
230
- }
226
+ return connect ();
231
227
}
232
228
229
+ /**
230
+ * Gets the global client, which stores the controller's handle.
231
+ *
232
+ * @return ServeControllerClient to the running Serve controller. If there is no running
233
+ * controller and raise_if_no_controller_running is set to False, returns None.
234
+ */
233
235
public static ServeControllerClient getGlobalClient () {
234
236
return getGlobalClient (false );
235
237
}
@@ -249,13 +251,19 @@ private static void setGlobalClient(ServeControllerClient client) {
249
251
*
250
252
* @return
251
253
*/
252
- public static ServeControllerClient connect () {
254
+ private static synchronized ServeControllerClient connect () {
255
+
256
+ if (GLOBAL_CLIENT != null ) {
257
+ return GLOBAL_CLIENT ;
258
+ }
259
+
253
260
// Initialize ray if needed.
254
261
if (!Ray .isInitialized ()) {
255
- System .setProperty ("ray.job.namespace" , Constants .SERVE_NAMESPACE );
256
- Ray .init ();
262
+ init ();
257
263
}
258
264
265
+ // When running inside of a replica, _INTERNAL_REPLICA_CONTEXT is set to ensure that the correct
266
+ // instance is connected to.
259
267
String controllerName =
260
268
INTERNAL_REPLICA_CONTEXT != null
261
269
? INTERNAL_REPLICA_CONTEXT .getInternalControllerName ()
@@ -264,7 +272,7 @@ public static ServeControllerClient connect() {
264
272
Optional <BaseActorHandle > optional = Ray .getActor (controllerName , Constants .SERVE_NAMESPACE );
265
273
Preconditions .checkState (
266
274
optional .isPresent (),
267
- LogUtil .format (
275
+ MessageFormatter .format (
268
276
"There is no instance running on this Ray cluster. "
269
277
+ "Please call `serve.start() to start one." ));
270
278
LOGGER .info (
@@ -286,24 +294,25 @@ public static ServeControllerClient connect() {
286
294
*
287
295
* @param name name of the deployment. This must have already been deployed.
288
296
* @return Deployment
297
+ * @deprecated {@value Constants#MIGRATION_MESSAGE}
289
298
*/
299
+ @ Deprecated
290
300
public static Deployment getDeployment (String name ) {
301
+ LOGGER .warn (Constants .MIGRATION_MESSAGE );
291
302
DeploymentRoute deploymentRoute = getGlobalClient ().getDeploymentInfo (name );
292
303
if (deploymentRoute == null ) {
293
304
throw new RayServeException (
294
- LogUtil .format ("Deployment {} was not found. Did you call Deployment.deploy?" , name ));
305
+ MessageFormatter .format (
306
+ "Deployment {} was not found. Did you call Deployment.deploy?" , name ));
295
307
}
296
308
297
309
// TODO use DeploymentCreator
298
310
return new Deployment (
299
- deploymentRoute .getDeploymentInfo ().getReplicaConfig ().getDeploymentDef (),
300
311
name ,
301
312
deploymentRoute .getDeploymentInfo ().getDeploymentConfig (),
313
+ deploymentRoute .getDeploymentInfo ().getReplicaConfig (),
302
314
deploymentRoute .getDeploymentInfo ().getVersion (),
303
- null ,
304
- deploymentRoute .getDeploymentInfo ().getReplicaConfig ().getInitArgs (),
305
- deploymentRoute .getRoute (),
306
- deploymentRoute .getDeploymentInfo ().getReplicaConfig ().getRayActorOptions ());
315
+ deploymentRoute .getRoute ());
307
316
}
308
317
309
318
/**
@@ -312,8 +321,11 @@ public static Deployment getDeployment(String name) {
312
321
* <p>Dictionary maps deployment name to Deployment objects.
313
322
*
314
323
* @return
324
+ * @deprecated {@value Constants#MIGRATION_MESSAGE}
315
325
*/
326
+ @ Deprecated
316
327
public static Map <String , Deployment > listDeployments () {
328
+ LOGGER .warn (Constants .MIGRATION_MESSAGE );
317
329
Map <String , DeploymentRoute > infos = getGlobalClient ().listDeployments ();
318
330
if (infos == null || infos .size () == 0 ) {
319
331
return Collections .emptyMap ();
@@ -323,15 +335,125 @@ public static Map<String, Deployment> listDeployments() {
323
335
deployments .put (
324
336
entry .getKey (),
325
337
new Deployment (
326
- entry .getValue ().getDeploymentInfo ().getReplicaConfig ().getDeploymentDef (),
327
338
entry .getKey (),
328
339
entry .getValue ().getDeploymentInfo ().getDeploymentConfig (),
340
+ entry .getValue ().getDeploymentInfo ().getReplicaConfig (),
329
341
entry .getValue ().getDeploymentInfo ().getVersion (),
330
- null ,
331
- entry .getValue ().getDeploymentInfo ().getReplicaConfig ().getInitArgs (),
332
- entry .getValue ().getRoute (),
333
- entry .getValue ().getDeploymentInfo ().getReplicaConfig ().getRayActorOptions ()));
342
+ entry .getValue ().getRoute ()));
334
343
}
335
344
return deployments ;
336
345
}
346
+
347
+ /**
348
+ * Run an application and return a handle to its ingress deployment.
349
+ *
350
+ * @param target A Serve application returned by `Deployment.bind()`.
351
+ * @return A handle that can be used to call the application.
352
+ */
353
+ public static Optional <DeploymentHandle > run (Application target ) {
354
+ return run (target , true , Constants .SERVE_DEFAULT_APP_NAME , null , null );
355
+ }
356
+
357
+ /**
358
+ * Run an application and return a handle to its ingress deployment.
359
+ *
360
+ * @param target A Serve application returned by `Deployment.bind()`.
361
+ * @param blocking
362
+ * @param name Application name. If not provided, this will be the only application running on the
363
+ * cluster (it will delete all others).
364
+ * @param routePrefix Route prefix for HTTP requests. If not provided, it will use route_prefix of
365
+ * the ingress deployment. If specified neither as an argument nor in the ingress deployment,
366
+ * the route prefix will default to '/'.
367
+ * @param config
368
+ * @return A handle that can be used to call the application.
369
+ */
370
+ public static Optional <DeploymentHandle > run (
371
+ Application target ,
372
+ boolean blocking ,
373
+ String name ,
374
+ String routePrefix ,
375
+ Map <String , String > config ) {
376
+
377
+ if (StringUtils .isBlank (name )) {
378
+ throw new RayServeException ("Application name must a non-empty string." );
379
+ }
380
+
381
+ ServeControllerClient client = serveStart (config );
382
+
383
+ List <Deployment > deployments = Graph .build (target .getInternalDagNode (), name );
384
+ Deployment ingress = Graph .getAndValidateIngressDeployment (deployments );
385
+
386
+ for (Deployment deployment : deployments ) {
387
+ // Overwrite route prefix
388
+ if (StringUtils .isNotBlank (deployment .getRoutePrefix ())
389
+ && StringUtils .isNotBlank (routePrefix )) {
390
+ Preconditions .checkArgument (
391
+ routePrefix .startsWith ("/" ), "The route_prefix must start with a forward slash ('/')" );
392
+ deployment .setRoutePrefix (routePrefix );
393
+ }
394
+ deployment
395
+ .getDeploymentConfig ()
396
+ .setVersion (
397
+ StringUtils .isNotBlank (deployment .getVersion ())
398
+ ? deployment .getVersion ()
399
+ : RandomStringUtils .randomAlphabetic (6 ));
400
+ }
401
+
402
+ client .deployApplication (name , deployments , blocking );
403
+
404
+ return Optional .ofNullable (ingress )
405
+ .map (
406
+ ingressDeployment ->
407
+ client .getDeploymentHandle (ingressDeployment .getName (), name , true ));
408
+ }
409
+
410
+ private static void init () {
411
+ System .setProperty ("ray.job.namespace" , Constants .SERVE_NAMESPACE );
412
+ Ray .init ();
413
+ }
414
+
415
+ /**
416
+ * Get a handle to the application's ingress deployment by name.
417
+ *
418
+ * @param name application name
419
+ * @return
420
+ */
421
+ public static DeploymentHandle getAppHandle (String name ) {
422
+ ServeControllerClient client = getGlobalClient ();
423
+ String ingress =
424
+ (String )
425
+ ((PyActorHandle ) client .getController ())
426
+ .task (PyActorMethod .of ("get_ingress_deployment_name" ), name )
427
+ .remote ()
428
+ .get ();
429
+
430
+ if (StringUtils .isBlank (ingress )) {
431
+ throw new RayServeException (
432
+ MessageFormatter .format ("Application '{}' does not exist." , ingress ));
433
+ }
434
+ return client .getDeploymentHandle (ingress , name , false );
435
+ }
436
+
437
+ /**
438
+ * Delete an application by its name.
439
+ *
440
+ * <p>Deletes the app with all corresponding deployments.
441
+ *
442
+ * @param name application name
443
+ */
444
+ public static void delete (String name ) {
445
+ delete (name , true );
446
+ }
447
+
448
+ /**
449
+ * Delete an application by its name.
450
+ *
451
+ * <p>Deletes the app with all corresponding deployments.
452
+ *
453
+ * @param name application name
454
+ * @param blocking Wait for the application to be deleted or not.
455
+ */
456
+ public static void delete (String name , boolean blocking ) {
457
+ getGlobalClient ().deleteApps (Arrays .asList (name ), blocking );
458
+ }
337
459
}
0 commit comments