Skip to content

Commit 9bce822

Browse files
[ENH] Using msgpack instead of json
Using msgpack instead of json results in faster (de)serialize and less memory usage. Redis is also capable of msgpack within its lua api i.e. https://github.com/kengonakajima/lua-msgpack-native. ====== Benchmark ======= JSON median size: 387 MSGPACK median size: 329 ------------------------ Diff: 16.20% JSON * Serialize: 39286 * Deserialize: 30713 MSGPACK * Serialize: 23483 * Deserialize: 12602 --------------------- DIFF * Serialize: 50.35% * Deserialize: 83.62% Data extracted from spamhaus-collector Measurements based on deduplicator-expert 460 events in total process by deducplicator-expert Signed-off-by: Sebastian Waldbauer <[email protected]>
1 parent 272ae20 commit 9bce822

File tree

15 files changed

+105
-56
lines changed

15 files changed

+105
-56
lines changed

debian/control

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Build-Depends: debhelper (>= 4.1.16),
2020
python3-sphinx-rtd-theme,
2121
python3-termstyle,
2222
python3-tz,
23+
python3-msgpack,
2324
quilt,
2425
rsync,
2526
safe-rm
@@ -40,6 +41,7 @@ Depends: bash-completion,
4041
python3-requests (>= 2.2.1),
4142
python3-termstyle (>= 0.1.10),
4243
python3-tz,
44+
python3-msgpack,
4345
redis-server,
4446
systemd,
4547
${misc:Depends},

intelmq/bots/parsers/json/parser.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ def process(self):
2525
for line in lines:
2626
new_event = MessageFactory.unserialize(line,
2727
harmonization=self.harmonization,
28-
default_type='Event')
28+
default_type='Event',
29+
use_packer="json")
30+
2931
event = self.new_event(report)
3032
event.update(new_event)
3133
if 'raw' not in event:

intelmq/lib/bot.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import inspect
1414
import io
1515
import json
16+
import msgpack
1617
import logging
1718
import os
1819
import re
@@ -318,8 +319,8 @@ def start(self, starting: bool = True, error_on_pipeline: bool = True,
318319
self.logger.error('Pipeline failed.')
319320
self.__disconnect_pipelines()
320321

321-
except exceptions.DecodingError as exc:
322-
self.logger.exception('Could not decode message from pipeline. No retries useful.')
322+
except exceptions.UnserializationError as exc:
323+
self.logger.exception('Could not unserialize message from pipeline. No retries useful.')
323324

324325
# ensure that we do not re-process the faulty message
325326
self.__error_retries_counter = self.error_max_retries + 1

intelmq/lib/exceptions.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,3 +168,12 @@ def __init__(self, encodings=None, exception: UnicodeDecodeError = None,
168168
suffixes.append('with reason %r' % exception.reason)
169169
suffix = (' ' + ' '.join(suffixes)) if suffixes else ''
170170
super().__init__("Could not decode string%s." % suffix)
171+
172+
173+
class UnserializationError(IntelMQException, ValueError):
174+
"""
175+
Unrecoverable error during message unserialization
176+
"""
177+
def __init__(self, exception: Exception = None, object: bytes = None):
178+
self.object = object
179+
super().__init__("Could not unserialize message%s." % exception)

intelmq/lib/message.py

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import warnings
1111
from collections import defaultdict
1212
from typing import Any, Dict, Iterable, Optional, Sequence, Union
13+
import msgpack
1314

1415
import intelmq.lib.exceptions as exceptions
1516
import intelmq.lib.harmonization
@@ -54,8 +55,8 @@ def from_dict(message: dict, harmonization=None,
5455
return class_reference(message, auto=True, harmonization=harmonization)
5556

5657
@staticmethod
57-
def unserialize(raw_message: str, harmonization: dict = None,
58-
default_type: Optional[str] = None) -> dict:
58+
def unserialize(raw_message: bytes, harmonization: dict = None,
59+
default_type: Optional[str] = None, use_packer: str = "msgpack") -> dict:
5960
"""
6061
Takes JSON-encoded Message object, returns instance of correct class.
6162
@@ -68,12 +69,12 @@ def unserialize(raw_message: str, harmonization: dict = None,
6869
MessageFactory.from_dict
6970
MessageFactory.serialize
7071
"""
71-
message = Message.unserialize(raw_message)
72+
message = Message.unserialize(raw_message, use_packer=use_packer)
7273
return MessageFactory.from_dict(message, harmonization=harmonization,
7374
default_type=default_type)
7475

7576
@staticmethod
76-
def serialize(message):
77+
def serialize(message) -> bytes:
7778
"""
7879
Takes instance of message-derived class and makes JSON-encoded Message.
7980
@@ -121,7 +122,7 @@ def __init__(self, message: Union[dict, tuple] = (), auto: bool = False,
121122
elif isinstance(message, tuple):
122123
self.iterable = dict(message)
123124
else:
124-
raise ValueError("Type %r of message can't be handled, must be dict or tuple.", type(message))
125+
raise ValueError("Type %r of message can't be handled, must be dict or tuple." % type(message))
125126
for key, value in self.iterable.items():
126127
if not self.add(key, value, sanitize=False, raise_failure=False):
127128
self.add(key, value, sanitize=True)
@@ -304,18 +305,32 @@ def deep_copy(self):
304305
harmonization={self.__class__.__name__.lower(): self.harmonization_config})
305306

306307
def __str__(self):
307-
return self.serialize()
308+
return self.serialize(use_packer="json")
308309

309-
def serialize(self):
310-
self['__type'] = self.__class__.__name__
311-
json_dump = utils.decode(json.dumps(self))
312-
del self['__type']
313-
return json_dump
310+
def serialize(self, use_packer: str = "msgpack"):
311+
delete_type = False
312+
if '__type' not in self:
313+
delete_type = True
314+
self['__type'] = self.__class__.__name__
315+
316+
if use_packer == "json":
317+
packed = json.dumps(self)
318+
else:
319+
packed = msgpack.packb(self)
320+
321+
if delete_type:
322+
del self['__type']
323+
return packed
314324

315325
@staticmethod
316-
def unserialize(message_string: str):
317-
message = json.loads(message_string)
318-
return message
326+
def unserialize(message: bytes, use_packer: str = "msgpack"):
327+
try:
328+
if use_packer == "json":
329+
return json.loads(message)
330+
else:
331+
return msgpack.unpackb(message)
332+
except Exception as exc:
333+
raise exceptions.UnserializationError(exception=exc, object=message)
319334

320335
def __is_valid_key(self, key: str):
321336
try:
@@ -462,14 +477,18 @@ def to_dict(self, hierarchical: bool = False, with_type: bool = False,
462477
json_dict_fp = json_dict_fp[subkey]
463478

464479
for key, value in jsondicts.items():
465-
new_dict[key] = json.dumps(value, ensure_ascii=False)
480+
new_dict[key] = json.dumps(value)
466481

467482
return new_dict
468483

469484
def to_json(self, hierarchical=False, with_type=False, jsondict_as_string=False):
470485
json_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type)
471486
return json.dumps(json_dict, ensure_ascii=False, sort_keys=True)
472487

488+
def to_msgpack(self, hierarchical=False, with_type=False):
489+
msgpack_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type)
490+
return msgpack.packb(msgpack_dict)
491+
473492
def __eq__(self, other: dict) -> bool:
474493
"""
475494
Wrapper is necessary as we have additional members

intelmq/lib/pipeline.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,14 @@ def send(self, message: str, path: str = "_default",
117117
path_permissive: bool = False):
118118
raise NotImplementedError
119119

120-
def receive(self) -> str:
120+
def receive(self) -> bytes:
121121
if self._has_message:
122122
raise exceptions.PipelineError("There's already a message, first "
123123
"acknowledge the existing one.")
124124

125125
retval = self._receive()
126126
self._has_message = True
127-
return utils.decode(retval)
127+
return retval
128128

129129
def _receive(self) -> bytes:
130130
raise NotImplementedError

intelmq/lib/test.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io
99
import inspect
1010
import json
11+
import msgpack
1112
import os
1213
import re
1314
import unittest
@@ -150,8 +151,7 @@ def setUpClass(cls):
150151
elif cls.bot_type != 'collector' and cls.default_input_message == '':
151152
cls.default_input_message = {'__type': 'Event'}
152153
if type(cls.default_input_message) is dict:
153-
cls.default_input_message = \
154-
utils.decode(json.dumps(cls.default_input_message))
154+
cls.default_input_message = msgpack.packb(cls.default_input_message)
155155

156156
if cls.use_cache and not os.environ.get('INTELMQ_SKIP_REDIS'):
157157
password = os.environ.get('INTELMQ_TEST_REDIS_PASSWORD') or \
@@ -168,10 +168,10 @@ def setUpClass(cls):
168168
harmonization = utils.load_configuration(pkg_resources.resource_filename('intelmq',
169169
'etc/harmonization.conf'))
170170

171-
def new_report(self, auto=False, examples=False):
171+
def new_report(self, auto=False, examples=False) -> message.Report:
172172
return message.Report(harmonization=self.harmonization, auto=auto)
173173

174-
def new_event(self):
174+
def new_event(self) -> message.Event:
175175
return message.Event(harmonization=self.harmonization)
176176

177177
def get_mocked_logger(self, logger):
@@ -240,7 +240,7 @@ def prepare_bot(self, parameters={}, destination_queues=None):
240240
self.input_queue = []
241241
for msg in self.input_message:
242242
if type(msg) is dict:
243-
self.input_queue.append(json.dumps(msg))
243+
self.input_queue.append(message.MessageFactory.serialize(msg))
244244
elif issubclass(type(msg), message.Message):
245245
self.input_queue.append(msg.serialize())
246246
else:
@@ -315,17 +315,17 @@ def run_bot(self, iterations: int = 1, error_on_pipeline: bool = False,
315315

316316
""" Test if report has required fields. """
317317
if self.bot_type == 'collector':
318-
for report_json in self.get_output_queue():
319-
report = message.MessageFactory.unserialize(report_json,
318+
for report_data in self.get_output_queue():
319+
report = message.MessageFactory.unserialize(report_data,
320320
harmonization=self.harmonization)
321321
self.assertIsInstance(report, message.Report)
322322
self.assertIn('raw', report)
323323
self.assertIn('time.observation', report)
324324

325325
""" Test if event has required fields. """
326326
if self.bot_type == 'parser':
327-
for event_json in self.get_output_queue():
328-
event = message.MessageFactory.unserialize(event_json,
327+
for event_data in self.get_output_queue():
328+
event = message.MessageFactory.unserialize(event_data,
329329
harmonization=self.harmonization)
330330
self.assertIsInstance(event, message.Event)
331331
self.assertIn('classification.type', event)
@@ -383,7 +383,7 @@ def get_output_queue(self, path="_default"):
383383
"""Getter for items in the output queues of this bot. Use in TestCase scenarios
384384
If there is multiple queues in named queue group, we return all the items chained.
385385
"""
386-
return [utils.decode(text) for text in chain(*[self.pipe.state[x] for x in self.pipe.destination_queues[path]])]
386+
return [text for text in chain(*[self.pipe.state[x] for x in self.pipe.destination_queues[path]])]
387387
# return [utils.decode(text) for text in self.pipe.state["%s-output" % self.bot_id]]
388388

389389
def test_bot_name(self, *args, **kwargs):
@@ -514,9 +514,9 @@ def assertMessageEqual(self, queue_pos, expected_msg, compare_raw=True, path="_d
514514
given queue position.
515515
"""
516516
event = self.get_output_queue(path=path)[queue_pos]
517-
self.assertIsInstance(event, str)
517+
self.assertIsInstance(event, bytes)
518518

519-
event_dict = json.loads(event)
519+
event_dict = msgpack.unpackb(event)
520520
if isinstance(expected_msg, (message.Event, message.Report)):
521521
expected = expected_msg.to_dict(with_type=True)
522522
else:

intelmq/tests/bots/collectors/tcp/test_collector.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,10 @@ def test_intelmq_exchange(self):
119119
for i, msg in enumerate(self.get_output_queue()):
120120
report = MessageFactory.unserialize(msg, harmonization=self.harmonization, default_type='Event')
121121

122-
output = MessageFactory.unserialize(utils.base64_decode(report["raw"]), harmonization=self.harmonization, default_type='Event')
122+
output = MessageFactory.unserialize(utils.base64_decode(report["raw"]),
123+
harmonization=self.harmonization,
124+
default_type='Event',
125+
use_packer="json")
123126
self.assertDictEqual(output, INPUT1)
124127

125128
del report['time.observation']

intelmq/tests/bots/experts/cymru_whois/test_expert.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# -*- coding: utf-8 -*-
2-
import json
2+
import msgpack
33
import unittest
44

55
import intelmq.lib.test as test
@@ -89,7 +89,7 @@ def test_6to4_result(self):
8989
"""
9090
self.input_message = EXAMPLE_6TO4_INPUT
9191
self.run_bot()
92-
actual = json.loads(self.get_output_queue()[0])
92+
actual = msgpack.loads(self.get_output_queue()[0])
9393
self.assertDictContainsSubset(EXAMPLE_6TO4_INPUT, actual)
9494
self.assertIn("source.asn", actual)
9595
self.assertIn("source.as_name", actual)

intelmq/tests/bots/experts/idea/test_expert.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# -*- coding: utf-8 -*-
22
import unittest
33
import json
4+
import msgpack
45

56
import intelmq.lib.test as test
67
from intelmq.bots.experts.idea.expert import IdeaExpertBot
@@ -82,8 +83,8 @@ def test_conversion(self):
8283
# The ID in the generated Idea event is random, so we have to extract
8384
# the data from the "output" field and compare after removing ID's
8485
event = self.get_output_queue()[0]
85-
self.assertIsInstance(event, str)
86-
event_dict = json.loads(event)
86+
self.assertIsInstance(event, bytes)
87+
event_dict = msgpack.loads(event)
8788
self.assertIsInstance(event_dict, dict)
8889
self.assertTrue("output" in event_dict)
8990
idea_event = json.loads(event_dict["output"])

0 commit comments

Comments
 (0)