Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion reactivex/observable/observable.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,11 @@ def __await__(self) -> Generator[Any, None, _T_out]:
"""
from ..operators._tofuture import to_future_

loop = asyncio.get_event_loop()
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()

future: asyncio.Future[_T_out] = self.pipe(
to_future_(scheduler=AsyncIOScheduler(loop=loop))
)
Expand Down
33 changes: 21 additions & 12 deletions reactivex/operators/_tofuture.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,11 @@


def to_future_(
future_ctor: Optional[Callable[[], "Future[_T]"]] = None,
future_ctor: Optional[Callable[[], Future[_T]]] = None,
scheduler: Optional[abc.SchedulerBase] = None,
) -> Callable[[Observable[_T]], "Future[_T]"]:
future_ctor_: Callable[[], "Future[_T]"] = (
future_ctor or asyncio.get_event_loop().create_future
)
future: "Future[_T]" = future_ctor_()
) -> Callable[[Observable[_T]], Future[_T]]:

def to_future(source: Observable[_T]) -> "Future[_T]":
def to_future(source: Observable[_T]) -> Future[_T]:
"""Converts an existing observable sequence to a Future.

If the observable emits a single item, then this item is set as the
Expand All @@ -33,25 +29,38 @@ def to_future(source: Observable[_T]) -> "Future[_T]":
Returns:
A future with the last value from the observable sequence.
"""
if future_ctor is not None:
future_ctor_ = future_ctor
else:
try:
future_ctor_ = asyncio.get_running_loop().create_future
except RuntimeError:

def create_future() -> Future[_T]:
return Future() # Explicitly using Future[_T]

future_ctor_ = create_future # If no running loop

future: Future[_T] = future_ctor_()

has_value = False
last_value = cast(_T, None)
last_value: Optional[_T] = None

def on_next(value: _T):
def on_next(value: _T) -> None:
nonlocal last_value
nonlocal has_value
last_value = value
has_value = True

def on_error(err: Exception):
def on_error(err: Exception) -> None:
if not future.cancelled():
future.set_exception(err)

def on_completed():
def on_completed() -> None:
nonlocal last_value
if not future.cancelled():
if has_value:
future.set_result(last_value)
future.set_result(cast(_T, last_value))
else:
future.set_exception(SequenceContainsNoElementsError())
last_value = None
Expand Down
2 changes: 1 addition & 1 deletion reactivex/scheduler/eventloop/asyncioscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None:

Args:
loop: Instance of asyncio event loop to use; typically, you would
get this by asyncio.get_event_loop()
get this by asyncio.get_running_loop()
"""
super().__init__()
self._loop: asyncio.AbstractEventLoop = loop
Expand Down
12 changes: 5 additions & 7 deletions reactivex/scheduler/eventloop/asynciothreadsafescheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,11 @@ def _on_self_loop_or_not_running(self) -> bool:
"""
if not self._loop.is_running():
return True
current_loop = None

try:
# In python 3.7 there asyncio.get_running_loop() is prefered.
current_loop = asyncio.get_event_loop()
current_loop = asyncio.get_running_loop()
except RuntimeError:
# If there is no loop in current thread at all, and it is not main
# thread, we get error like:
# RuntimeError: There is no current event loop in thread 'Thread-1'
pass
# If no running event loop is found, assume we're in a different thread
return True

return self._loop == current_loop
2 changes: 1 addition & 1 deletion tests/test_observable/test_flatmap_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
class TestFlatMapAsync(unittest.TestCase):
def test_flat_map_async(self):
actual_next = None
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
scheduler = AsyncIOScheduler(loop=loop)

def mapper(i: int):
Expand Down
8 changes: 4 additions & 4 deletions tests/test_observable/test_fromfuture.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

class TestFromFuture(unittest.TestCase):
def test_future_success(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
success = [False, True, False]

async def go():
Expand All @@ -31,7 +31,7 @@ def on_completed():
assert all(success)

def test_future_failure(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
success = [True, False, True]

async def go():
Expand All @@ -57,7 +57,7 @@ def on_completed():
assert all(success)

def test_future_cancel(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
success = [True, False, True]

async def go():
Expand All @@ -80,7 +80,7 @@ def on_completed():
assert all(success)

def test_future_dispose(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
success = [True, True, True]

async def go():
Expand Down
4 changes: 2 additions & 2 deletions tests/test_observable/test_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

class TestStart(unittest.TestCase):
def test_start_async(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
success = [False]

async def go():
Expand All @@ -36,7 +36,7 @@ def on_next(x):
assert all(success)

def test_start_async_error(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
success = [False]

async def go():
Expand Down
14 changes: 7 additions & 7 deletions tests/test_observable/test_tofuture.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

class TestToFuture(unittest.TestCase):
def test_await_success(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
result = None

async def go():
Expand All @@ -30,7 +30,7 @@ async def go():
assert result == 42

def test_await_success_on_sequence(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
result = None

async def go():
Expand All @@ -42,7 +42,7 @@ async def go():
assert result == 42

def test_await_error(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
error = Exception("error")
result = None

Expand All @@ -58,7 +58,7 @@ async def go():
assert result == error

def test_await_empty_observable(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
result = None

async def go():
Expand All @@ -71,7 +71,7 @@ async def go():
)

def test_await_with_delay(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
result = None

async def go():
Expand All @@ -83,7 +83,7 @@ async def go():
assert result == 42

def test_cancel(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()

async def go():
source = reactivex.return_value(42)
Expand All @@ -96,7 +96,7 @@ async def go():
self.assertRaises(asyncio.CancelledError, loop.run_until_complete, go())

def test_dispose_on_cancel(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
sub = Subject()

async def using_sub():
Expand Down
10 changes: 5 additions & 5 deletions tests/test_scheduler/test_eventloop/test_asyncioscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@
class TestAsyncIOScheduler(unittest.TestCase):
@pytest.mark.skipif(CI, reason="Flaky test in GitHub Actions")
def test_asyncio_schedule_now(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
scheduler = AsyncIOScheduler(loop)
diff = scheduler.now - datetime.fromtimestamp(loop.time(), tz=timezone.utc)
assert abs(diff) < timedelta(milliseconds=2) # NOTE: may take 1 ms in CI

@pytest.mark.skipif(CI, reason="Test is flaky in GitHub Actions")
def test_asyncio_schedule_now_units(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
scheduler = AsyncIOScheduler(loop)
diff = scheduler.now
yield from asyncio.sleep(0.1)
diff = scheduler.now - diff
assert timedelta(milliseconds=80) < diff < timedelta(milliseconds=180)

def test_asyncio_schedule_action(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()

async def go():
scheduler = AsyncIOScheduler(loop)
Expand All @@ -46,7 +46,7 @@ def action(scheduler, state):
loop.run_until_complete(go())

def test_asyncio_schedule_action_due(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()

async def go():
scheduler = AsyncIOScheduler(loop)
Expand All @@ -67,7 +67,7 @@ def action(scheduler, state):
loop.run_until_complete(go())

def test_asyncio_schedule_action_cancel(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()

async def go():
ran = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@
class TestAsyncIOThreadSafeScheduler(unittest.TestCase):
@pytest.mark.skipif(CI, reason="Flaky test in GitHub Actions")
def test_asyncio_threadsafe_schedule_now(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
scheduler = AsyncIOThreadSafeScheduler(loop)
diff = scheduler.now - datetime.fromtimestamp(loop.time(), tz=timezone.utc)
assert abs(diff) < timedelta(milliseconds=2)

@pytest.mark.skipif(CI, reason="Flaky test in GitHub Actions")
def test_asyncio_threadsafe_schedule_now_units(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
scheduler = AsyncIOThreadSafeScheduler(loop)
diff = scheduler.now
yield from asyncio.sleep(0.1)
diff = scheduler.now - diff
assert timedelta(milliseconds=80) < diff < timedelta(milliseconds=180)

def test_asyncio_threadsafe_schedule_action(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()

async def go():
scheduler = AsyncIOThreadSafeScheduler(loop)
Expand All @@ -50,7 +50,7 @@ def schedule():
loop.run_until_complete(go())

def test_asyncio_threadsafe_schedule_action_due(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()

async def go():
scheduler = AsyncIOThreadSafeScheduler(loop)
Expand All @@ -74,7 +74,7 @@ def schedule():
loop.run_until_complete(go())

def test_asyncio_threadsafe_schedule_action_cancel(self):
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()

async def go():
ran = False
Expand Down