diff --git a/reactivex/observable/observable.py b/reactivex/observable/observable.py index 6e864da9..e540fe46 100644 --- a/reactivex/observable/observable.py +++ b/reactivex/observable/observable.py @@ -229,7 +229,7 @@ def pipe(self, *operators: Callable[[Any], Any]) -> Any: return pipe_(self, *operators) - def run(self) -> Any: + def run(self, scheduler: Optional[abc.SchedulerBase] = None) -> _T_out: """Run source synchronously. Subscribes to the observable source. Then blocks and waits for the @@ -249,7 +249,7 @@ def run(self) -> Any: """ from ..run import run - return run(self) + return run(self, scheduler) def __await__(self) -> Generator[Any, None, _T_out]: """Awaits the given observable. diff --git a/reactivex/run.py b/reactivex/run.py index 6e932b2e..15b1950b 100644 --- a/reactivex/run.py +++ b/reactivex/run.py @@ -1,17 +1,18 @@ import threading from typing import Optional, TypeVar, cast +from reactivex import abc from reactivex.internal.exceptions import SequenceContainsNoElementsError from reactivex.scheduler import NewThreadScheduler from .observable import Observable -scheduler = NewThreadScheduler() +_default_scheduler = NewThreadScheduler() _T = TypeVar("_T") -def run(source: Observable[_T]) -> _T: +def run(source: Observable[_T], scheduler: Optional[abc.SchedulerBase] = None) -> _T: """Run source synchronously. Subscribes to the observable source. Then blocks and waits for the @@ -32,6 +33,7 @@ def run(source: Observable[_T]) -> _T: Returns: The last element emitted from the observable. """ + scheduler = scheduler or _default_scheduler exception: Optional[Exception] = None latch = threading.Event() has_result = False