Skip to content

fix: enhance message deserialization to prevent MESSAGE_COERCION_FAIL… #86

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 66 additions & 10 deletions langgraph/checkpoint/redis/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import base64
import binascii
import logging
import random
from abc import abstractmethod
from typing import Any, Dict, Generic, List, Optional, Sequence, Tuple, Union, cast
Expand Down Expand Up @@ -27,6 +28,8 @@
from .jsonplus_redis import JsonPlusRedisSerializer
from .types import IndexType, RedisClientType

logger = logging.getLogger(__name__)

REDIS_KEY_SEPARATOR = ":"
CHECKPOINT_PREFIX = "checkpoint"
CHECKPOINT_BLOB_PREFIX = "checkpoint_blob"
Expand Down Expand Up @@ -305,25 +308,78 @@ def _deserialize_channel_values(

When channel values are stored inline in the checkpoint, they're in their
serialized form. This method deserializes them back to their original types.

This specifically handles LangChain message objects that may be stored in their
serialized format: {'lc': 1, 'type': 'constructor', 'id': [...], 'kwargs': {...}}
and ensures they are properly reconstructed as message objects.
"""
if not channel_values:
return {}

# Apply recursive deserialization to handle nested structures and LangChain objects
return self._recursive_deserialize(channel_values)
try:
# Apply recursive deserialization to handle nested structures and LangChain objects
return self._recursive_deserialize(channel_values)
except Exception as e:
logger.warning(
f"Error deserializing channel values, attempting recovery: {e}"
)
# Attempt to recover by processing each channel individually
recovered = {}
for key, value in channel_values.items():
try:
recovered[key] = self._recursive_deserialize(value)
except Exception as inner_e:
logger.error(
f"Failed to deserialize channel '{key}': {inner_e}. "
f"Value will be returned as-is."
)
recovered[key] = value
return recovered

def _recursive_deserialize(self, obj: Any) -> Any:
"""Recursively deserialize LangChain objects and nested structures."""
"""Recursively deserialize LangChain objects and nested structures.

This method specifically handles the deserialization of LangChain message objects
that may be stored in their serialized format to prevent MESSAGE_COERCION_FAILURE.

Args:
obj: The object to deserialize, which may be a dict, list, or primitive.

Returns:
The deserialized object, with LangChain objects properly reconstructed.
"""
if isinstance(obj, dict):
# Check if this is a LangChain serialized object
if obj.get("lc") in (1, 2) and obj.get("type") == "constructor":
# Use the serde's reviver to reconstruct the object
if hasattr(self.serde, "_reviver"):
return self.serde._reviver(obj)
elif hasattr(self.serde, "_revive_if_needed"):
return self.serde._revive_if_needed(obj)
else:
# Fallback: return as-is if serde doesn't have reviver
try:
# Use the serde's reviver to reconstruct the object
if hasattr(self.serde, "_reviver"):
return self.serde._reviver(obj)
elif hasattr(self.serde, "_revive_if_needed"):
return self.serde._revive_if_needed(obj)
else:
# Log warning if serde doesn't have reviver
logger.warning(
"Serializer does not have a reviver method. "
"LangChain object may not be properly deserialized. "
f"Object ID: {obj.get('id')}"
)
return obj
except Exception as e:
# Provide detailed error message for debugging
obj_id = obj.get("id", "unknown")
obj_type = (
obj.get("id", ["unknown"])[-1]
if isinstance(obj.get("id"), list)
else "unknown"
)
logger.error(
f"Failed to deserialize LangChain object of type '{obj_type}'. "
f"This may cause MESSAGE_COERCION_FAILURE. Error: {e}. "
f"Object structure: lc={obj.get('lc')}, type={obj.get('type')}, "
f"id={obj_id}"
)
# Return the object as-is to prevent complete failure
return obj
# Recursively process nested dicts
return {k: self._recursive_deserialize(v) for k, v in obj.items()}
Expand Down
41 changes: 38 additions & 3 deletions langgraph/checkpoint/redis/jsonplus_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,27 @@


class JsonPlusRedisSerializer(JsonPlusSerializer):
"""Redis-optimized serializer using orjson for faster JSON processing."""
"""Redis-optimized serializer using orjson for faster JSON processing.

This serializer handles the conversion of LangChain objects (including messages)
to and from their serialized format. It specifically addresses the MESSAGE_COERCION_FAILURE
issue by ensuring that LangChain message objects stored in their serialized format
(with 'lc', 'type', 'constructor' fields) are properly reconstructed as message objects
rather than being left as raw dictionaries.

The serialized format for LangChain objects looks like:
{
'lc': 1, # LangChain version marker
'type': 'constructor',
'id': ['langchain', 'schema', 'messages', 'HumanMessage'],
'kwargs': {'content': '...', 'type': 'human', 'id': '...'}
}

This serializer ensures such objects are properly deserialized back to their
original message object form (e.g., HumanMessage, AIMessage) to prevent
downstream errors when the application expects message objects with specific
attributes and methods.
"""

SENTINEL_FIELDS = [
"thread_id",
Expand Down Expand Up @@ -39,16 +59,31 @@ def loads(self, data: bytes) -> Any:
return super().loads(data)

def _revive_if_needed(self, obj: Any) -> Any:
"""Recursively apply reviver to handle LangChain serialized objects."""
"""Recursively apply reviver to handle LangChain serialized objects.

This method is crucial for preventing MESSAGE_COERCION_FAILURE by ensuring
that LangChain message objects stored in their serialized format are properly
reconstructed. Without this, messages would remain as dictionaries with
'lc', 'type', and 'constructor' fields, causing errors when the application
expects actual message objects with 'role' and 'content' attributes.

Args:
obj: The object to potentially revive, which may be a dict, list, or primitive.

Returns:
The revived object with LangChain objects properly reconstructed.
"""
if isinstance(obj, dict):
# Check if this is a LangChain serialized object
if obj.get("lc") in (1, 2) and obj.get("type") == "constructor":
# Use parent's reviver method to reconstruct the object
# This converts {'lc': 1, 'type': 'constructor', ...} back to
# the actual LangChain object (e.g., HumanMessage, AIMessage)
return self._reviver(obj)
# Recursively process nested dicts
return {k: self._revive_if_needed(v) for k, v in obj.items()}
elif isinstance(obj, list):
# Recursively process lists
# Recursively process lists (e.g., lists of messages)
return [self._revive_if_needed(item) for item in obj]
else:
# Return primitives as-is
Expand Down
Loading