Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 61 additions & 4 deletions hl7/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
)
from .util import escape, generate_message_control_id, unescape

logger = logging.getLogger(__file__)
logger = logging.getLogger(__name__)

_SENTINEL = object()

Expand Down Expand Up @@ -470,6 +470,27 @@ def extract_field(
segment_num, field_num, repeat_num, component_num, subcomponent_num
)

def write_field(
self,
value,
segment,
segment_num=1,
field_num=None,
repeat_num=None,
component_num=None,
subcomponent_num=None,
):
"""
Write a value, with escaping, into a message using the tree based assignment notation.
The segment must exist.

Extract a field using a future proofed approach, based on rules in:
http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing
"""
self.segments(segment)(segment_num).write_field(
value, field_num, repeat_num, component_num, subcomponent_num
)

def assign_field(
self,
value,
Expand Down Expand Up @@ -497,7 +518,7 @@ def escape(self, field, app_map=None):

To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known.

Pass through the message. Replace recognised characters with their escaped
Pass through the message. Replace recognized characters with their escaped
version. Return an ascii encoded string.

Functionality:
Expand Down Expand Up @@ -563,11 +584,11 @@ def create_ack(

msh.assign_field(str(source_msh(1)), 1)
msh.assign_field(str(source_msh(2)), 2)
# Sending application is source receving application
# Sending application is source receiving application
msh.assign_field(
str(application) if application is not None else str(source_msh(5)), 3
)
# Sending facility is source receving facility
# Sending facility is source receiving facility
msh.assign_field(
str(facility) if facility is not None else str(source_msh(6)), 4
)
Expand Down Expand Up @@ -723,6 +744,17 @@ def extract_field(
else:
return "" # Assume non-present optional value

def write_field(
self,
value,
field_num=None,
repeat_num=None,
component_num=None,
subcomponent_num=None,
):
"""Write a field with escaping."""
self.assign_field(escape(self, value), field_num, repeat_num, component_num, subcomponent_num)

def assign_field(
self,
value,
Expand All @@ -739,28 +771,53 @@ def assign_field(
http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing
"""

# Extend the segment as needed.
while len(self) <= field_num:
self.append(self.create_field([]))
field = self(field_num)

# Assign at the field level.
if repeat_num is None:
field[:] = [value]
return

# Field is never a string so we don't need to test that.

# Extend the field repeat as needed.
while len(field) < repeat_num:
field.append(self.create_repetition([]))
repetition = field(repeat_num)

if isinstance(repetition, str):
# If the Field was a leaf (string) convert it to a repetition
repetition = self.create_repetition([])
field(repeat_num, value=repetition)

# Assign at the repetition level.
if component_num is None:
repetition[:] = [value]
return

while len(repetition) < component_num:
repetition.append(self.create_component([]))
component = repetition(component_num)

if isinstance(component, str):
# if the repetition was a leaf (string), convert it to a component.
component = self.create_component([])
repetition(component_num, value=component)

# Assign at the component level
if subcomponent_num is None:
component[:] = [value]
return

# Assign at the subcomponent level
while len(component) < subcomponent_num:
component.append("")
component(subcomponent_num, value)


def _adjust_index(self, index):
# First element is the segment name, so we don't need to adjust to get 1-based
return index
Expand Down
88 changes: 60 additions & 28 deletions hl7/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import random
import string

logger = logging.getLogger(__file__)
logger = logging.getLogger(__name__)


def ishl7(line):
Expand Down Expand Up @@ -137,7 +137,8 @@ def escape(container, field, app_map=None):
return "".join(rv)


def unescape(container, field, app_map=None): # noqa: C901
def unescape(container, field, app_map=None, is_log_error=False): # noqa: C901

"""
See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/

Expand All @@ -162,6 +163,9 @@ def unescape(container, field, app_map=None): # noqa: C901
It cannot:

* switch code pages / ISO IR character sets

If there is an error decoding an escape set, the original text is appended to
the returned text.
"""
if not field or field.find(container.esc) == -1:
return field
Expand All @@ -184,6 +188,10 @@ def unescape(container, field, app_map=None): # noqa: C901
".ce": "\r",
}

max_escape = max(10, max(len(x) for x in DEFAULT_MAP.keys()))
if app_map:
max_escape = max(max_escape, max(len(x) for x in app_map.keys()))

rv = []
collecting = []
in_seq = False
Expand All @@ -194,11 +202,13 @@ def unescape(container, field, app_map=None): # noqa: C901
value = "".join(collecting)
collecting = []
if not value:
logger.warn(
"Error unescaping value [%s], empty sequence found at %d",
field,
offset,
)
if is_log_error:
logger.warn(
"Error unescaping empty sequence found at %d",
offset,
)
rv.append(container.esc)
rv.append(container.esc)
continue
if app_map and value in app_map:
rv.append(app_map[value])
Expand All @@ -219,41 +229,63 @@ def unescape(container, field, app_map=None): # noqa: C901
value[0] == "C"
): # Convert to new Single Byte character set : 2.10.2
# Two HEX values, first value chooses the character set (ISO-IR), second gives the value
logger.warn(
"Error inline character sets [%s] not implemented, field [%s], offset [%s]",
value,
field,
offset,
)
if is_log_error:
logger.warn(
"Error inline character sets [%s] not implemented, offset [%s]",
value,
offset,
)
rv.append(container.esc)
rv.append(value)
rv.append(container.esc)
elif value[0] == "M": # Switch to new Multi Byte character set : 2.10.2
# Three HEX values, first value chooses the character set (ISO-IR), rest give the value
logger.warn(
"Error inline character sets [%s] not implemented, field [%s], offset [%s]",
value,
field,
offset,
)
if is_log_error:
logger.warn(
"Error inline character sets [%s] not implemented, offset [%s]",
value,
offset,
)
rv.append(container.esc)
rv.append(value)
rv.append(container.esc)
elif value[0] == "X": # Hex encoded Bytes: 2.10.5
value = value[1:]
try:
for off in range(0, len(value), 2):
rv.append(chr(int(value[off : off + 2], 16)))
except Exception:
if is_log_error:
logger.exception(
"Error decoding hex value [%s], offset [%s]",
value,
offset,
)
rv.append(container.esc)
rv.append("X")
rv.append(value)
rv.append(container.esc)
else:
if is_log_error:
logger.exception(
"Error decoding hex value [%s], field [%s], offset [%s]",
"Error decoding value [%s], offset [%s]",
value,
field,
offset,
)
else:
logger.exception(
"Error decoding value [%s], field [%s], offset [%s]",
value,
field,
offset,
)
rv.append(container.esc)
rv.append(value)
rv.append(container.esc)
else:
collecting.append(c)
if (len(collecting) > max_escape and collecting[0] not in "XZ") \
or (len(collecting) > 10 and collecting[0] in "XZ"):
# We have collected beyond the maximum number of characters in an escape sequence
# Assume the message is badly formed and append the initial escape plus collected
# characters to the output
rv.append(container.esc)
rv.extend(collecting)
collecting = []
in_seq = False
elif c == container.esc:
in_seq = True
else:
Expand Down
Loading