Skip to content

Conversation

@ultmaster
Copy link
Contributor

@ultmaster ultmaster commented Nov 29, 2025

This PR should solve at least a few problems:

  1. Bad request for many-rollouts as query param.
  2. Leveled locks: read w/wo snapshot; optional commit; group by labels.
  3. Some methods have already been divided into several transaction stages. Sacrifice accuracy for performance.
  4. dequeue_rollouts should not have more failures for mongo implementation.

This PR does NOT solve:

  1. enqueue_many_rollouts (only interface is provided right now)
  2. thread leveled lock is not natively enabled for in-memory store off-the-shelf. More testing is needed.
  3. I think update_worker once more after start/dequeue_rollout seems unnecessary. Will change that behavior in the next PR.
  4. The results are not benchmarked right now. There is no point of doing it so far.

Copilot AI review requested due to automatic review settings November 29, 2025 14:39
Copilot finished reviewing on behalf of ultmaster November 29, 2025 14:41
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds POST-based /search endpoints for querying rollouts, attempts, spans, and workers, providing an alternative to the existing GET endpoints. The client implementation is updated to use the new POST endpoints, and comprehensive tests ensure GET/POST parity. Additionally, the PR includes tests validating that update endpoints properly distinguish between unset fields and null values.

  • Adds POST /search endpoints for rollouts, attempts, spans, and workers
  • Updates client methods to use POST requests with JSON payloads instead of GET with query parameters
  • Adds tests to verify GET/POST endpoint parity and update semantics for unset vs null fields

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

File Description
tests/store/test_restful.py Adds helper functions for testing GET/POST parity and new test cases for rollouts, attempts, spans, and workers search endpoints; includes update semantics tests
agentlightning/store/client_server.py Implements new POST /search endpoints on the server side, updates client methods to use POST requests with JSON payloads, and adjusts path template handling for metrics

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

params.append(("sort_order", sort_order))
payload["sort_by"] = sort_by
payload["sort_order"] = sort_order

Copy link

Copilot AI Nov 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing limit and offset parameters in the payload. These parameters are included in other query methods (query_rollouts, query_attempts, query_spans) but are missing here. This will cause pagination to not work correctly for worker queries.

Add the following after line 1967:

payload["limit"] = limit
payload["offset"] = offset
Suggested change
payload["limit"] = limit
payload["offset"] = offset

Copilot uses AI. Check for mistakes.
@ultmaster
Copy link
Contributor Author

/ci

@github-actions
Copy link

github-actions bot commented Nov 29, 2025

🚀 CI Watcher for correlation id-3591750141-mikg0zy3 triggered by comment 3591750141
🏃‍♀️ Tracking 4 workflow run(s):

✅ All runs completed.

@ultmaster
Copy link
Contributor Author

/ci

@ultmaster ultmaster requested a review from Copilot December 2, 2025 16:31
@github-actions
Copy link

github-actions bot commented Dec 2, 2025

🚀 CI Watcher for correlation id-3602936928-miospqid triggered by comment 3602936928
🏃‍♀️ Tracking 4 workflow run(s):

✅ All runs completed.

Copilot finished reviewing on behalf of ultmaster December 2, 2025 16:35
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 17 out of 17 changed files in this pull request and generated 7 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

async def _get_latest_resources_id(self, collections: T_collections) -> Optional[str]:
@tracked("_get_latest_resources")
async def _get_latest_resources(self) -> Optional[ResourcesUpdate]:
"""Get the latest resources ID from the collections. Returns `None` if no resources are found."""
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring says "Get the latest resources ID from the collections" but the method actually returns Optional[ResourcesUpdate], not just an ID. The docstring should be updated to match the implementation:

"""Get the latest resources from the collections. Returns `None` if no resources are found."""
Suggested change
"""Get the latest resources ID from the collections. Returns `None` if no resources are found."""
"""Get the latest resources from the collections. Returns `None` if no resources are found."""

Copilot uses AI. Check for mistakes.
Comment on lines +244 to +258
async def enqueue_many_rollouts(self, inputs: Sequence[EnqueueRolloutRequest]) -> Sequence[Rollout]:
"""Persist multiple rollouts in `queuing` state.
The implementation can delegate to [`enqueue_rollout()`][agentlightning.LightningStore.enqueue_rollout]
per request and preserves the input ordering. Subclasses can override to provide
more efficient bulk enqueue semantics.
Args:
inputs: Rollout submission payloads mirroring [`enqueue_rollout()`][agentlightning.LightningStore.enqueue_rollout]'s
parameters. Each entry requires `input` and can optionally include other fields.
Returns:
Rollouts enqueued in the same order as `inputs`.
"""
raise NotImplementedError()
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method enqueue_many_rollouts is declared in the base class LightningStore (line 244) but appears not to be implemented in CollectionBasedLightningStore. This will cause NotImplementedError at runtime when the method is called. Either implement this method or document that it's intentionally unimplemented in this PR.

