Skip to content

Commit 4d7385e

Browse files
committed
Close async client within the stream event loop
1 parent f0015d6 commit 4d7385e

File tree

2 files changed

+42
-6
lines changed

2 files changed

+42
-6
lines changed

src/range_streams/async_utils.py

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from typing import TYPE_CHECKING, Callable, Coroutine, Iterator, Type
1010

1111
from aiostream import stream
12+
from aiostream.core import StreamEmpty
1213
from ranges import Range, RangeSet
1314

1415
MYPY = False # when using mypy will be overrided as True
@@ -34,15 +35,32 @@ def __init__(
3435
show_progress_bar: bool = True,
3536
timeout_s: float = 5.0,
3637
client=None,
38+
close_client: bool = False,
3739
**kwargs,
3840
):
3941
"""
4042
Any kwargs are passed through to the stream class constructor.
4143
4244
Args:
43-
callback : A function to be passed 3 values: the AsyncFetcher which is calling
44-
it, the awaited RangeStream, and its source URL (a ``httpx.URL``,
45-
which can be coerced to a string).
45+
stream_cls : The :class:`~range_streams.stream.RangeStream` class or a
46+
subclass (i.e. one of its codecs or a custom subclass)
47+
to instantiate for each of the URLs. Note: these classes
48+
also have a helper method
49+
:meth:`~range_streams.stream.RangeStream.make_async_fetcher`
50+
urls : The list of URLs to fetch until completion
51+
callback : A function to be passed 3 values: the AsyncFetcher which
52+
is calling it, the awaited RangeStream, and its source URL
53+
(a ``httpx.URL``, which can be coerced to a string).
54+
verbose : Whether to log to console
55+
show_progress_bar : Whether to show a tqdm progress bar (async-compatible)
56+
timeout_s : The timeout to set on the client (converted into
57+
``httpx.Timeout`` configuration on instantiation)
58+
client : The client to pass, if any, or one will be instantiated
59+
and closed on each usage (note: not each instantiation!)
60+
close_client : Whether to close the client upon completion (only if
61+
provided: if no client is provided, one will be created
62+
and closed by the standard `async with httpx.AsyncClient`
63+
contextmanager block).
4664
"""
4765
if urls == []:
4866
raise ValueError("The list of URLs to fetch cannot be empty")
@@ -59,6 +77,7 @@ def __init__(
5977
self.verbose = verbose
6078
self.show_progress_bar = show_progress_bar and not self.verbose
6179
self.client = client
80+
self.close_client_on_completion = close_client
6281
self.timeout = httpx.Timeout(timeout=timeout_s)
6382
self.completed = RangeSet()
6483
set_up_logging(quiet=not verbose)
@@ -71,14 +90,22 @@ def make_calls(self):
7190
urlset = (u for u in self.filtered_url_list) # single use URL generator
7291
if self.show_progress_bar:
7392
self.set_up_progress_bar()
74-
self.fetch_things(urls=urlset)
93+
try:
94+
self.fetch_things(urls=urlset)
95+
except StreamEmpty as exc:
96+
# Treat this like a StopIteration (was called despite completed URLs)
97+
if self.close_client_on_completion:
98+
asyncio.run(self.client.aclose())
99+
else:
100+
raise
101+
# Note: to avoid throwing exception, check `total_complete` before calling
75102
if self.show_progress_bar:
76103
self.pbar.close()
77104

78105
async def process_stream(self, range_stream: RangeStreamOrSubclass):
79106
"""
80107
Process an awaited RangeStream within an async fetch loop, calling the callback
81-
set on the `~range_streams.async_utils.AsyncFetcher.callback` attribute.
108+
set on the :attr:`~range_streams.async_utils.AsyncFetcher.callback` attribute.
82109
83110
Args:
84111
range_stream : The awaited RangeStream (or one of its subclasses)
@@ -94,8 +121,12 @@ async def process_stream(self, range_stream: RangeStreamOrSubclass):
94121
log.debug(f"Processed URL in async callback: {source_url}")
95122
if self.show_progress_bar:
96123
self.pbar.update()
97-
self.complete_row(row_index=i)
124+
if i not in self.completed:
125+
# Don't bother putting in if already been marked as complete in the callback
126+
self.complete_row(row_index=i)
98127
await resp.aclose()
128+
if self.total_complete == self.n and self.close_client_on_completion:
129+
await self.client.aclose()
99130

100131
@property
101132
def total_complete(self) -> int:
@@ -174,6 +205,9 @@ async def async_fetch_urlset(
174205
in a contextmanager block (i.e. close it immediately after use), otherwise use
175206
the one provided, not in a contextmanager block (i.e. leave it up to the user to
176207
close the client).
208+
209+
Args:
210+
urls : The URLs to fetch, as an exhaustible iterator (not a Sequence)
177211
"""
178212
await self.set_async_signal_handlers()
179213
if self.client is None:

src/range_streams/stream.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -898,6 +898,7 @@ def make_async_fetcher(
898898
show_progress_bar: bool = True,
899899
timeout_s: float = 5.0,
900900
client=None,
901+
close_client: bool = False,
901902
**kwargs,
902903
):
903904
return AsyncFetcher(
@@ -908,5 +909,6 @@ def make_async_fetcher(
908909
show_progress_bar=show_progress_bar,
909910
timeout_s=timeout_s,
910911
client=client,
912+
close_client=close_client,
911913
**kwargs, # Any other kwargs can be passed through to RangeStream subclass
912914
)

0 commit comments

Comments
 (0)