Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions reactivex/observable/observable.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def pipe(self, *operators: Callable[[Any], Any]) -> Any:

return pipe_(self, *operators)

def run(self) -> Any:
def run(self, scheduler: Optional[abc.SchedulerBase] = None) -> _T_out:
"""Run source synchronously.

Subscribes to the observable source. Then blocks and waits for the
Expand All @@ -249,7 +249,7 @@ def run(self) -> Any:
"""
from ..run import run

return run(self)
return run(self, scheduler)

def __await__(self) -> Generator[Any, None, _T_out]:
"""Awaits the given observable.
Expand Down
6 changes: 4 additions & 2 deletions reactivex/run.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import threading
from typing import Optional, TypeVar, cast

from reactivex import abc
from reactivex.internal.exceptions import SequenceContainsNoElementsError
from reactivex.scheduler import NewThreadScheduler

from .observable import Observable

scheduler = NewThreadScheduler()
_default_scheduler = NewThreadScheduler()

_T = TypeVar("_T")


def run(source: Observable[_T]) -> _T:
def run(source: Observable[_T], scheduler: Optional[abc.SchedulerBase] = None) -> _T:
"""Run source synchronously.

Subscribes to the observable source. Then blocks and waits for the
Expand All @@ -32,6 +33,7 @@ def run(source: Observable[_T]) -> _T:
Returns:
The last element emitted from the observable.
"""
scheduler = scheduler or _default_scheduler
exception: Optional[Exception] = None
latch = threading.Event()
has_result = False
Expand Down
Loading