@@ -493,6 +493,7 @@ impl<C: Config> Client<C> {
493
493
pub struct OpenAIEventStream < O : DeserializeOwned + Send + ' static > {
494
494
#[ pin]
495
495
stream : Filter < EventSource , future:: Ready < bool > , fn ( & Result < Event , reqwest_eventsource:: Error > ) -> future:: Ready < bool > > ,
496
+ done : bool ,
496
497
_phantom_data : PhantomData < O > ,
497
498
}
498
499
@@ -503,6 +504,7 @@ impl<O: DeserializeOwned + Send + 'static> OpenAIEventStream<O> {
503
504
// filter out the first event which is always Event::Open
504
505
future:: ready ( !( result. is_ok ( ) && result. as_ref ( ) . unwrap ( ) . eq ( & Event :: Open ) ) )
505
506
) ,
507
+ done : false ,
506
508
_phantom_data : PhantomData ,
507
509
}
508
510
}
@@ -513,6 +515,9 @@ impl<O: DeserializeOwned + Send + 'static> Stream for OpenAIEventStream<O> {
513
515
514
516
fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
515
517
let this = self . project ( ) ;
518
+ if * this. done {
519
+ return Poll :: Ready ( None ) ;
520
+ }
516
521
let stream: Pin < & mut _ > = this. stream ;
517
522
match stream. poll_next ( cx) {
518
523
Poll :: Ready ( response) => {
@@ -523,17 +528,24 @@ impl<O: DeserializeOwned + Send + 'static> Stream for OpenAIEventStream<O> {
523
528
Event :: Open => unreachable ! ( ) , // it has been filtered out
524
529
Event :: Message ( message) => {
525
530
if message. data == "[DONE]" {
531
+ * this. done = true ;
526
532
Poll :: Ready ( None ) // end of the stream, defined by OpenAI
527
533
} else {
528
534
// deserialize the data
529
535
match serde_json:: from_str :: < O > ( & message. data ) {
530
- Err ( e) => Poll :: Ready ( Some ( Err ( map_deserialization_error ( e, & message. data . as_bytes ( ) ) ) ) ) ,
536
+ Err ( e) => {
537
+ * this. done = true ;
538
+ Poll :: Ready ( Some ( Err ( map_deserialization_error ( e, & message. data . as_bytes ( ) ) ) ) )
539
+ }
531
540
Ok ( output) => Poll :: Ready ( Some ( Ok ( output) ) ) ,
532
541
}
533
542
}
534
543
}
535
544
}
536
- Err ( e) => Poll :: Ready ( Some ( Err ( OpenAIError :: StreamError ( e. to_string ( ) ) ) ) )
545
+ Err ( e) => {
546
+ * this. done = true ;
547
+ Poll :: Ready ( Some ( Err ( OpenAIError :: StreamError ( e. to_string ( ) ) ) ) )
548
+ }
537
549
}
538
550
}
539
551
}
0 commit comments