1
1
//! Python coroutine implementation, used notably when wrapping `async fn`
2
2
//! with `#[pyfunction]`/`#[pymethods]`.
3
+ use std:: task:: Waker ;
3
4
use std:: {
4
- any:: Any ,
5
5
future:: Future ,
6
6
panic,
7
7
pin:: Pin ,
8
8
sync:: Arc ,
9
9
task:: { Context , Poll } ,
10
10
} ;
11
11
12
- use futures_util:: FutureExt ;
13
12
use pyo3_macros:: { pyclass, pymethods} ;
14
13
15
14
use crate :: {
16
15
coroutine:: waker:: AsyncioWaker ,
17
16
exceptions:: { PyAttributeError , PyRuntimeError , PyStopIteration } ,
18
- panic:: PanicException ,
19
17
pyclass:: IterNextOutput ,
20
18
types:: { PyIterator , PyString } ,
21
19
IntoPy , Py , PyAny , PyErr , PyObject , PyResult , Python ,
@@ -25,19 +23,18 @@ pub(crate) mod cancel;
25
23
mod waker;
26
24
27
25
use crate :: coroutine:: cancel:: ThrowCallback ;
26
+ use crate :: panic:: PanicException ;
28
27
pub use cancel:: CancelHandle ;
29
28
30
29
const COROUTINE_REUSED_ERROR : & str = "cannot reuse already awaited coroutine" ;
31
30
32
- type FutureOutput = Result < PyResult < PyObject > , Box < dyn Any + Send > > ;
33
-
34
31
/// Python coroutine wrapping a [`Future`].
35
32
#[ pyclass( crate = "crate" ) ]
36
33
pub struct Coroutine {
37
34
name : Option < Py < PyString > > ,
38
35
qualname_prefix : Option < & ' static str > ,
39
36
throw_callback : Option < ThrowCallback > ,
40
- future : Option < Pin < Box < dyn Future < Output = FutureOutput > + Send > > > ,
37
+ future : Option < Pin < Box < dyn Future < Output = PyResult < PyObject > > + Send > > > ,
41
38
waker : Option < Arc < AsyncioWaker > > ,
42
39
}
43
40
@@ -68,7 +65,7 @@ impl Coroutine {
68
65
name,
69
66
qualname_prefix,
70
67
throw_callback,
71
- future : Some ( Box :: pin ( panic :: AssertUnwindSafe ( wrap) . catch_unwind ( ) ) ) ,
68
+ future : Some ( Box :: pin ( wrap) ) ,
72
69
waker : None ,
73
70
}
74
71
}
@@ -98,22 +95,28 @@ impl Coroutine {
98
95
} else {
99
96
self . waker = Some ( Arc :: new ( AsyncioWaker :: new ( ) ) ) ;
100
97
}
101
- let waker = futures_util :: task :: waker ( self . waker . clone ( ) . unwrap ( ) ) ;
98
+ let waker = Waker :: from ( self . waker . clone ( ) . unwrap ( ) ) ;
102
99
// poll the Rust future and forward its results if ready
103
- if let Poll :: Ready ( res) = future_rs. as_mut ( ) . poll ( & mut Context :: from_waker ( & waker) ) {
104
- self . close ( ) ;
105
- return match res {
106
- Ok ( res) => Ok ( IterNextOutput :: Return ( res?) ) ,
107
- Err ( err) => Err ( PanicException :: from_panic_payload ( err) ) ,
108
- } ;
100
+ // polling is UnwindSafe because the future is dropped in case of panic
101
+ let poll = || future_rs. as_mut ( ) . poll ( & mut Context :: from_waker ( & waker) ) ;
102
+ match panic:: catch_unwind ( panic:: AssertUnwindSafe ( poll) ) {
103
+ Ok ( Poll :: Ready ( res) ) => {
104
+ self . close ( ) ;
105
+ return Ok ( IterNextOutput :: Return ( res?) ) ;
106
+ }
107
+ Err ( err) => {
108
+ self . close ( ) ;
109
+ return Err ( PanicException :: from_panic_payload ( err) ) ;
110
+ }
111
+ _ => { }
109
112
}
110
113
// otherwise, initialize the waker `asyncio.Future`
111
114
if let Some ( future) = self . waker . as_ref ( ) . unwrap ( ) . initialize_future ( py) ? {
112
115
// `asyncio.Future` must be awaited; fortunately, it implements `__iter__ = __await__`
113
116
// and will yield itself if its result has not been set in polling above
114
117
if let Some ( future) = PyIterator :: from_object ( future) . unwrap ( ) . next ( ) {
115
118
// future has not been leaked into Python for now, and Rust code can only call
116
- // `set_result(None)` in `ArcWake ` implementation, so it's safe to unwrap
119
+ // `set_result(None)` in `Wake ` implementation, so it's safe to unwrap
117
120
return Ok ( IterNextOutput :: Yield ( future. unwrap ( ) . into ( ) ) ) ;
118
121
}
119
122
}
0 commit comments