Copilot uses AI. Check for mistakes.
Comment on lines +284 to +305
async def dequeue_many_rollouts(
self,
*,
limit: int = 1,
worker_id: Optional[str] = None,
) -> Sequence[AttemptedRollout]:
"""Claim up to `limit` queued rollouts without blocking.
The implementation can repeatedly invokes
[`dequeue_rollout()`][agentlightning.LightningStore.dequeue_rollout] until reaching
the requested limit or the queue is empty. Subclasses can override it to fetch
multiple rollouts atomically.
Args:
limit: Maximum number of rollouts to claim. Non-positive values return an empty list.
worker_id: Optional worker identifier passed through to each dequeue call.
Returns:
Attempted rollouts claimed in FIFO order. May contain fewer than `limit` entries
when the queue is exhausted.
"""
raise NotImplementedError()
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method dequeue_many_rollouts is declared in the base class LightningStore (line 284) but appears not to be implemented in CollectionBasedLightningStore. This will cause NotImplementedError at runtime when the method is called. Either implement this method or document that it's intentionally unimplemented in this PR.

Copilot uses AI. Check for mistakes.
Comment on lines 1605 to 1617
payload: Dict[str, Any] = {}
if resolved_status is not None:
_extend("status_in", resolved_status)
payload["status_in"] = resolved_status
if resolved_rollout_ids is not None:
_extend("rollout_id_in", resolved_rollout_ids)
payload["rollout_id_in"] = resolved_rollout_ids
if rollout_id_contains is not None:
params_list.append(("rollout_id_contains", rollout_id_contains))
params_list.append(("filter_logic", filter_logic))
payload["rollout_id_contains"] = rollout_id_contains
payload["filter_logic"] = filter_logic
if sort_by is not None:
params_list.append(("sort_by", sort_by))
params_list.append(("sort_order", sort_order))
params_list.append(("limit", limit))
params_list.append(("offset", offset))
payload["sort_by"] = sort_by
payload["sort_order"] = sort_order
payload["limit"] = limit
payload["offset"] = offset
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The payload initialization is inconsistent. For rollouts, spans, and workers, an empty dict is created and conditionally populated. However, for attempts, the dict is initialized with required fields. Consider initializing with required fields first for consistency:

payload: Dict[str, Any] = {
    "limit": limit,
    "offset": offset,
}

This matches the pattern used in the attempts query below (lines 1639-1642).

Copilot uses AI. Check for mistakes.
Comment on lines +1959 to +1964
payload: Dict[str, Any] = {}
if status_in is not None:
for value in status_in:
params.append(("status_in", value))
payload["status_in"] = status_in
if worker_id_contains is not None:
params.append(("worker_id_contains", worker_id_contains))
params.append(("filter_logic", filter_logic))
payload["worker_id_contains"] = worker_id_contains
payload["filter_logic"] = filter_logic
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The payload dictionary for workers query is missing the required limit and offset fields that are included in other query methods. This could cause pagination to fail or behave unexpectedly. Add:

payload: Dict[str, Any] = {
    "limit": limit,
    "offset": offset,
}

Copilot uses AI. Check for mistakes.
To developers, please check whether the implementation is correct by checking the following:
1. Whether all `_unlocked_*` methods are guarded by some `atomic()` or `execute()` context.
2. Whether all `atomic()` or `execute()` contexts are labeled (label="...") correctly.
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring states label="..." but the actual parameter name is labels (plural). This should be corrected to labels=... for accuracy:

2. Whether all `atomic()` or `execute()` contexts are labeled (labels=...) correctly.
Suggested change
2. Whether all `atomic()` or `execute()` contexts are labeled (label="...") correctly.
2. Whether all `atomic()` or `execute()` contexts are labeled (labels="...") correctly.

Copilot uses AI. Check for mistakes.
labels = list(self._lock.keys())
managers = [self._lock[label] for label in labels]
async with AsyncExitStack() as stack:
_locks = [await stack.enter_async_context(manager) for manager in managers]
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable _locks is not used.

Suggested change
_locks = [await stack.enter_async_context(manager) for manager in managers]
for manager in managers:
await stack.enter_async_context(manager)

Copilot uses AI. Check for mistakes.
@ultmaster ultmaster merged commit 003b8c6 into main Dec 2, 2025
14 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants