From 4643d4797ae266161f2113d6809f9ffc90d2b2da Mon Sep 17 00:00:00 2001 From: Pablo Woolvett Date: Fri, 6 May 2022 14:49:21 -0400 Subject: [PATCH 1/4] test: add concurrent checks * group together fixtures for backend testing * Add per-backend setup to allow slow initialization. This is specially important as kafka subscribe can return while the consumer is not ready. See related test setup upstream: - https://github.com/aio-libs/aiokafka/blob/2c54e10c57760f779961a8c2f5df8ad609ef6983/tests/test_consumer.py#L433 - https://github.com/aio-libs/aiokafka/blob/2c54e10c57760f779961a8c2f5df8ad609ef6983/tests/_testutil.py#L376 - https://github.com/aio-libs/aiokafka/blob/2c54e10c57760f779961a8c2f5df8ad609ef6983/tests/_testutil.py#L364 fixes #42 --- tests/conftest.py | 59 ++++++++++++++++++++++++++++++++++++++++ tests/test_broadcast.py | 58 ++++++++++++--------------------------- tests/test_concurrent.py | 30 ++++++++++++++++++++ 3 files changed, 107 insertions(+), 40 deletions(-) create mode 100644 tests/conftest.py create mode 100644 tests/test_concurrent.py diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..a297d7b --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,59 @@ +"""Check for #22""" + +import asyncio +import functools + +import pytest_asyncio + +from broadcaster import Broadcast +from broadcaster._backends.kafka import KafkaBackend + + +async def __has_topic_now(client, topic): + if await client.force_metadata_update(): + if topic in client.cluster.topics(): + print(f'Topic "{topic}" exists') + return True + return False + + +async def __wait_has_topic(client, topic, *, timeout_sec=5): + poll_time_sec = 1 / 10000 + from datetime import datetime + + pre = datetime.now() + while True: + if (datetime.now() - pre).total_seconds() >= timeout_sec: + raise ValueError(f'No topic "{topic}" exists') + if await __has_topic_now(client, topic): + return + await asyncio.sleep(poll_time_sec) + + +def kafka_backend_setup(kafka_backend): + """Block until consumer client contains the topic""" + subscribe_impl = kafka_backend.subscribe + + @functools.wraps(subscribe_impl) + async def subscribe(channel: str) -> None: + await subscribe_impl(channel) + await __wait_has_topic(kafka_backend._consumer._client, channel) + + kafka_backend.subscribe = subscribe + + +BROADCASTS_SETUP = { + KafkaBackend: kafka_backend_setup, +} + + +@pytest_asyncio.fixture(scope="function") +async def setup_broadcast(request): + url = request.param + async with Broadcast(url) as broadcast: + backend = broadcast._backend + for klass, setup in BROADCASTS_SETUP.items(): + if isinstance(backend, klass): + setup(backend) + break + yield broadcast diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 61e7295..0b0b3b2 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -1,45 +1,23 @@ -import pytest - -from broadcaster import Broadcast - - -@pytest.mark.asyncio -async def test_memory(): - async with Broadcast("memory://") as broadcast: - async with broadcast.subscribe("chatroom") as subscriber: - await broadcast.publish("chatroom", "hello") - event = await subscriber.get() - assert event.channel == "chatroom" - assert event.message == "hello" - - -@pytest.mark.asyncio -async def test_redis(): - async with Broadcast("redis://localhost:6379") as broadcast: - async with broadcast.subscribe("chatroom") as subscriber: - await broadcast.publish("chatroom", "hello") - event = await subscriber.get() - assert event.channel == "chatroom" - assert event.message == "hello" +from uuid import 
uuid4 +import pytest -@pytest.mark.asyncio -async def test_postgres(): - async with Broadcast( - "postgres://postgres:postgres@localhost:5432/broadcaster" - ) as broadcast: - async with broadcast.subscribe("chatroom") as subscriber: - await broadcast.publish("chatroom", "hello") - event = await subscriber.get() - assert event.channel == "chatroom" - assert event.message == "hello" +URLS = [ + ("memory://",), + ("redis://localhost:6379",), + ("postgres://postgres:postgres@localhost:5432/broadcaster",), + ("kafka://localhost:9092",), +] @pytest.mark.asyncio -async def test_kafka(): - async with Broadcast("kafka://localhost:9092") as broadcast: - async with broadcast.subscribe("chatroom") as subscriber: - await broadcast.publish("chatroom", "hello") - event = await subscriber.get() - assert event.channel == "chatroom" - assert event.message == "hello" +@pytest.mark.parametrize(["setup_broadcast"], URLS, indirect=True) +async def test_broadcast(setup_broadcast): + uid = uuid4() + channel = f"chatroom-{uid}" + msg = f"hello {uid}" + async with setup_broadcast.subscribe(channel) as subscriber: + await setup_broadcast.publish(channel, msg) + event = await subscriber.get() + assert event.channel == channel + assert event.message == msg diff --git a/tests/test_concurrent.py b/tests/test_concurrent.py new file mode 100644 index 0000000..639a47e --- /dev/null +++ b/tests/test_concurrent.py @@ -0,0 +1,30 @@ +"""Check for #22""" +import asyncio +from uuid import uuid4 + +import pytest + +MESSAGES = ["hello", "goodbye"] + +URLS = [ + ("memory://",), + ("redis://localhost:6379",), + ("postgres://postgres:postgres@localhost:5432/broadcaster",), + ("kafka://localhost:9092",), +] + + +@pytest.mark.asyncio +@pytest.mark.parametrize(["setup_broadcast"], URLS, indirect=True) +async def test_broadcast(setup_broadcast): + uid = uuid4() + channel = f"chatroom-{uid}" + msgs = [f"{msg} {uid}" for msg in MESSAGES] + async with setup_broadcast.subscribe(channel) as subscriber: + to_publish = [setup_broadcast.publish(channel, msg) for msg in msgs] + + await asyncio.gather(*to_publish) + for msg in msgs: + event = await subscriber.get() + assert event.channel == channel + assert event.message == msg From aaf59e81eca4a028d950fd6bc27c047154d89533 Mon Sep 17 00:00:00 2001 From: Pablo Woolvett Date: Fri, 6 May 2022 14:35:44 -0400 Subject: [PATCH 2/4] chore: update remnants, packages Signed-off-by: Pablo Woolvett --- .gitignore | 4 +++- requirements.txt | 4 ++-- scripts/check | 2 +- scripts/lint | 4 ++-- setup.cfg | 3 ++- 5 files changed, 10 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index 7b5d431..8247f13 100644 --- a/.gitignore +++ b/.gitignore @@ -3,5 +3,7 @@ test.db .coverage .pytest_cache/ .mypy_cache/ -starlette.egg-info/ +broadcaster.egg-info/ venv/ +build/ +dist/ diff --git a/requirements.txt b/requirements.txt index 811910c..f3913d7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,14 +11,14 @@ wheel # Tests & Linting autoflake -black==20.8b1 +black==22.3.0 coverage==5.3 flake8 flake8-bugbear flake8-pie==0.5.* isort==5.* mypy -pytest==5.* +pytest==7.* pytest-asyncio pytest-trio trio diff --git a/scripts/check b/scripts/check index 77acf65..720a644 100755 --- a/scripts/check +++ b/scripts/check @@ -11,4 +11,4 @@ set -x ${PREFIX}black --check --diff --target-version=py37 $SOURCE_FILES ${PREFIX}flake8 $SOURCE_FILES ${PREFIX}mypy $SOURCE_FILES -${PREFIX}isort --check --diff --project=httpx $SOURCE_FILES +${PREFIX}isort --check --diff --project=broadcaster $SOURCE_FILES diff --git 
a/scripts/lint b/scripts/lint index 81851c6..9470161 100755 --- a/scripts/lint +++ b/scripts/lint @@ -4,10 +4,10 @@ export PREFIX="" if [ -d 'venv' ] ; then export PREFIX="venv/bin/" fi -export SOURCE_FILES="httpx tests" +export SOURCE_FILES="broadcaster tests" set -x ${PREFIX}autoflake --in-place --recursive $SOURCE_FILES -${PREFIX}isort --project=httpx $SOURCE_FILES +${PREFIX}isort --project=broadcaster $SOURCE_FILES ${PREFIX}black --target-version=py37 $SOURCE_FILES diff --git a/setup.cfg b/setup.cfg index c860d81..e88ec20 100644 --- a/setup.cfg +++ b/setup.cfg @@ -16,9 +16,10 @@ combine_as_imports = True [tool:pytest] addopts = -rxXs +asyncio_mode = strict markers = copied_from(source, changes=None): mark test as copied from somewhere else, along with a description of changes made to accodomate e.g. our test setup [coverage:run] omit = venv/* -include = httpx/*, tests/* +include = broadcaster/*, tests/* From 9255c29527cf7efcc53e391f2fb50457f40a40e8 Mon Sep 17 00:00:00 2001 From: Pablo Woolvett Date: Fri, 6 May 2022 14:46:54 -0400 Subject: [PATCH 3/4] fix(backend/kafka): consumer unsubscribe not awaitable --- broadcaster/_backends/kafka.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/broadcaster/_backends/kafka.py b/broadcaster/_backends/kafka.py index a3df086..18b88d2 100644 --- a/broadcaster/_backends/kafka.py +++ b/broadcaster/_backends/kafka.py @@ -1,4 +1,3 @@ -import asyncio import typing from urllib.parse import urlparse @@ -14,9 +13,8 @@ def __init__(self, url: str): self._consumer_channels: typing.Set = set() async def connect(self) -> None: - loop = asyncio.get_event_loop() - self._producer = AIOKafkaProducer(loop=loop, bootstrap_servers=self._servers) - self._consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self._servers) + self._producer = AIOKafkaProducer(bootstrap_servers=self._servers) + self._consumer = AIOKafkaConsumer(bootstrap_servers=self._servers) await self._producer.start() await self._consumer.start() @@ -29,7 +27,7 @@ async def subscribe(self, channel: str) -> None: self._consumer.subscribe(topics=self._consumer_channels) async def unsubscribe(self, channel: str) -> None: - await self._consumer.unsubscribe() + self._consumer.unsubscribe() async def publish(self, channel: str, message: typing.Any) -> None: await self._producer.send_and_wait(channel, message.encode("utf8")) From 54e70b255ffc954087af8b7ea586bdbce4338ce9 Mon Sep 17 00:00:00 2001 From: Pablo Woolvett Date: Fri, 6 May 2022 14:52:36 -0400 Subject: [PATCH 4/4] fix(backend/postgres): allow concurrent pubs This fix adds an asyncio.Lock to avoid `asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress` fixes #22 --- broadcaster/_backends/postgres.py | 10 +++++++--- tests/test_concurrent.py | 32 +++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/broadcaster/_backends/postgres.py b/broadcaster/_backends/postgres.py index 47ef4f6..85bc588 100644 --- a/broadcaster/_backends/postgres.py +++ b/broadcaster/_backends/postgres.py @@ -13,19 +13,23 @@ def __init__(self, url: str): async def connect(self) -> None: self._conn = await asyncpg.connect(self._url) + self._lock = asyncio.Lock() self._listen_queue: asyncio.Queue = asyncio.Queue() async def disconnect(self) -> None: await self._conn.close() async def subscribe(self, channel: str) -> None: - await self._conn.add_listener(channel, self._listener) + async with self._lock: + await self._conn.add_listener(channel, self._listener) async def 
unsubscribe(self, channel: str) -> None: - await self._conn.remove_listener(channel, self._listener) + async with self._lock: + await self._conn.remove_listener(channel, self._listener) async def publish(self, channel: str, message: str) -> None: - await self._conn.execute("SELECT pg_notify($1, $2);", channel, message) + async with self._lock: + await self._conn.execute("SELECT pg_notify($1, $2);", channel, message) def _listener(self, *args: Any) -> None: connection, pid, channel, payload = args diff --git a/tests/test_concurrent.py b/tests/test_concurrent.py index 639a47e..0082901 100644 --- a/tests/test_concurrent.py +++ b/tests/test_concurrent.py @@ -28,3 +28,35 @@ async def test_broadcast(setup_broadcast): event = await subscriber.get() assert event.channel == channel assert event.message == msg + + +@pytest.mark.asyncio +@pytest.mark.parametrize(["setup_broadcast"], URLS, indirect=True) +async def test_sub(setup_broadcast): + uid = uuid4() + channel1 = f"chatroom-{uid}1" + channel2 = f"chatroom-{uid}2" + + to_sub = [ + setup_broadcast._backend.subscribe(channel1), + setup_broadcast._backend.subscribe(channel2), + ] + await asyncio.gather(*to_sub) + + +@pytest.mark.asyncio +@pytest.mark.parametrize(["setup_broadcast"], URLS, indirect=True) +async def test_unsub(setup_broadcast): + uid = uuid4() + channel1 = f"chatroom-{uid}1" + channel2 = f"chatroom-{uid}2" + + await setup_broadcast._backend.subscribe(channel1) + await setup_broadcast._backend.subscribe(channel2) + + to_unsub = [ + setup_broadcast._backend.unsubscribe(channel1), + setup_broadcast._backend.unsubscribe(channel2), + ] + + await asyncio.gather(*to_unsub)
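
Note, for reviewers: the locking pattern from PATCH 4/4 can be exercised without a running Postgres. The sketch below is illustrative only and is not part of the patches; FakeConnection is a hypothetical stand-in that mimics asyncpg's single-connection behaviour (it rejects overlapping operations), and the asyncio.Lock serializes publishes the same way the patched PostgresBackend does.

# Illustrative sketch only -- not part of the patch series above.
# FakeConnection is a hypothetical stand-in for an asyncpg connection:
# like asyncpg, it cannot service two overlapping operations, so it
# raises if a second call arrives while one is still in flight.
import asyncio


class FakeConnection:
    def __init__(self) -> None:
        self._busy = False

    async def execute(self, query: str, *args: str) -> None:
        if self._busy:
            # asyncpg raises InterfaceError("cannot perform operation:
            # another operation is in progress") in this situation.
            raise RuntimeError("another operation is in progress")
        self._busy = True
        await asyncio.sleep(0.01)  # simulate the server round-trip
        self._busy = False


async def main() -> None:
    conn = FakeConnection()
    lock = asyncio.Lock()

    async def publish(channel: str, message: str) -> None:
        # Same pattern as the patched PostgresBackend.publish: the lock
        # serializes access to the single shared connection.
        async with lock:
            await conn.execute("SELECT pg_notify($1, $2);", channel, message)

    # Without the lock these concurrent publishes overlap on the one
    # connection and a later one raises; with it they run one at a time.
    await asyncio.gather(*(publish("chatroom", f"msg {i}") for i in range(5)))
    print("all publishes completed")


asyncio.run(main())

Removing the "async with lock:" line reproduces the overlap failure that test_concurrent.py provokes against the real backends.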