| 1 | +# Error Propagation during Response Streaming |
| 2 | + |
| 3 | +**Status**: Approved |
| 4 | + |
| 5 | +**Authors**: [@nnshah1](https://github.com/nnshah1) [@kthui](https://github.com/kthui) |
| 6 | + |
| 7 | +**Category**: Architecture |
| 8 | + |
| 9 | +**Replaces**: N/A |
| 10 | + |
| 11 | +**Replaced By**: N/A |
| 12 | + |
| 13 | +**Sponsor**: [@ryanolson](https://github.com/ryanolson) [@grahamking](https://github.com/grahamking) |
| 14 | + |
| 15 | +**Required Reviewers**: [@ryanolson](https://github.com/ryanolson) [@grahamking](https://github.com/grahamking) |
| 16 | + |
| 17 | +**Review Date**: Jul 09 2025 |
| 18 | + |
| 19 | +**Pull Request**: https://github.com/ai-dynamo/enhancements/pull/15 |
| 20 | + |
| 21 | +**Implementation PR / Tracking Issue**: https://github.com/ai-dynamo/dynamo/pull/1671 |
| 22 | + |
| 23 | +# Summary |
| 24 | + |
| 25 | +Network-level errors may occur while streaming responses from the server back to the client. If they |
| 26 | +occur, these errors must be made available to the client response stream listener. |
| 27 | + |
| 28 | +# Motivation |
| 29 | + |
| 30 | +The client response stream listener currently does not know why a stream is closed. Normally, a |
| 31 | +stream is closed because the server has finished producing all the responses, but it can also be due |
| 32 | +to failure of the server or the network connecting the client to the server. |
| 33 | + |
| 34 | +Knowing why the stream is closed is vital to the ability to detect faults while the server is |
| 35 | +streaming responses back to the client. |
| 36 | + |
| 37 | +For instance, in the current |
| 38 | +[router implementation](https://github.com/ai-dynamo/dynamo/blob/fcfc21f20e53908cedc41a91bbd594283ecf45db/lib/runtime/src/pipeline/network/egress/addressed_router.rs#L165-L174), |
| 39 | +if the router is unable to deserialize the received bytes back into the original object, it cannot relay the error to |
| 40 | +the client response stream consumer for proper handling. Instead, it silently skips the response |
| 41 | +that failed to deserialize. |
| 42 | + |
| 43 | +## Goals |
| 44 | + |
| 45 | +* Update the Response Streaming Interface to Support Propagating Network/Router Level Errors. |
| 46 | + |
| 47 | +### Non Goals |
| 48 | + |
| 49 | +* Mechanism for Detecting Network/Router Level Errors - i.e. Incomplete Stream Detection. |
| 50 | + |
| 51 | +## Requirements |
| 52 | + |
| 53 | +N/A |
| 54 | + |
| 55 | +# Proposal |
| 56 | + |
| 57 | +The client starts its RPCs from |
| 58 | +[this `generate` method](https://github.com/ai-dynamo/dynamo/blob/fcfc21f20e53908cedc41a91bbd594283ecf45db/lib/runtime/src/pipeline/network/egress/addressed_router.rs#L85), |
| 59 | +which returns a stream of type `Result<ManyOut<U>, Error>`. |
| 60 | + |
| 61 | +**Case 1**: The server is unreachable when the request is made |
| 62 | + |
| 63 | +This case is handled by the existing `Result<...>` wrapper around the `ManyOut<U>` stream type. |
| 64 | + |
| 65 | +**Case 2**: The server is disconnected while responses are being returned |
| 66 | + |
| 67 | +The `ManyOut<U>` stream yields responses of type `U`, which is defined by the client and opaque to |
| 68 | +the network/router layer, so currently there is no way for the network/router layer to: |
| 69 | +1. Propagate errors detected at its level back to the client while streaming responses; and |
| 70 | +2. Know if the `U` type is passing an error from the server back to the client. |
| 71 | + |
| 72 | +The proposal introduces a `MaybeError` trait that the `U` type must implement. It provides |
| 73 | +the methods the network/router layer needs to accomplish both points above: |
| 74 | +```rust |
| 75 | +pub trait MaybeError { |
| 76 | +    /// Construct an instance from an error. |
| 77 | +    fn from_err(err: Box<dyn std::error::Error>) -> Self; |
| 78 | + |
| 79 | +    /// Return the error carried by this instance, if any. |
| 80 | +    fn err(&self) -> Option<Box<dyn std::error::Error>>; |
| 81 | +} |
| 82 | +``` |
| 83 | + |
| 84 | +When the network/router detects an error, a new error response |
| 85 | +```rust |
| 86 | +let error_response = U::from_err(...); |
| 87 | +``` |
| 88 | +is constructed and yielded on the response stream to propagate the error back to the client. |
| 89 | + |
| 90 | +To detect at the network/router level whether the server sent an error response: |
| 91 | +```rust |
| 92 | +let response = response_stream.next().await; |
| 93 | +if let Some(err) = response.as_ref().and_then(U::err) { |
| 94 | + ... |
| 95 | +} |
| 96 | +``` |
| 97 | +The network/router can act on the error if needed and also propagate the error back to the client. |
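The two uses of `MaybeError` described above can be sketched with a concrete type. This is a minimal illustration, not the Dynamo implementation: `DemoResponse` and `network_error_response` are hypothetical names introduced here, and the trait is repeated only so the block compiles on its own.

```rust
use std::error::Error;

/// The trait from the proposal, repeated so this sketch compiles on its own.
pub trait MaybeError {
    fn from_err(err: Box<dyn Error>) -> Self;
    fn err(&self) -> Option<Box<dyn Error>>;
}

/// Hypothetical client-defined response type standing in for `U`.
#[derive(Debug, Clone, PartialEq)]
pub enum DemoResponse {
    Data(String),
    Failed(String),
}

impl MaybeError for DemoResponse {
    fn from_err(err: Box<dyn Error>) -> Self {
        // Keep only the message so the response stays easy to clone and compare.
        DemoResponse::Failed(err.to_string())
    }

    fn err(&self) -> Option<Box<dyn Error>> {
        match self {
            DemoResponse::Data(_) => None,
            // `Box<dyn Error>` can be built directly from a `String`.
            DemoResponse::Failed(msg) => Some(Box::<dyn Error>::from(msg.clone())),
        }
    }
}

/// Hypothetical router-side helper: build an error response for any `U`
/// without knowing anything else about the type.
pub fn network_error_response<U: MaybeError>(reason: &str) -> U {
    U::from_err(Box::<dyn Error>::from(reason.to_string()))
}
```

With this shape, the router only depends on the trait bound `U: MaybeError`, so the response type stays otherwise opaque to it.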
| 98 | + |
| 99 | +## Example: End-of-Stream Detection |
| 100 | + |
| 101 | +End-of-stream can be detected by wrapping each response of type `U` in a new wrapper struct |
| 102 | +```rust |
| 103 | +#[derive(Serialize, Deserialize, Debug)] |
| 104 | +pub struct NetworkStreamWrapper<U> { |
| 105 | +    #[serde(skip_serializing_if = "Option::is_none")] |
| 106 | +    pub data: Option<U>, |
| 107 | +    pub complete_final: bool, |
| 108 | +} |
| 109 | +``` |
| 110 | +such that each response is sent in the `data` field with `complete_final` set to `false` from the |
| 111 | +server. Once the generate engine is done with producing responses, the server must send an extra |
| 112 | +`NetworkStreamWrapper` response with no `data` and `complete_final` set to `true`. |
| 113 | + |
| 114 | +At the client, the `NetworkStreamWrapper` is unwrapped and its `data` is yielded back to the stream |
| 115 | +consumer. If the network stream ends before a response with `complete_final = true` is received, an |
| 116 | +extra error response is yielded to the stream consumer, propagating the error as described |
| 117 | +by the [proposal](#proposal). |
| 118 | + |
| 119 | +**Note** |
| 120 | +* The `NetworkStreamWrapper` is only used while transmitting bytes over the network. It is wrapped |
| 121 | +immediately before serializing responses into bytes and unwrapped immediately after deserializing |
| 122 | +bytes into responses. |
| 123 | +* The end-of-stream event can be detected via other methods, such as Server-Sent Events (SSE), and |
| 124 | +the actual implementation is free to use any other detection methods as long as the interface |
| 125 | +remains the same as described by the [proposal](#proposal). |
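The client-side unwrapping described in this example could look roughly like the sketch below. It makes several simplifying assumptions: a plain iterator stands in for the async network stream, the serde derives are dropped so only the standard library is needed, and `Reply` and `unwrap_frames` are hypothetical names that do not appear in the proposal.

```rust
use std::error::Error;

/// The trait from the proposal, repeated so this sketch compiles on its own.
pub trait MaybeError {
    fn from_err(err: Box<dyn Error>) -> Self;
    fn err(&self) -> Option<Box<dyn Error>>;
}

/// The wrapper from the example above, minus the serde derives.
pub struct NetworkStreamWrapper<U> {
    pub data: Option<U>,
    pub complete_final: bool,
}

/// Hypothetical response type standing in for `U`.
#[derive(Debug, PartialEq)]
pub enum Reply {
    Token(String),
    Failed(String),
}

impl MaybeError for Reply {
    fn from_err(err: Box<dyn Error>) -> Self {
        Reply::Failed(err.to_string())
    }

    fn err(&self) -> Option<Box<dyn Error>> {
        match self {
            Reply::Token(_) => None,
            Reply::Failed(msg) => Some(Box::<dyn Error>::from(msg.clone())),
        }
    }
}

/// Unwrap received frames in order. If the frames run out before the
/// `complete_final` marker, append one error response built via `from_err`.
pub fn unwrap_frames<U: MaybeError>(
    frames: impl IntoIterator<Item = NetworkStreamWrapper<U>>,
) -> Vec<U> {
    let mut out = Vec::new();
    let mut completed = false;
    for frame in frames {
        if frame.complete_final {
            // The marker frame carries no data; stop reading here.
            completed = true;
            break;
        }
        if let Some(data) = frame.data {
            out.push(data);
        }
    }
    if !completed {
        out.push(U::from_err(Box::<dyn Error>::from(
            "stream ended before complete_final",
        )));
    }
    out
}
```

The stream consumer never sees the wrapper: it receives the plain `U` values, plus one trailing error response when the stream was cut short.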
| 126 | + |
| 127 | +# Alternate Solutions |
| 128 | + |
| 129 | +## Alt 1 Handle Errors at the Client Implementation Layer |
| 130 | + |
| 131 | +The |
| 132 | +[`Annotated<...>`](https://github.com/ai-dynamo/dynamo/blob/2becce569d59f8dc064c2f07b7995d1e979ade66/lib/runtime/src/protocols/annotated.rs#L32) |
| 133 | +wrapper is typically used by |
| 134 | +[higher level implementations](https://github.com/ai-dynamo/dynamo/blob/2becce569d59f8dc064c2f07b7995d1e979ade66/lib/bindings/python/rust/engine.rs#L145-L146) |
| 135 | +as the opaque `U` type in the `ManyOut<U>` type returned from the |
| 136 | +[network/router layer](https://github.com/ai-dynamo/dynamo/blob/2becce569d59f8dc064c2f07b7995d1e979ade66/lib/runtime/src/pipeline/network/egress/push_router.rs#L165). |
| 137 | + |
| 138 | +The |
| 139 | +[`event`](https://github.com/ai-dynamo/dynamo/blob/2becce569d59f8dc064c2f07b7995d1e979ade66/lib/runtime/src/protocols/annotated.rs#L38) |
| 140 | +field in the `Annotated<...>` object will accept a new value, `complete_final`, in addition to the |
| 141 | +existing `error` value. It signals that the stream is complete and that the response carrying `complete_final` |
| 142 | +is the last response to be sent by the server. |
| 143 | + |
| 144 | +At the server, once response generation is completed, the server MUST send an additional empty |
| 145 | +response with the `complete_final` flag set, before closing the stream. |
| 146 | + |
| 147 | +At the client, if the `complete_final` response arrives and then the stream ends, the client can be |
| 148 | +assured all responses intended to be sent by the server have been received. If the stream ends |
| 149 | +without the `complete_final` response, the client can infer that one or more responses from |
| 150 | +the server have not arrived, which indicates some error handling needs to be performed, for |
| 151 | +instance, returning an error to the upper level or resuming the request on another node from where |
| 152 | +it left off. |
| 153 | + |
| 154 | +Two additional methods are to be added to the `Annotated<...>` implementation |
| 155 | +```rust |
| 156 | +impl<R> Annotated<R> { |
| 157 | +    ... |
| 158 | + |
| 159 | +    /// Create a new annotated response carrying the `complete_final` event. |
| 160 | +    pub fn from_complete_final() -> Self { |
| 161 | +        Self { |
| 162 | +            data: None, |
| 163 | +            id: None, |
| 164 | +            event: Some("complete_final".to_string()), |
| 165 | +            comment: None, |
| 166 | +        } |
| 167 | +    } |
| 168 | + |
| 169 | +    ... |
| 170 | + |
| 171 | +    pub fn is_complete_final(&self) -> bool { |
| 172 | +        self.event.as_deref() == Some("complete_final") |
| 173 | +    } |
| 174 | + |
| 175 | +    ... |
| 176 | +} |
| 177 | +``` |
| 178 | +to facilitate constructing and checking for the `complete_final` event. |
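A client built on this alternative might consume the responses as sketched below. The `Annotated` struct here is a simplified stand-in whose field types are assumptions, a plain iterator replaces the async stream, and `from_data` and `collect_until_complete` are hypothetical helpers added for illustration.

```rust
/// Simplified stand-in for `Annotated<R>`; the field types are assumptions.
pub struct Annotated<R> {
    pub data: Option<R>,
    pub id: Option<String>,
    pub event: Option<String>,
    pub comment: Option<Vec<String>>,
}

impl<R> Annotated<R> {
    /// Hypothetical helper: wrap a regular payload.
    pub fn from_data(data: R) -> Self {
        Self { data: Some(data), id: None, event: None, comment: None }
    }

    /// Create a new annotated response carrying the `complete_final` event.
    pub fn from_complete_final() -> Self {
        Self {
            data: None,
            id: None,
            event: Some("complete_final".to_string()),
            comment: None,
        }
    }

    pub fn is_complete_final(&self) -> bool {
        self.event.as_deref() == Some("complete_final")
    }
}

/// Collect payloads until the `complete_final` marker arrives.
/// Returns `Err` if the responses run out before the marker is seen.
pub fn collect_until_complete<R>(
    items: impl IntoIterator<Item = Annotated<R>>,
) -> Result<Vec<R>, String> {
    let mut payloads = Vec::new();
    for item in items {
        if item.is_complete_final() {
            return Ok(payloads);
        }
        if let Some(data) = item.data {
            payloads.push(data);
        }
    }
    Err(format!(
        "stream ended after {} responses without complete_final",
        payloads.len()
    ))
}
```

Every client implementation would need an equivalent of `collect_until_complete`, which is the duplication listed under the cons of this alternative.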
| 179 | + |
| 180 | +### Future Enhancements |
| 181 | + |
| 182 | +#### Extension to NvExt |
| 183 | + |
| 184 | +While this proposal is intended to enhance the |
| 185 | +[dynamo/lib/runtime](https://github.com/ai-dynamo/dynamo/tree/2becce569d59f8dc064c2f07b7995d1e979ade66/lib/runtime) |
| 186 | +Python binding implementation, the same idea can also be applied to the |
| 187 | +[dynamo/lib/llm](https://github.com/ai-dynamo/dynamo/tree/2becce569d59f8dc064c2f07b7995d1e979ade66/lib/llm) |
| 188 | +OpenAI implementation, at |
| 189 | +[`NvExt`](https://github.com/ai-dynamo/dynamo/blob/2becce569d59f8dc064c2f07b7995d1e979ade66/lib/llm/src/protocols/openai/nvext.rs#L25-L64). |
| 190 | + |
| 191 | +An EOS (end of stream) annotation can be appended to the |
| 192 | +[NvExt.annotations](https://github.com/ai-dynamo/dynamo/blob/2becce569d59f8dc064c2f07b7995d1e979ade66/lib/llm/src/protocols/openai/nvext.rs#L63) |
| 193 | +list, signaling that the response is the last one to be sent by the server. |
| 194 | + |
| 195 | +Ref: https://github.com/ai-dynamo/enhancements/pull/15#issuecomment-3002343978 |
| 196 | + |
| 197 | +The |
| 198 | +[`NvCreateChatCompletionStreamResponse`](https://github.com/ai-dynamo/dynamo/blob/2becce569d59f8dc064c2f07b7995d1e979ade66/lib/llm/src/protocols/openai/chat_completions.rs#L64-L68) |
| 199 | +struct will need to include an optional |
| 200 | +```rust |
| 201 | +#[serde(skip_serializing_if = "Option::is_none")] |
| 202 | +pub nvext: Option<NvExt> |
| 203 | +``` |
| 204 | +field, similar to the |
| 205 | +[request struct](https://github.com/ai-dynamo/dynamo/blob/2becce569d59f8dc064c2f07b7995d1e979ade66/lib/llm/src/protocols/openai/chat_completions.rs#L37-L44), |
| 206 | +in order to pass the flag with responses. |
| 207 | + |
| 208 | +**Open question**: The OpenAI API includes a |
| 209 | +["finish_reason"](https://platform.openai.com/docs/api-reference/chat-streaming/streaming) |
| 210 | +field in its response JSON indicating the end of stream, for example: |
| 211 | +```json |
| 212 | +{..., "choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]} |
| 213 | +{..., "choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]} |
| 214 | +.... |
| 215 | +{..., "choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]} |
| 216 | +``` |
| 217 | +Since `NvCreateChatCompletionStreamResponse` contains the full OpenAI response in its `inner`, is |
| 218 | +a duplicate end-of-stream flag in the `NvExt` of `NvCreateChatCompletionStreamResponse` needed? |
| 219 | + |
| 220 | +**Pros:** |
| 221 | + |
| 222 | +* No change to the current network/router interface, as the `U` opaque type is retained. |
| 223 | + |
| 224 | +**Cons:** |
| 225 | + |
| 226 | +* Every client implementation must re-implement the same basic error detection and reporting |
| 227 | +mechanism, which could be implemented once at the network/router layer. |
| 228 | + |
| 229 | +**Reason Rejected:** |
| 230 | + |
| 231 | +* Network error handling should be done by the network/router layer. |
| 232 | + |
| 233 | +**Notes:** |
| 234 | + |
| 235 | +* The original design |
| 236 | +[did NOT intend to handle errors](https://github.com/ai-dynamo/enhancements/pull/15#pullrequestreview-2954974976) |
| 237 | +at the network/router layer. |
| 238 | + |
| 239 | +## Alt 2 Add a Fault Tolerance Layer on top of the current Router Layer |
| 240 | + |
| 241 | +Add a FaultTolerance Layer that implements the `Result<...>` wrapper that will become the `U` type |
| 242 | +at the current Router Layer. The FaultTolerance Layer should implement the same |
| 243 | +[`PushWorkHandler`](https://github.com/ai-dynamo/dynamo/blob/fcfc21f20e53908cedc41a91bbd594283ecf45db/lib/runtime/src/pipeline/network.rs#L323) |
| 244 | +trait and accept objects implementing the |
| 245 | +[`AsyncEngine`](https://github.com/ai-dynamo/dynamo/blob/fcfc21f20e53908cedc41a91bbd594283ecf45db/lib/runtime/src/engine.rs#L104) |
| 246 | +trait, so it shares the same interface as the Router. |
| 247 | + |
| 248 | +**Pros:** |
| 249 | + |
| 250 | +* No change to the current network/router interface, as the `U` opaque type is retained. |
| 251 | +* Fault Tolerance implementations can be added to this layer, and written in Rust. |
| 252 | + |
| 253 | +**Cons:** |
| 254 | + |
| 255 | +* The additional layer is overly complicated, because the FaultTolerance Layer is essentially an |
| 256 | +extension of the Router Layer that does not override any Router functionality. |
| 257 | + |
| 258 | +**Reason Rejected:** |
| 259 | + |
| 260 | +* Use the more generic `Annotated<...>` wrapper. |
| 261 | + |
| 262 | +**Notes:** |
| 263 | + |
| 264 | +The current Python bindings can be updated from |
| 265 | +``` |
| 266 | +Python binding | runtime |
| 267 | +| | | |
| 268 | +`--> Client | `--> component |
| 269 | + | | | | |
| 270 | + | | | `--> client <. |
| 271 | + | | | | owns an instance of; and |
| 272 | + | | `--> pipeline | obtains available instances from etcd and tracks/reports downed ones |
| 273 | + | | | | |
| 274 | + | | `--> network/router |
| 275 | + | | ^ |
| 276 | + `-----------------------------------' |
| 277 | + owns an instance of |
| 278 | +``` |
| 279 | +to |
| 280 | +``` |
| 281 | +Python binding | runtime |
| 282 | +| | | |
| 283 | +`--> Client | `--> component |
| 284 | + | | | | |
| 285 | + | | | `--> client <. |
| 286 | + | | | | owns an instance of; and |
| 287 | + | | `--> pipeline | obtains available instances |
| 288 | + | | | | |
| 289 | + | | `--> network/router <--. |
| 290 | + | | | | owns an instance of; and |
| 291 | + | | `--> fault_tolerance --' reports downed instances over router to client |
| 292 | + | | ^ |
| 293 | + `---------------------------' |
| 294 | + owns an instance of |
| 295 | +``` |
| 296 | +when creating a client. |
| 297 | + |
| 298 | +## Alt 3 Wrap Each Response in `Result<U, anyhow::Error>` |
| 299 | + |
| 300 | +### Part 1: Add a New Generate Method for Propagating Error from Network/Router Layer |
| 301 | + |
| 302 | +The client starts its RPCs from |
| 303 | +[this `generate` method](https://github.com/ai-dynamo/dynamo/blob/fcfc21f20e53908cedc41a91bbd594283ecf45db/lib/runtime/src/pipeline/network/egress/addressed_router.rs#L85), |
| 304 | +which returns a stream of type `Result<ManyOut<U>, Error>`. |
| 305 | + |
| 306 | +**Case 1**: The server is unreachable when the request is made |
| 307 | + |
| 308 | +This case is handled by the existing `Result<...>` wrapper around the `ManyOut<U>` stream type. |
| 309 | + |
| 310 | +**Case 2**: The server is disconnected while responses are being returned |
| 311 | + |
| 312 | +The `ManyOut<U>` stream yields responses of type `U`, which is defined by the client and opaque to |
| 313 | +the network/router layer, so currently there is no way for the network/router layer to propagate any |
| 314 | +error back to the client while streaming responses. |
| 315 | + |
| 316 | +The proposal is to wrap the `U` type in a `Result<...>` wrapper, similar to how the stream is |
| 317 | +currently wrapped, so errors detected at the network/router level can be propagated to the client. |
| 318 | + |
| 319 | +To avoid disrupting existing behaviors, a new |
| 320 | +```rust |
| 321 | +async fn generate_with_error_detection(&self, request: SingleIn<AddressedRequest<T>>) -> Result<ManyOut<Result<U, Error>>, Error> |
| 322 | +``` |
| 323 | +method is to be added alongside the existing `generate` method as a part of the |
| 324 | +[`AddressedPushRouter` struct](https://github.com/ai-dynamo/dynamo/blob/fcfc21f20e53908cedc41a91bbd594283ecf45db/lib/runtime/src/pipeline/network/egress/addressed_router.rs#L59). |
| 325 | +The change can be progressively applied to existing implementations, by switching over from the |
| 326 | +existing `generate` method to the new `generate_with_error_detection` method. |
| 327 | + |
| 328 | +### Part 2: Implement End of Stream Detection into Network/Router Layer |
| 329 | + |
| 330 | +The server handles RPCs with the |
| 331 | +[`PushWorkHandler` trait](https://github.com/ai-dynamo/dynamo/blob/fcfc21f20e53908cedc41a91bbd594283ecf45db/lib/runtime/src/pipeline/network/ingress/push_handler.rs#L20C24-L20C39); |
| 332 | +specifically, each response over the stream passes through |
| 333 | +[this section of the handler](https://github.com/ai-dynamo/dynamo/blob/fcfc21f20e53908cedc41a91bbd594283ecf45db/lib/runtime/src/pipeline/network/ingress/push_handler.rs#L100-L109). |
| 334 | +This part of the code can reliably capture the end of stream signal, which will be made available to |
| 335 | +the client over the RPC. |
| 336 | + |
| 337 | +Instead of sending each response from the worker `generate` method directly back to the client, each |
| 338 | +response is contained in a new |
| 339 | +```rust |
| 340 | +#[derive(Serialize, Deserialize, Debug)] |
| 341 | +pub struct StreamItemWrapper<U> { |
| 342 | +    #[serde(skip_serializing_if = "Option::is_none")] |
| 343 | +    pub data: Option<U>, |
| 344 | +    pub complete_final: bool, |
| 345 | +} |
| 346 | +``` |
| 347 | +where `data` holds the response and `complete_final` indicates if this is the last response. |
| 348 | + |
| 349 | +While the worker's `generate` method is producing new responses, each response is sent with |
| 350 | +`complete_final = false`. After the last response is produced by the worker's `generate` method, an |
| 351 | +extra response with no data and `complete_final = true` is sent to the client, before ending the |
| 352 | +RPC. |
| 353 | + |
| 354 | +The client receives and processes responses at the |
| 355 | +[end of its `generate` method](https://github.com/ai-dynamo/dynamo/blob/fcfc21f20e53908cedc41a91bbd594283ecf45db/lib/runtime/src/pipeline/network/egress/addressed_router.rs#L163-L174). |
| 356 | +The response is first reconstructed into the `StreamItemWrapper<U>`, and then the `data` is |
| 357 | +extracted and yielded back to the client. |
| 358 | + |
| 359 | +There are two cases of connection stream error detectable by the client's `generate` method at the |
| 360 | +network/router layer: |
| 361 | + |
| 362 | +**Case 1**: Connection stream ended before `complete_final` |
| 363 | + |
| 364 | +An `Err(...)` response, as discussed in |
| 365 | +[Part 1](#part-1-add-a-new-generate-method-for-propagating-error-from-networkrouter-layer), is |
| 366 | +yielded and then the response stream is closed. |
| 367 | + |
| 368 | +**Case 2**: `complete_final` is received and then more responses arrive |
| 369 | + |
| 370 | +An `Err(...)` response, as discussed in |
| 371 | +[Part 1](#part-1-add-a-new-generate-method-for-propagating-error-from-networkrouter-layer), is |
| 372 | +yielded upon arrival of the first response after the response with `complete_final`. After the |
| 373 | +`Err(...)` response is yielded, the response stream is closed. |
| 374 | + |
| 375 | +The `Err(...)`s will be constructed using |
| 376 | +[`anyhow::Error::msg`](https://docs.rs/anyhow/1.0.98/anyhow/struct.Error.html#method.msg). |
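The two error cases in Part 2 can be sketched as a small state machine. This is only an illustration under stated assumptions: a plain iterator stands in for the async connection stream, `String` stands in for `anyhow::Error` so the block needs only the standard library, the serde derives are omitted, and `unwrap_with_errors` is a hypothetical name.

```rust
/// The wrapper from Part 2, minus the serde derives.
pub struct StreamItemWrapper<U> {
    pub data: Option<U>,
    pub complete_final: bool,
}

/// Unwrap frames into `Result<U, String>` items, where `String` stands in
/// for `anyhow::Error`. At most one `Err` is produced, and it is always last.
pub fn unwrap_with_errors<U>(
    frames: impl IntoIterator<Item = StreamItemWrapper<U>>,
) -> Vec<Result<U, String>> {
    let mut out = Vec::new();
    let mut completed = false;
    for frame in frames {
        if completed {
            // Case 2: a response arrived after `complete_final`.
            out.push(Err("received a response after complete_final".to_string()));
            return out;
        }
        if frame.complete_final {
            // The final frame is expected to carry no data.
            completed = true;
        } else if let Some(data) = frame.data {
            out.push(Ok(data));
        }
    }
    if !completed {
        // Case 1: the connection stream ended before `complete_final`.
        out.push(Err("stream ended before complete_final".to_string()));
    }
    out
}
```

In both cases the error is the last item produced, which matches the requirement that the response stream is closed right after the `Err(...)` is yielded.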