From 7aa24c3ac2cf55660a026f90638be08e6dec040a Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Thu, 13 Jun 2024 17:03:43 +0100 Subject: [PATCH 1/2] Remove shielding from cancellation process --- httpcore/.DS_Store | Bin 0 -> 6148 bytes httpcore/_async/http11.py | 40 ++++++++++++++++----------- httpcore/_sync/http11.py | 32 +++++++++++---------- tests/_async/test_connection_pool.py | 2 -- tests/_sync/test_connection_pool.py | 2 -- 5 files changed, 42 insertions(+), 34 deletions(-) create mode 100644 httpcore/.DS_Store diff --git a/httpcore/.DS_Store b/httpcore/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..c2989f81001cd2fcc058f19f26c3bd39c3832868 GIT binary patch literal 6148 zcmeHKJ<0+x5S{T;5Ns?ha|Le@L{H!W6h6U5R!|h|w(?va%{M>Ive;>%yn)F}Ci4>V zik%%1(Zyvq7nzF47;Y$c8~SGZ<~hi^MH zDnJFO02QDDRN&nTWQ841-hD97qXJam>lLu?LxCIC#5T}B9T>a?01gp$!`yobV6gzO zCbofyz%;19plY@l8g#@<=GDYDFzBM$d}!XR*`cW4j`NGBi`GDnRDcR}75Izg)aw5V z{-XcylDMJ*RN$u+(BX2uT;NGrTYHbQT3g^-xaB Response: f"to {self._origin}" ) - async with self._state_lock: + with self._state_thread_lock: + # We ensure that state changes at the start and end of a + # request/response cycle are thread-locked. if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE): self._request_count += 1 self._state = HTTPConnectionState.ACTIVE @@ -137,9 +139,8 @@ async def handle_async_request(self, request: Request) -> Response: }, ) except BaseException as exc: - with AsyncShieldCancellation(): - async with Trace("response_closed", logger, request) as trace: - await self._response_closed() + if self._connection_should_close(): + await self._network_stream.aclose() raise exc # Sending the request... @@ -242,8 +243,12 @@ async def _receive_event( # mypy fails to narrow the type in the above if statement above return event # type: ignore[return-value] - async def _response_closed(self) -> None: - async with self._state_lock: + def _connection_should_close(self) -> bool: + # Once the response is complete we either need to move into + # an IDLE or CLOSED state. + with self._state_thread_lock: + # We ensure that state changes at the start and end of a + # request/response cycle are thread-locked. if ( self._h11_state.our_state is h11.DONE and self._h11_state.their_state is h11.DONE @@ -253,8 +258,10 @@ async def _response_closed(self) -> None: if self._keepalive_expiry is not None: now = time.monotonic() self._expire_at = now + self._keepalive_expiry - else: - await self.aclose() + return False + + self._state = HTTPConnectionState.CLOSED + return True # Once the connection is no longer required... @@ -344,15 +351,16 @@ async def __aiter__(self) -> AsyncIterator[bytes]: # If we get an exception while streaming the response, # we want to close the response (and possibly the connection) # before raising that exception. - with AsyncShieldCancellation(): - await self.aclose() + if self._connection._connection_should_close(): + await self._connection.aclose() raise exc async def aclose(self) -> None: - if not self._closed: - self._closed = True - async with Trace("response_closed", logger, self._request): - await self._connection._response_closed() + async with Trace("response_closed", logger, self._request, kwargs={}): + if not self._closed: + self._closed = True + if self._connection._connection_should_close(): + await self._connection.aclose() class AsyncHTTP11UpgradeStream(AsyncNetworkStream): diff --git a/httpcore/_sync/http11.py b/httpcore/_sync/http11.py index a74ff8e80..a4fc9d900 100644 --- a/httpcore/_sync/http11.py +++ b/httpcore/_sync/http11.py @@ -25,7 +25,7 @@ map_exceptions, ) from .._models import Origin, Request, Response -from .._synchronization import Lock, ShieldCancellation +from .._synchronization import ThreadLock from .._trace import Trace from .interfaces import ConnectionInterface @@ -62,7 +62,7 @@ def __init__( self._keepalive_expiry: Optional[float] = keepalive_expiry self._expire_at: Optional[float] = None self._state = HTTPConnectionState.NEW - self._state_lock = Lock() + self._state_lock = ThreadLock() # thread-lock for sync, no-op for async self._request_count = 0 self._h11_state = h11.Connection( our_role=h11.CLIENT, @@ -137,9 +137,8 @@ def handle_request(self, request: Request) -> Response: }, ) except BaseException as exc: - with ShieldCancellation(): - with Trace("response_closed", logger, request) as trace: - self._response_closed() + if self._connection_should_close(): + self._network_stream.close() raise exc # Sending the request... @@ -242,7 +241,9 @@ def _receive_event( # mypy fails to narrow the type in the above if statement above return event # type: ignore[return-value] - def _response_closed(self) -> None: + def _connection_should_close(self) -> bool: + # Once the response is complete we either need to move into + # an IDLE or CLOSED state. with self._state_lock: if ( self._h11_state.our_state is h11.DONE @@ -253,8 +254,10 @@ def _response_closed(self) -> None: if self._keepalive_expiry is not None: now = time.monotonic() self._expire_at = now + self._keepalive_expiry - else: - self.close() + return False + + self._state = HTTPConnectionState.CLOSED + return True # Once the connection is no longer required... @@ -344,15 +347,16 @@ def __iter__(self) -> Iterator[bytes]: # If we get an exception while streaming the response, # we want to close the response (and possibly the connection) # before raising that exception. - with ShieldCancellation(): - self.close() + if self._connection._connection_should_close(): + self._connection.close() raise exc def close(self) -> None: - if not self._closed: - self._closed = True - with Trace("response_closed", logger, self._request): - self._connection._response_closed() + with Trace("response_closed", logger, self._request, kwargs={}): + if not self._closed: + self._closed = True + if self._connection._connection_should_close(): + self._connection.close() class HTTP11UpgradeStream(NetworkStream): diff --git a/tests/_async/test_connection_pool.py b/tests/_async/test_connection_pool.py index 2fc272049..27e47eb34 100644 --- a/tests/_async/test_connection_pool.py +++ b/tests/_async/test_connection_pool.py @@ -398,8 +398,6 @@ async def trace(name, kwargs): "http11.send_request_body.complete", "http11.receive_response_headers.started", "http11.receive_response_headers.failed", - "http11.response_closed.started", - "http11.response_closed.complete", ] diff --git a/tests/_sync/test_connection_pool.py b/tests/_sync/test_connection_pool.py index ee303e5cf..c73a7c2f7 100644 --- a/tests/_sync/test_connection_pool.py +++ b/tests/_sync/test_connection_pool.py @@ -398,8 +398,6 @@ def trace(name, kwargs): "http11.send_request_body.complete", "http11.receive_response_headers.started", "http11.receive_response_headers.failed", - "http11.response_closed.started", - "http11.response_closed.complete", ] From f3569f53303c7daaccce5932c6c91a0f733447bd Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Thu, 13 Jun 2024 17:35:18 +0100 Subject: [PATCH 2/2] Linting --- httpcore/_async/http11.py | 4 +++- httpcore/_sync/http11.py | 12 +++++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/httpcore/_async/http11.py b/httpcore/_async/http11.py index 396bd402b..b836d8290 100644 --- a/httpcore/_async/http11.py +++ b/httpcore/_async/http11.py @@ -62,7 +62,9 @@ def __init__( self._keepalive_expiry: Optional[float] = keepalive_expiry self._expire_at: Optional[float] = None self._state = HTTPConnectionState.NEW - self._state_thread_lock = AsyncThreadLock() # thread-lock for sync, no-op for async + self._state_thread_lock = ( + AsyncThreadLock() + ) # thread-lock for sync, no-op for async self._request_count = 0 self._h11_state = h11.Connection( our_role=h11.CLIENT, diff --git a/httpcore/_sync/http11.py b/httpcore/_sync/http11.py index a4fc9d900..c27f0d324 100644 --- a/httpcore/_sync/http11.py +++ b/httpcore/_sync/http11.py @@ -62,7 +62,9 @@ def __init__( self._keepalive_expiry: Optional[float] = keepalive_expiry self._expire_at: Optional[float] = None self._state = HTTPConnectionState.NEW - self._state_lock = ThreadLock() # thread-lock for sync, no-op for async + self._state_thread_lock = ( + ThreadLock() + ) # thread-lock for sync, no-op for async self._request_count = 0 self._h11_state = h11.Connection( our_role=h11.CLIENT, @@ -76,7 +78,9 @@ def handle_request(self, request: Request) -> Response: f"to {self._origin}" ) - with self._state_lock: + with self._state_thread_lock: + # We ensure that state changes at the start and end of a + # request/response cycle are thread-locked. if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE): self._request_count += 1 self._state = HTTPConnectionState.ACTIVE @@ -244,7 +248,9 @@ def _receive_event( def _connection_should_close(self) -> bool: # Once the response is complete we either need to move into # an IDLE or CLOSED state. - with self._state_lock: + with self._state_thread_lock: + # We ensure that state changes at the start and end of a + # request/response cycle are thread-locked. if ( self._h11_state.our_state is h11.DONE and self._h11_state.their_state is h11.DONE