From f467a330fb981994ecbefb9ca94cd4c1d096bcf3 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 17:39:20 +0200 Subject: [PATCH 01/13] :sparkles: Implement JsonUtil for switching between json and orjson Signed-off-by: ff137 --- nats/json_util.py | 66 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 nats/json_util.py diff --git a/nats/json_util.py b/nats/json_util.py new file mode 100644 index 00000000..7a5e334f --- /dev/null +++ b/nats/json_util.py @@ -0,0 +1,66 @@ +"""A module providing a utility class for handling JSON-related operations.""" + +import json +from typing import Any + +try: + import orjson +except ImportError: + orjson = None + + +class JsonUtil: + """A utility class for handling JSON-related operations. + This class provides static methods for formatting JSON strings, and + for converting between Python objects and JSON strings/files. It uses + the `orjson` library where possible for its speed advantages, but reverts + to the standard `json` library where `orjson` does not support the required + functionality. + """ + + @staticmethod + def dumps(obj, *args, **kwargs) -> str: + """Convert a Python object into a json string. + Args: + obj: The data to be converted + *args: Extra arguments to pass to the orjson.dumps() function + **kwargs: Extra keyword arguments to pass to the orjson.dumps() function + Returns: + The json string representation of obj + """ + + if orjson is None: + return json.dumps(obj, *args, **kwargs) + else: + return orjson.dumps(obj, *args, **kwargs).decode("utf-8") + + @staticmethod + def dump_bytes(obj, *args, **kwargs) -> bytes: + """Convert a Python object into a bytes string. + Args: + obj: The data to be converted + *args: Extra arguments to pass to the orjson.dumps() function + **kwargs: Extra keyword arguments to pass to the orjson.dumps() function + Returns: + The json string representation of obj + """ + + if orjson is None: + return json.dumps(obj, *args, **kwargs).encode("utf-8") + else: + return orjson.dumps(obj, *args, **kwargs) + + @staticmethod + def loads(s: str, *args, **kwargs) -> Any: + """Parse a JSON string into a Python object. + Args: + s: The JSON string to be parsed + *args: Extra arguments to pass to the orjson.loads() function + **kwargs: Extra keyword arguments to pass to the orjson.loads() function + Returns: + The Python representation of s + """ + if orjson is None: + return json.loads(s, *args, **kwargs) + else: + return orjson.loads(s, *args, **kwargs) From de9bd782858d93f2b593e6bcd3310fd12237abab Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 17:48:09 +0200 Subject: [PATCH 02/13] :art: Replace `import json` with `from nats.json_util import JsonUtil as json` Signed-off-by: ff137 --- nats/aio/client.py | 3 ++- nats/aio/msg.py | 2 +- nats/aio/subscription.py | 10 ++-------- nats/js/client.py | 12 ++---------- nats/js/manager.py | 2 +- nats/js/object_store.py | 2 +- nats/micro/service.py | 2 +- nats/protocol/parser.py | 2 +- tests/test_client.py | 2 +- tests/test_compatibility.py | 2 +- tests/test_js.py | 2 +- 11 files changed, 14 insertions(+), 27 deletions(-) diff --git a/nats/aio/client.py b/nats/aio/client.py index 25253bc6..96badb40 100644 --- a/nats/aio/client.py +++ b/nats/aio/client.py @@ -17,7 +17,6 @@ import asyncio import base64 import ipaddress -import json import logging import re import ssl @@ -33,6 +32,8 @@ from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, Union from urllib.parse import ParseResult, urlparse +from nats.json_util import JsonUtil as json + try: from fast_mail_parser import parse_email except ImportError: diff --git a/nats/aio/msg.py b/nats/aio/msg.py index 5724706a..980ef528 100644 --- a/nats/aio/msg.py +++ b/nats/aio/msg.py @@ -14,11 +14,11 @@ from __future__ import annotations import datetime -import json from dataclasses import dataclass from typing import TYPE_CHECKING, Dict, List, Optional, Union from nats.errors import Error, MsgAlreadyAckdError, NotJSMessageError +from nats.json_util import JsonUtil as json if TYPE_CHECKING: from nats import NATS diff --git a/nats/aio/subscription.py b/nats/aio/subscription.py index 19c315b1..c2984894 100644 --- a/nats/aio/subscription.py +++ b/nats/aio/subscription.py @@ -15,17 +15,11 @@ from __future__ import annotations import asyncio -from typing import ( - TYPE_CHECKING, - AsyncIterator, - Awaitable, - Callable, - List, - Optional, -) +from typing import TYPE_CHECKING, AsyncIterator, Awaitable, Callable, List, Optional from uuid import uuid4 from nats import errors + # Default Pending Limits of Subscriptions from nats.aio.msg import Msg diff --git a/nats/js/client.py b/nats/js/client.py index d26413c0..34d32b08 100644 --- a/nats/js/client.py +++ b/nats/js/client.py @@ -15,19 +15,10 @@ from __future__ import annotations import asyncio -import json import time from email.parser import BytesParser from secrets import token_hex -from typing import ( - TYPE_CHECKING, - Any, - Awaitable, - Callable, - Dict, - List, - Optional, -) +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional import nats.errors import nats.js.errors @@ -50,6 +41,7 @@ VALID_BUCKET_RE, ObjectStore, ) +from nats.json_util import JsonUtil as json if TYPE_CHECKING: from nats import NATS diff --git a/nats/js/manager.py b/nats/js/manager.py index bfd5937f..c3778941 100644 --- a/nats/js/manager.py +++ b/nats/js/manager.py @@ -15,13 +15,13 @@ from __future__ import annotations import base64 -import json from email.parser import BytesParser from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional from nats.errors import NoRespondersError from nats.js import api from nats.js.errors import APIError, NotFoundError, ServiceUnavailableError +from nats.json_util import JsonUtil as json if TYPE_CHECKING: from nats import NATS diff --git a/nats/js/object_store.py b/nats/js/object_store.py index 70ce3d3b..f52796b8 100644 --- a/nats/js/object_store.py +++ b/nats/js/object_store.py @@ -15,7 +15,6 @@ import asyncio import base64 import io -import json import re from dataclasses import dataclass from datetime import datetime, timezone @@ -34,6 +33,7 @@ ObjectNotFoundError, ) from nats.js.kv import MSG_ROLLUP_SUBJECT +from nats.json_util import JsonUtil as json VALID_BUCKET_RE = re.compile(r"^[a-zA-Z0-9_-]+$") VALID_KEY_RE = re.compile(r"^[-/_=\.a-zA-Z0-9]+$") diff --git a/nats/micro/service.py b/nats/micro/service.py index 159530a3..9aa46503 100644 --- a/nats/micro/service.py +++ b/nats/micro/service.py @@ -1,6 +1,5 @@ from __future__ import annotations -import json import re import time from asyncio import Event @@ -21,6 +20,7 @@ from nats.aio.client import Client from nats.aio.msg import Msg from nats.aio.subscription import Subscription +from nats.json_util import JsonUtil as json from .request import Handler, Request, ServiceError diff --git a/nats/protocol/parser.py b/nats/protocol/parser.py index 6b8c7255..8f45650b 100644 --- a/nats/protocol/parser.py +++ b/nats/protocol/parser.py @@ -17,11 +17,11 @@ from __future__ import annotations -import json import re from typing import Any, Dict from nats.errors import ProtocolError +from nats.json_util import JsonUtil as json MSG_RE = re.compile( b"\\AMSG\\s+([^\\s]+)\\s+([^\\s]+)\\s+(([^\\s]+)[^\\S\r\n]+)?(\\d+)\r\n" diff --git a/tests/test_client.py b/tests/test_client.py index e16ee09a..60272987 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,6 +1,5 @@ import asyncio import http.client -import json import os import ssl import time @@ -12,6 +11,7 @@ import nats.errors import pytest from nats.aio.client import Client as NATS, ServerVersion, __version__ +from nats.json_util import JsonUtil as json from tests.utils import ( ClusteringDiscoveryAuthTestCase, ClusteringTestCase, diff --git a/tests/test_compatibility.py b/tests/test_compatibility.py index 838c3a8a..98bca630 100644 --- a/tests/test_compatibility.py +++ b/tests/test_compatibility.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import json import os from dataclasses import dataclass, field from typing import Any, Dict, List, Optional @@ -9,6 +8,7 @@ import nats from nats.aio.subscription import Subscription +from nats.json_util import JsonUtil as json from nats.micro.request import ServiceError from nats.micro.service import ( EndpointConfig, diff --git a/tests/test_js.py b/tests/test_js.py index 56c1cb21..91af916d 100644 --- a/tests/test_js.py +++ b/tests/test_js.py @@ -2,7 +2,6 @@ import base64 import datetime import io -import json import random import re import string @@ -20,6 +19,7 @@ from nats.aio.msg import Msg from nats.errors import * from nats.js.errors import * +from nats.json_util import JsonUtil as json from tests.utils import * try: From ef036d451024a79da4f019f423c94d722e7e4892 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 17:51:20 +0200 Subject: [PATCH 03/13] :sparkles: Handle sort_keys option in JsonUtil Signed-off-by: ff137 --- nats/json_util.py | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/nats/json_util.py b/nats/json_util.py index 7a5e334f..d98e746b 100644 --- a/nats/json_util.py +++ b/nats/json_util.py @@ -18,20 +18,35 @@ class JsonUtil: functionality. """ + @staticmethod + def _handle_sort_keys(kwargs): + """Internal helper to handle sort_keys parameter for orjson compatibility. + Args: + kwargs: The keyword arguments dictionary to modify + Returns: + Modified kwargs dictionary + """ + if kwargs.pop("sort_keys", False): + option = kwargs.get("option", 0) | orjson.OPT_SORT_KEYS + kwargs["option"] = option + return kwargs + @staticmethod def dumps(obj, *args, **kwargs) -> str: """Convert a Python object into a json string. Args: obj: The data to be converted - *args: Extra arguments to pass to the orjson.dumps() function - **kwargs: Extra keyword arguments to pass to the orjson.dumps() function + *args: Extra arguments to pass to the dumps() function + **kwargs: Extra keyword arguments to pass to the dumps() function. + Special handling for 'sort_keys' which is translated to + orjson.OPT_SORT_KEYS when using orjson. Returns: The json string representation of obj """ - if orjson is None: return json.dumps(obj, *args, **kwargs) else: + kwargs = JsonUtil._handle_sort_keys(kwargs) return orjson.dumps(obj, *args, **kwargs).decode("utf-8") @staticmethod @@ -39,15 +54,17 @@ def dump_bytes(obj, *args, **kwargs) -> bytes: """Convert a Python object into a bytes string. Args: obj: The data to be converted - *args: Extra arguments to pass to the orjson.dumps() function - **kwargs: Extra keyword arguments to pass to the orjson.dumps() function + *args: Extra arguments to pass to the dumps() function + **kwargs: Extra keyword arguments to pass to the dumps() function. + Special handling for 'sort_keys' which is translated to + orjson.OPT_SORT_KEYS when using orjson. Returns: - The json string representation of obj + The json string representation of obj as bytes """ - if orjson is None: return json.dumps(obj, *args, **kwargs).encode("utf-8") else: + kwargs = JsonUtil._handle_sort_keys(kwargs) return orjson.dumps(obj, *args, **kwargs) @staticmethod From 9056a82bb7d362e9af83573d0e5812d4c05809dd Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 17:55:14 +0200 Subject: [PATCH 04/13] :white_check_mark: Handle difference in expected dumped string from orjson Signed-off-by: ff137 --- tests/test_client.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/test_client.py b/tests/test_client.py index 60272987..9b69087c 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -37,6 +37,14 @@ def test_default_connect_command(self): nc.options["no_echo"] = False got = nc._connect_command() expected = f'CONNECT {{"echo": true, "lang": "python3", "pedantic": false, "protocol": 1, "verbose": false, "version": "{__version__}"}}\r\n' + + try: + import orjson + # If using orjson, expected string is without spaces + expected = expected.replace(" ", "") + except ImportError: + pass + self.assertEqual(expected.encode(), got) def test_default_connect_command_with_name(self): @@ -48,6 +56,14 @@ def test_default_connect_command_with_name(self): nc.options["no_echo"] = False got = nc._connect_command() expected = f'CONNECT {{"echo": true, "lang": "python3", "name": "secret", "pedantic": false, "protocol": 1, "verbose": false, "version": "{__version__}"}}\r\n' + + try: + import orjson + # If using orjson, expected string is without spaces + expected = expected.replace(" ", "") + except ImportError: + pass + self.assertEqual(expected.encode(), got) def test_semver_parsing(self): From a561f5c0ae87da03df58a344752c3df73ef31ff2 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 18:04:07 +0200 Subject: [PATCH 05/13] :zap: Replace .dumps.encode calls with .dump_bytes Signed-off-by: ff137 --- nats/aio/client.py | 4 ++-- nats/aio/msg.py | 2 +- nats/js/client.py | 6 ++--- nats/js/manager.py | 46 ++++++++++++++++++------------------- nats/js/object_store.py | 6 ++--- nats/micro/service.py | 6 ++--- tests/test_micro_service.py | 4 ++-- 7 files changed, 37 insertions(+), 37 deletions(-) diff --git a/nats/aio/client.py b/nats/aio/client.py index 96badb40..a6f09b4a 100644 --- a/nats/aio/client.py +++ b/nats/aio/client.py @@ -1622,8 +1622,8 @@ def _connect_command(self) -> bytes: if self.options["no_echo"] is not None: options["echo"] = not self.options["no_echo"] - connect_opts = json.dumps(options, sort_keys=True) - return b"".join([CONNECT_OP + _SPC_ + connect_opts.encode() + _CRLF_]) + connect_opts = json.dump_bytes(options, sort_keys=True) + return b"".join([CONNECT_OP + _SPC_ + connect_opts + _CRLF_]) async def _process_ping(self) -> None: """ diff --git a/nats/aio/msg.py b/nats/aio/msg.py index 980ef528..835b6281 100644 --- a/nats/aio/msg.py +++ b/nats/aio/msg.py @@ -132,7 +132,7 @@ async def nak(self, delay: Union[int, float, None] = None) -> None: if delay: json_args["delay"] = int(delay * 10**9) # from seconds to ns if json_args: - payload += b" " + json.dumps(json_args).encode() + payload += b" " + json.dump_bytes(json_args) await self._client.publish(self.reply, payload) self._ackd = True diff --git a/nats/js/client.py b/nats/js/client.py index 34d32b08..51fec6eb 100644 --- a/nats/js/client.py +++ b/nats/js/client.py @@ -1127,7 +1127,7 @@ async def _fetch_one( await self._nc.publish( self._nms, - json.dumps(next_req).encode(), + json.dump_bytes(next_req), self._deliver, ) @@ -1212,7 +1212,7 @@ async def _fetch_n( next_req["no_wait"] = True await self._nc.publish( self._nms, - json.dumps(next_req).encode(), + json.dump_bytes(next_req), self._deliver, ) await asyncio.sleep(0) @@ -1278,7 +1278,7 @@ async def _fetch_n( await self._nc.publish( self._nms, - json.dumps(next_req).encode(), + json.dump_bytes(next_req), self._deliver, ) await asyncio.sleep(0) diff --git a/nats/js/manager.py b/nats/js/manager.py index c3778941..9278f1a9 100644 --- a/nats/js/manager.py +++ b/nats/js/manager.py @@ -60,9 +60,9 @@ async def find_stream_name_by_subject(self, subject: str) -> str: """ req_sub = f"{self._prefix}.STREAM.NAMES" - req_data = json.dumps({"subject": subject}) + req_data = json.dump_bytes({"subject": subject}) info = await self._api_request( - req_sub, req_data.encode(), timeout=self._timeout + req_sub, req_data, timeout=self._timeout ) if not info["streams"]: raise NotFoundError @@ -76,12 +76,12 @@ async def stream_info( """ Get the latest StreamInfo by stream name. """ - req_data = "" + req_data = b"" if subjects_filter: - req_data = json.dumps({"subjects_filter": subjects_filter}) + req_data = json.dump_bytes({"subjects_filter": subjects_filter}) resp = await self._api_request( f"{self._prefix}.STREAM.INFO.{name}", - req_data.encode(), + req_data, timeout=self._timeout, ) return api.StreamInfo.from_response(resp) @@ -114,10 +114,10 @@ async def add_stream( "path separators (forward or backward slash), or non-printable characters." ) - data = json.dumps(config.as_dict()) + data = json.dump_bytes(config.as_dict()) resp = await self._api_request( f"{self._prefix}.STREAM.CREATE.{stream_name}", - data.encode(), + data, timeout=self._timeout, ) return api.StreamInfo.from_response(resp) @@ -136,10 +136,10 @@ async def update_stream( if config.name is None: raise ValueError("nats: stream name is required") - data = json.dumps(config.as_dict()) + data = json.dump_bytes(config.as_dict()) resp = await self._api_request( f"{self._prefix}.STREAM.UPDATE.{config.name}", - data.encode(), + data, timeout=self._timeout, ) return api.StreamInfo.from_response(resp) @@ -171,10 +171,10 @@ async def purge_stream( if keep: stream_req["keep"] = keep - req = json.dumps(stream_req) + req = json.dump_bytes(stream_req) resp = await self._api_request( f"{self._prefix}.STREAM.PURGE.{name}", - req.encode(), + req, timeout=self._timeout ) return resp["success"] @@ -198,9 +198,9 @@ async def streams_info(self, offset=0) -> List[api.StreamInfo]: """ resp = await self._api_request( f"{self._prefix}.STREAM.LIST", - json.dumps({ + json.dump_bytes({ "offset": offset - }).encode(), + }), timeout=self._timeout, ) streams = [] @@ -216,9 +216,9 @@ async def streams_info_iterator(self, """ resp = await self._api_request( f"{self._prefix}.STREAM.LIST", - json.dumps({ + json.dump_bytes({ "offset": offset - }).encode(), + }), timeout=self._timeout, ) @@ -240,7 +240,7 @@ async def add_consumer( config = config.evolve(**params) durable_name = config.durable_name req = {"stream_name": stream, "config": config.as_dict()} - req_data = json.dumps(req).encode() + req_data = json.dump_bytes(req) resp = None subject = "" @@ -283,9 +283,9 @@ async def consumers_info( """ resp = await self._api_request( f"{self._prefix}.CONSUMER.LIST.{stream}", - b"" if offset is None else json.dumps({ + b"" if offset is None else json.dump_bytes({ "offset": offset - }).encode(), + }), timeout=self._timeout, ) consumers = [] @@ -318,19 +318,19 @@ async def get_msg( req["last_by_subj"] = None req.pop("last_by_subj", None) req["next_by_subj"] = subject - data = json.dumps(req) + data = json.dump_bytes(req) if direct: # $JS.API.DIRECT.GET.KV_{stream_name}.$KV.TEST.{key} if subject and (seq is None): # last_by_subject type request requires no payload. - data = "" + data = b"" req_subject = f"{self._prefix}.DIRECT.GET.{stream_name}.{subject}" else: req_subject = f"{self._prefix}.DIRECT.GET.{stream_name}" resp = await self._nc.request( - req_subject, data.encode(), timeout=self._timeout + req_subject, data, timeout=self._timeout ) raw_msg = JetStreamManager._lift_msg_to_raw_msg(resp) return raw_msg @@ -389,8 +389,8 @@ async def delete_msg(self, stream_name: str, seq: int) -> bool: """ req_subject = f"{self._prefix}.STREAM.MSG.DELETE.{stream_name}" req = {"seq": seq} - data = json.dumps(req) - resp = await self._api_request(req_subject, data.encode()) + data = json.dump_bytes(req) + resp = await self._api_request(req_subject, data) return resp["success"] async def get_last_msg( diff --git a/nats/js/object_store.py b/nats/js/object_store.py index f52796b8..8610e607 100644 --- a/nats/js/object_store.py +++ b/nats/js/object_store.py @@ -343,7 +343,7 @@ async def put( try: await self._js.publish( meta_subj, - json.dumps(info.as_dict()).encode(), + json.dump_bytes(info.as_dict()), headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT}, ) except Exception as err: @@ -412,7 +412,7 @@ async def update_meta( try: await self._js.publish( meta_subj, - json.dumps(info.as_dict()).encode(), + json.dump_bytes(info.as_dict()), headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT}, ) except Exception as err: @@ -538,7 +538,7 @@ async def delete(self, name: str) -> ObjectResult: try: await self._js.publish( meta_subj, - json.dumps(info.as_dict()).encode(), + json.dump_bytes(info.as_dict()), headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT}, ) finally: diff --git a/nats/micro/service.py b/nats/micro/service.py index 9aa46503..37343537 100644 --- a/nats/micro/service.py +++ b/nats/micro/service.py @@ -859,18 +859,18 @@ async def _handle_ping_request(self, msg: Msg) -> None: metadata=self._metadata, ).to_dict() - await msg.respond(data=json.dumps(ping).encode()) + await msg.respond(data=json.dump_bytes(ping)) async def _handle_info_request(self, msg: Msg) -> None: """Handle an info message.""" info = self.info().to_dict() - await msg.respond(data=json.dumps(info).encode()) + await msg.respond(data=json.dump_bytes(info)) async def _handle_stats_request(self, msg: Msg) -> None: """Handle a stats message.""" stats = self.stats().to_dict() - await msg.respond(data=json.dumps(stats).encode()) + await msg.respond(data=json.dump_bytes(stats)) def control_subject( diff --git a/tests/test_micro_service.py b/tests/test_micro_service.py index 9fa47fc7..b9514b09 100644 --- a/tests/test_micro_service.py +++ b/tests/test_micro_service.py @@ -79,10 +79,10 @@ async def add_handler(request: Request): for _ in range(50): await nc.request( "svc.add", - json.dumps({ + json.dump_bytes({ "x": 22, "y": 11 - }).encode("utf-8") + }) ) for svc in svcs: From 3c92a2e2a3d6a23cf912ce5c1fb1c8a5b637c183 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 18:08:22 +0200 Subject: [PATCH 06/13] :sparkles: Add orjson as optional dependency Signed-off-by: ff137 --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index a3955ca1..0253b7a4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ classifiers = [ nkeys = ['nkeys'] aiohttp = ['aiohttp'] fast_parse = ['fast-mail-parser'] +orjson = ['orjson'] [tool.setuptools] zip-safe = true From 9b74cf5478604c931e721b6bb32f4d491f3c81f1 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 18:08:53 +0200 Subject: [PATCH 07/13] :bug: Remove .encode() Signed-off-by: ff137 --- nats/js/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nats/js/manager.py b/nats/js/manager.py index 9278f1a9..a87b7ec1 100644 --- a/nats/js/manager.py +++ b/nats/js/manager.py @@ -338,7 +338,7 @@ async def get_msg( # Non Direct form req_subject = f"{self._prefix}.STREAM.MSG.GET.{stream_name}" resp_data = await self._api_request( - req_subject, data.encode(), timeout=self._timeout + req_subject, data, timeout=self._timeout ) raw_msg = api.RawStreamMsg.from_response(resp_data["message"]) From 44f6b2107f50607632a7f408efe7a1b5209efaf6 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 18:15:06 +0200 Subject: [PATCH 08/13] :arrow_up: Run format check with py3.12 Signed-off-by: ff137 --- .github/workflows/check.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 76224e8e..01a12c54 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -17,7 +17,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.12" - name: Install dependencies run: | From 5ff47603fbbb2838e1d9e62a96ecf6e466918dc8 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 18:19:20 +0200 Subject: [PATCH 09/13] :art: yapf format Signed-off-by: ff137 --- nats/js/manager.py | 16 ++++------------ tests/test_client.py | 2 +- tests/test_micro_service.py | 8 +------- 3 files changed, 6 insertions(+), 20 deletions(-) diff --git a/nats/js/manager.py b/nats/js/manager.py index a87b7ec1..f385f143 100644 --- a/nats/js/manager.py +++ b/nats/js/manager.py @@ -173,9 +173,7 @@ async def purge_stream( req = json.dump_bytes(stream_req) resp = await self._api_request( - f"{self._prefix}.STREAM.PURGE.{name}", - req, - timeout=self._timeout + f"{self._prefix}.STREAM.PURGE.{name}", req, timeout=self._timeout ) return resp["success"] @@ -198,9 +196,7 @@ async def streams_info(self, offset=0) -> List[api.StreamInfo]: """ resp = await self._api_request( f"{self._prefix}.STREAM.LIST", - json.dump_bytes({ - "offset": offset - }), + json.dump_bytes({"offset": offset}), timeout=self._timeout, ) streams = [] @@ -216,9 +212,7 @@ async def streams_info_iterator(self, """ resp = await self._api_request( f"{self._prefix}.STREAM.LIST", - json.dump_bytes({ - "offset": offset - }), + json.dump_bytes({"offset": offset}), timeout=self._timeout, ) @@ -283,9 +277,7 @@ async def consumers_info( """ resp = await self._api_request( f"{self._prefix}.CONSUMER.LIST.{stream}", - b"" if offset is None else json.dump_bytes({ - "offset": offset - }), + b"" if offset is None else json.dump_bytes({"offset": offset}), timeout=self._timeout, ) consumers = [] diff --git a/tests/test_client.py b/tests/test_client.py index 9b69087c..787f2e9e 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -37,7 +37,7 @@ def test_default_connect_command(self): nc.options["no_echo"] = False got = nc._connect_command() expected = f'CONNECT {{"echo": true, "lang": "python3", "pedantic": false, "protocol": 1, "verbose": false, "version": "{__version__}"}}\r\n' - + try: import orjson # If using orjson, expected string is without spaces diff --git a/tests/test_micro_service.py b/tests/test_micro_service.py index b9514b09..4b936c5c 100644 --- a/tests/test_micro_service.py +++ b/tests/test_micro_service.py @@ -77,13 +77,7 @@ async def add_handler(request: Request): svcs.append(svc) for _ in range(50): - await nc.request( - "svc.add", - json.dump_bytes({ - "x": 22, - "y": 11 - }) - ) + await nc.request("svc.add", json.dump_bytes({"x": 22, "y": 11})) for svc in svcs: info = svc.info() From aa05da8db648ae9a0df6bf6fe5de25ac0fc30e8a Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 18:22:48 +0200 Subject: [PATCH 10/13] :art: Update docstrings Signed-off-by: ff137 --- nats/json_util.py | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/nats/json_util.py b/nats/json_util.py index d98e746b..516ef36b 100644 --- a/nats/json_util.py +++ b/nats/json_util.py @@ -10,12 +10,20 @@ class JsonUtil: - """A utility class for handling JSON-related operations. - This class provides static methods for formatting JSON strings, and - for converting between Python objects and JSON strings/files. It uses - the `orjson` library where possible for its speed advantages, but reverts - to the standard `json` library where `orjson` does not support the required - functionality. + """A utility class for handling JSON serialization operations. + This class provides static methods for converting between Python objects and JSON + strings/bytes. It uses the `orjson` library when available for its performance + advantages, falling back to the standard `json` library when `orjson` is not + installed. + + The class handles compatibility between the two libraries, particularly for options + like 'sort_keys' which have different implementations in each library. All methods + maintain consistent behavior regardless of which JSON library is being used. + + Methods: + dumps(obj, *args, **kwargs) -> str: Converts object to JSON string + dump_bytes(obj, *args, **kwargs) -> bytes: Converts object to JSON bytes + loads(s, *args, **kwargs) -> Any: Parses JSON string into Python object """ @staticmethod @@ -24,7 +32,7 @@ def _handle_sort_keys(kwargs): Args: kwargs: The keyword arguments dictionary to modify Returns: - Modified kwargs dictionary + Modified kwargs dictionary with orjson-compatible options """ if kwargs.pop("sort_keys", False): option = kwargs.get("option", 0) | orjson.OPT_SORT_KEYS @@ -33,7 +41,7 @@ def _handle_sort_keys(kwargs): @staticmethod def dumps(obj, *args, **kwargs) -> str: - """Convert a Python object into a json string. + """Convert a Python object into a JSON string. Args: obj: The data to be converted *args: Extra arguments to pass to the dumps() function @@ -41,7 +49,7 @@ def dumps(obj, *args, **kwargs) -> str: Special handling for 'sort_keys' which is translated to orjson.OPT_SORT_KEYS when using orjson. Returns: - The json string representation of obj + str: A JSON string representation of obj """ if orjson is None: return json.dumps(obj, *args, **kwargs) @@ -51,7 +59,7 @@ def dumps(obj, *args, **kwargs) -> str: @staticmethod def dump_bytes(obj, *args, **kwargs) -> bytes: - """Convert a Python object into a bytes string. + """Convert a Python object into a JSON bytes string. Args: obj: The data to be converted *args: Extra arguments to pass to the dumps() function @@ -59,7 +67,7 @@ def dump_bytes(obj, *args, **kwargs) -> bytes: Special handling for 'sort_keys' which is translated to orjson.OPT_SORT_KEYS when using orjson. Returns: - The json string representation of obj as bytes + bytes: A JSON bytes string representation of obj """ if orjson is None: return json.dumps(obj, *args, **kwargs).encode("utf-8") From 27dd09ab1169ba3e1eac2c30a8abe4f693a77389 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 18:23:40 +0200 Subject: [PATCH 11/13] :art: isort Signed-off-by: ff137 --- nats/aio/subscription.py | 10 ++++++++-- nats/js/client.py | 10 +++++++++- tests/test_client.py | 2 ++ 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/nats/aio/subscription.py b/nats/aio/subscription.py index c2984894..19c315b1 100644 --- a/nats/aio/subscription.py +++ b/nats/aio/subscription.py @@ -15,11 +15,17 @@ from __future__ import annotations import asyncio -from typing import TYPE_CHECKING, AsyncIterator, Awaitable, Callable, List, Optional +from typing import ( + TYPE_CHECKING, + AsyncIterator, + Awaitable, + Callable, + List, + Optional, +) from uuid import uuid4 from nats import errors - # Default Pending Limits of Subscriptions from nats.aio.msg import Msg diff --git a/nats/js/client.py b/nats/js/client.py index 51fec6eb..68aea6f1 100644 --- a/nats/js/client.py +++ b/nats/js/client.py @@ -18,7 +18,15 @@ import time from email.parser import BytesParser from secrets import token_hex -from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Dict, + List, + Optional, +) import nats.errors import nats.js.errors diff --git a/tests/test_client.py b/tests/test_client.py index 787f2e9e..5a04ca75 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -40,6 +40,7 @@ def test_default_connect_command(self): try: import orjson + # If using orjson, expected string is without spaces expected = expected.replace(" ", "") except ImportError: @@ -59,6 +60,7 @@ def test_default_connect_command_with_name(self): try: import orjson + # If using orjson, expected string is without spaces expected = expected.replace(" ", "") except ImportError: From aa026bc139ce56bf3cb60c1083b07032d50c2c61 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 20:56:07 +0200 Subject: [PATCH 12/13] :heavy_plus_sign: Add orjson to setup.py extras_require Signed-off-by: ff137 --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 8b60030c..d348c3df 100644 --- a/setup.py +++ b/setup.py @@ -10,6 +10,7 @@ "nkeys": ["nkeys"], "aiohttp": ["aiohttp"], "fast_parse": ["fast-mail-parser"], + "orjson": ["orjson"], }, packages=["nats", "nats.aio", "nats.micro", "nats.protocol", "nats.js"], package_data={"nats": ["py.typed"]}, From 193eb354773cb005113df738caa9978de156ba0e Mon Sep 17 00:00:00 2001 From: ff137 Date: Wed, 12 Mar 2025 10:54:10 +0200 Subject: [PATCH 13/13] :white_check_mark: Fix expected string when using orjson Signed-off-by: ff137 --- tests/test_client.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/test_client.py b/tests/test_client.py index 5a04ca75..39a0f4ca 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -41,11 +41,10 @@ def test_default_connect_command(self): try: import orjson - # If using orjson, expected string is without spaces - expected = expected.replace(" ", "") + # If using orjson, expected string is without spaces (except for first space after CONNECT) + expected = expected.replace(" ", "").replace("CONNECT", "CONNECT ") except ImportError: pass - self.assertEqual(expected.encode(), got) def test_default_connect_command_with_name(self): @@ -61,8 +60,8 @@ def test_default_connect_command_with_name(self): try: import orjson - # If using orjson, expected string is without spaces - expected = expected.replace(" ", "") + # If using orjson, expected string is without spaces (except for first space after CONNECT) + expected = expected.replace(" ", "").replace("CONNECT", "CONNECT ") except ImportError: pass