9
9
10
10
11
11
def to_future_ (
12
- future_ctor : Optional [Callable [[], " Future[_T]" ]] = None ,
12
+ future_ctor : Optional [Callable [[], Future [_T ]]] = None ,
13
13
scheduler : Optional [abc .SchedulerBase ] = None ,
14
- ) -> Callable [[Observable [_T ]], "Future[_T]" ]:
15
- future_ctor_ : Callable [[], "Future[_T]" ] = (
16
- future_ctor or asyncio .get_event_loop ().create_future
17
- )
18
- future : "Future[_T]" = future_ctor_ ()
14
+ ) -> Callable [[Observable [_T ]], Future [_T ]]:
19
15
20
- def to_future (source : Observable [_T ]) -> " Future[_T]" :
16
+ def to_future (source : Observable [_T ]) -> Future [_T ]:
21
17
"""Converts an existing observable sequence to a Future.
22
18
23
19
If the observable emits a single item, then this item is set as the
@@ -33,25 +29,38 @@ def to_future(source: Observable[_T]) -> "Future[_T]":
33
29
Returns:
34
30
A future with the last value from the observable sequence.
35
31
"""
32
+ if future_ctor is not None :
33
+ future_ctor_ = future_ctor
34
+ else :
35
+ try :
36
+ future_ctor_ = asyncio .get_running_loop ().create_future
37
+ except RuntimeError :
38
+
39
+ def create_future () -> Future [_T ]:
40
+ return Future () # Explicitly using Future[_T]
41
+
42
+ future_ctor_ = create_future # If no running loop
43
+
44
+ future : Future [_T ] = future_ctor_ ()
36
45
37
46
has_value = False
38
- last_value = cast ( _T , None )
47
+ last_value : Optional [ _T ] = None
39
48
40
- def on_next (value : _T ):
49
+ def on_next (value : _T ) -> None :
41
50
nonlocal last_value
42
51
nonlocal has_value
43
52
last_value = value
44
53
has_value = True
45
54
46
- def on_error (err : Exception ):
55
+ def on_error (err : Exception ) -> None :
47
56
if not future .cancelled ():
48
57
future .set_exception (err )
49
58
50
- def on_completed ():
59
+ def on_completed () -> None :
51
60
nonlocal last_value
52
61
if not future .cancelled ():
53
62
if has_value :
54
- future .set_result (last_value )
63
+ future .set_result (cast ( _T , last_value ) )
55
64
else :
56
65
future .set_exception (SequenceContainsNoElementsError ())
57
66
last_value = None
0 commit comments