Skip to content

Commit b1eb0b6

Browse files
committed
fix: resolve Command(resume) warning about invalid packet type dict (#83)
Issue #83: Command with resume argument failing in v0.1.0 Users reported warning 'Ignoring invalid packet type <class 'dict'> in pending sends' when using Command(resume={'interrupt_id': {'some': 'result'}}) after upgrading from 0.0.8 to 0.1.0. Root cause: Type annotation mismatch in _load_pending_sends methods. They were annotated as returning List[Tuple[str, bytes]] but Redis JSON actually returns strings for blob data, not bytes, causing List[Tuple[str, Union[str, bytes]]]. This type mismatch caused the warning when Command(resume) tried to process pending sends containing dict values through the TASKS channel. Changes: - Updated return type hints for _load_pending_sends methods in both sync and async - Updated _load_pending_sends_with_registry_check type hints - Updated _abatch_load_pending_sends and local variable annotations in async - Added test that simulates Command(resume) scenario and verifies no warning - Added test for type compatibility with Redis JSON string blobs The fix ensures Command(resume) works without warnings while maintaining backward compatibility with code that passes bytes.
1 parent 5bdcc79 commit b1eb0b6

File tree

3 files changed

+147
-6
lines changed

3 files changed

+147
-6
lines changed

langgraph/checkpoint/redis/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1155,7 +1155,7 @@ def _load_pending_sends(
11551155
thread_id: str,
11561156
checkpoint_ns: str,
11571157
parent_checkpoint_id: str,
1158-
) -> List[Tuple[str, bytes]]:
1158+
) -> List[Tuple[str, Union[str, bytes]]]:
11591159
"""Load pending sends for a parent checkpoint.
11601160
11611161
Args:
@@ -1437,7 +1437,7 @@ def _load_pending_sends_with_registry_check(
14371437
thread_id: str,
14381438
checkpoint_ns: str,
14391439
parent_checkpoint_id: str,
1440-
) -> List[Tuple[str, bytes]]:
1440+
) -> List[Tuple[str, Union[str, bytes]]]:
14411441
"""Load pending sends for a parent checkpoint with pre-computed registry check."""
14421442
if not parent_checkpoint_id:
14431443
return []

langgraph/checkpoint/redis/aio.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
541541
if doc_parent_checkpoint_id:
542542
results = await asyncio.gather(*tasks)
543543
channel_values: Dict[str, Any] = results[0]
544-
pending_sends: List[Tuple[str, bytes]] = results[1]
544+
pending_sends: List[Tuple[str, Union[str, bytes]]] = results[1]
545545
pending_writes: List[PendingWrite] = results[2]
546546
else:
547547
# Only channel_values and pending_writes tasks
@@ -772,7 +772,7 @@ async def alist(
772772
parent_checkpoint_id = doc_data["parent_checkpoint_id"]
773773

774774
# Get pending_sends from batch results
775-
pending_sends: List[Tuple[str, bytes]] = []
775+
pending_sends: List[Tuple[str, Union[str, bytes]]] = []
776776
if parent_checkpoint_id:
777777
batch_key = (thread_id, checkpoint_ns, parent_checkpoint_id)
778778
pending_sends = pending_sends_map.get(batch_key, [])
@@ -1443,7 +1443,7 @@ async def aget_channel_values(
14431443

14441444
async def _aload_pending_sends(
14451445
self, thread_id: str, checkpoint_ns: str = "", parent_checkpoint_id: str = ""
1446-
) -> List[Tuple[str, bytes]]:
1446+
) -> List[Tuple[str, Union[str, bytes]]]:
14471447
"""Load pending sends for a parent checkpoint.
14481448
14491449
Args:
@@ -1640,7 +1640,7 @@ async def _aload_pending_writes(
16401640

16411641
async def _abatch_load_pending_sends(
16421642
self, batch_keys: List[Tuple[str, str, str]]
1643-
) -> Dict[Tuple[str, str, str], List[Tuple[str, bytes]]]:
1643+
) -> Dict[Tuple[str, str, str], List[Tuple[str, Union[str, bytes]]]]:
16441644
"""Batch load pending sends for multiple parent checkpoints.
16451645
16461646
Args:

tests/test_checkpoint_serialization.py

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,147 @@ def _saver(redis_url: str):
3535
del saver
3636

3737

38+
def test_issue_83_command_resume_no_warning(redis_url: str) -> None:
39+
"""Test that Command(resume={...}) doesn't cause 'invalid packet type' warning (issue #83).
40+
41+
The user reported that Command(resume={'interrupt_id': {'some': 'result'}})
42+
caused warning: "Ignoring invalid packet type <class 'dict'> in pending sends"
43+
This test verifies our fix prevents that warning.
44+
"""
45+
import warnings
46+
47+
from langgraph.constants import TASKS
48+
49+
with _saver(redis_url) as saver:
50+
# Create interrupted checkpoint
51+
interrupted_config = {
52+
"configurable": {
53+
"thread_id": "test-thread-83",
54+
"checkpoint_ns": "",
55+
"checkpoint_id": "interrupted-checkpoint",
56+
}
57+
}
58+
59+
interrupted_checkpoint = {
60+
"v": 1,
61+
"ts": "2024-01-01T00:00:00+00:00",
62+
"id": "interrupted-checkpoint",
63+
"channel_values": {"messages": ["before interrupt"]},
64+
"channel_versions": {},
65+
"versions_seen": {},
66+
"pending_sends": [],
67+
}
68+
69+
metadata = {"source": "loop", "step": 1, "writes": {}}
70+
71+
# Save the interrupted checkpoint
72+
saver.put(interrupted_config, interrupted_checkpoint, metadata, {})
73+
74+
# Simulate Command(resume={'interrupt_id': {'some': 'result'}})
75+
resume_data = {"interrupt_id": {"some": "result"}}
76+
saver.put_writes(
77+
interrupted_config,
78+
[(TASKS, resume_data)], # This puts a dict into TASKS
79+
task_id="resume_task",
80+
)
81+
82+
# Create resumed checkpoint with parent reference
83+
resumed_config = {
84+
"configurable": {
85+
"thread_id": "test-thread-83",
86+
"checkpoint_ns": "",
87+
"checkpoint_id": "resumed-checkpoint",
88+
}
89+
}
90+
91+
resumed_checkpoint = {
92+
"v": 1,
93+
"ts": "2024-01-01T00:01:00+00:00",
94+
"id": "resumed-checkpoint",
95+
"channel_values": {"messages": ["after resume"]},
96+
"channel_versions": {},
97+
"versions_seen": {},
98+
"pending_sends": [],
99+
}
100+
101+
resumed_metadata = {
102+
"source": "loop",
103+
"step": 2,
104+
"writes": {},
105+
"parent_config": interrupted_config,
106+
}
107+
108+
saver.put(resumed_config, resumed_checkpoint, resumed_metadata, {})
109+
110+
# Load resumed checkpoint - check for warning
111+
with warnings.catch_warnings(record=True) as w:
112+
warnings.simplefilter("always")
113+
114+
result = saver.get_tuple(resumed_config)
115+
116+
# Check if we get the warning about invalid packet type
117+
dict_warnings = [
118+
warning
119+
for warning in w
120+
if "Ignoring invalid packet type" in str(warning.message)
121+
and "dict" in str(warning.message)
122+
]
123+
124+
# Our fix should prevent this warning
125+
assert len(dict_warnings) == 0, f"Got warning: {dict_warnings}"
126+
127+
assert result is not None
128+
assert result.checkpoint["id"] == "resumed-checkpoint"
129+
130+
131+
def test_issue_83_pending_sends_type_compatibility(redis_url: str) -> None:
132+
"""Test that pending_sends work with string blobs from Redis JSON (issue #83).
133+
134+
Issue #83 was caused by type mismatch where _load_pending_sends returned
135+
List[Tuple[str, Union[str, bytes]]] but was annotated as List[Tuple[str, bytes]].
136+
This test verifies the fix works correctly.
137+
"""
138+
with _saver(redis_url) as saver:
139+
checkpoint_dict = {
140+
"v": 1,
141+
"ts": "2024-01-01T00:00:00+00:00",
142+
"id": "test-checkpoint",
143+
"channel_versions": {},
144+
"versions_seen": {},
145+
}
146+
147+
channel_values = {}
148+
149+
# Test with string blobs (what Redis JSON returns)
150+
pending_sends_with_strings = [
151+
("json", '{"test": "value"}'), # String blob from Redis JSON
152+
]
153+
154+
# This should work without type errors
155+
result = saver._load_checkpoint(
156+
checkpoint_dict, channel_values, pending_sends_with_strings
157+
)
158+
159+
assert "pending_sends" in result
160+
assert len(result["pending_sends"]) == 1
161+
assert result["pending_sends"][0] == {"test": "value"}
162+
163+
# Test JsonPlusRedisSerializer compatibility
164+
test_data = {"some": "result", "user_input": "continue"}
165+
166+
# Serialize
167+
type_str, blob = saver.serde.dumps_typed(test_data)
168+
assert isinstance(type_str, str)
169+
assert isinstance(blob, str) # JsonPlusRedisSerializer returns strings
170+
171+
# Deserialize - should work with both string and bytes
172+
result1 = saver.serde.loads_typed((type_str, blob))
173+
result2 = saver.serde.loads_typed((type_str, blob.encode())) # bytes version
174+
175+
assert result1 == test_data
176+
assert result2 == test_data
177+
178+
38179
def test_load_blobs_method(redis_url: str) -> None:
39180
"""Test _load_blobs method with various scenarios.
40181

0 commit comments

Comments
 (0)