Skip to content

Conversation

@mydmdm
Copy link

@mydmdm mydmdm commented Nov 5, 2025

This pull request implements a SqlLightningStore which uses a SQL database for persistent storage and online querying of rollouts, attempts, and spans.

Key Changes

The main addition is in the following folder and file:

agentlightning/store/database
  └── sqlite.py       # New SqlLightningStore implementation
  └── retry_helper.py # A configurable async retry helper
  └── orm
      └── base.py     # Base ORM models
      └── rollout.py  # ORM model `RolloutInDB` for rollouts
      └── attempt.py  # ORM model `AttemptInDB` for attempts
      └── span.py     # ORM model `SpanInDB` for spans
      └── resource.py # ORM model `ResourcesUpdateInDB` for resources

Usage

sqlstore = SqlLightningStore(
  database_url="sqlite+aiosqlite:///path/to/db.sqlite3",
)
await sqlstore.start() # Initialize the database connection
... # Use the store as usual
await sqlstore.stop()  # Close the database connection

The key parameter is database_url, which specifies the database connection string. The implementation is based on SQLAlchemy, and more details about supported database URLs can be found in the SQLAlchemy documentation.

More configuration options are available in the constructor, such as the retrying behavior for wait_for_rollouts() and periodic timeout checks for attempts.

Required interface methods from LightningStore are implemented, such as start_rollout(), update_attempt(), add_span(), and querying methods.

Data Model

The data model is defined using SQLAlchemy ORM classes, which map to database tables. Key models include:

  • RolloutInDB: Represents a rollout in the database.
  • AttemptInDB: Represents an attempt in the database.
  • SpanInDB: Represents a span in the database.
  • ResourcesUpdateInDB: Represents a resources update in the database.
  • SpanSeqIdInDB: Used for generating unique span sequence IDs.

To ensure data consistency during concurrent updates, the optimistic concurrency control (OCC) mechanism with version counting and CAS (compare-and-swap) is applied to RolloutInDB (for rollout dequeuing) and SpanSeqIdInDB (for span sequence ID generation) to prevent race conditions.

Status Transition Diagram

The rollouts and attempts status transitions are illustrated in the following diagram (where cancelled is not shown), in a nested manner:

  • The outer graph shows the rollout status transitions (as preparing is merged into running), and the inner graph shows the attempt status transitions (during the running of a rollout).
  • The green edges indicate transitions caused by start_rollout() and dequeue_rollout(), which make the rollout enter the running status.
  • The finishing of latest attempt, no matter succeeded or failed, makes the rollout leave the running status, either to succeeded, failed, or requeuing (if more retries are allowed).
  • If an attempt is started for a rollout, the status of latest attempt is reflected to the status of the rollout, until the attempt is finished (either succeeds or fails, while timeout and unresponsive are mapped to failed).
---
config:
  layout: dagre
  theme: default
  look: classic
---
flowchart TB
    subgraph running["running of rollout"]
        direction LR
        st["st"]
        prep("preparing")
        r("running")
        suc("succeeded")
        f("failed")
        u("unresponsive")
        t("timeout")
        st2["st2"]
        st3["st3"]
        %% a1@{ shape: text, label: "attempt is<br>succeeded" }
        %% a2@{ shape: text, label: "attempt is<br>failed" }
        %% a3@{ shape: text, label: "attempt created<br>for rollout" }
    end

    start["start"] -->|"enqueue_rollout()"| queuing("queuing")
    queuing -->|"dequeue_rollout()"| running
    start --->|"start_rollout()"| running
    requeuing("requeuing") -->|"dequeue_rollout()<br>or any attempt<br>unresponsive -> running"| running
    running -->|"latest attempt<br>is succeeded"| succeeded("succeeded")
    running -->|"all attempts<br>are failed and <br>no retry allowed"| failed("failed")
    running -->|"all attempts are failed<br>and <br>more retry allowed"| requeuing
    failed -->|"any attempt<br>unresponsive -> running"| running
    failed --> stop["stop"]
    succeeded --> stop

    linkStyle 0,1,2,3 stroke:green,stroke-width:4px,color:green;

    %% a3 -.-
    st -->|"attempt created<br>for rollout"| prep
    prep -->|"add_span()"| r
    r -->|"update_attempt()"| suc
    r -->|"update_attempt()"| f
    r -->|"unresponsive_seconds<br>exceeds"| u
    r -->|"timeout_seconds<br>exceeds"| t
    u -->|"add_span()"| r
    u -->|"attempt is<br>failed"| st3
    prep -->|"timeout_seconds<br>exceeds"| t
    suc -->|"attempt is<br>succeeded"| st2
    f -->|"attempt is<br>failed"| st3
    t -->|"attempt is<br>failed"| st3
    st@{ shape: sm-circ}
    st2@{ shape: framed-circle}
    st3@{ shape: framed-circle}
    %% st2 -.- a1
    %% st3 -.- a2
    start@{ shape: sm-circ}
    stop@{ shape: framed-circle}

    linkStyle 11,16 stroke:blue,stroke-width:4px,color:blue;
    linkStyle 12,13 stroke:violet,stroke-width:4px,color:violet;
    linkStyle 18,14,15 stroke:grey,stroke-width:4px,color:grey;
Loading

Testing

To keep tests consistent across different store implementations, the existing test fixture inmemory_store have been updated to support database-backed stores using SQLite files.

Known Issues

  • Currently only tested on SQLite. Further testing is needed for other databases like PostgreSQL, and perhaps MySQL.
  • Performance benchmarking is pending to evaluate the overhead introduced by database operations. More connection arguments such as pooling may need to be exposed in constructor.
  • Periodic cleanup of old rollouts/attempts/spans is not yet implemented.
  • (resolved) Some known status transition edge cases differ from existing store implementations, as known above.

Copilot AI review requested due to automatic review settings November 5, 2025 07:36
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds database backend support to the Lightning Store, enabling persistent storage of rollouts, attempts, and spans using SQLAlchemy with async support. The implementation includes SQLite support via aiosqlite, custom ORM models with optimistic concurrency control, retry mechanisms for handling database errors, and background task scheduling for health checks.

  • Implements DatabaseLightningStore with full async SQLAlchemy ORM integration
  • Adds configurable retry logic with AsyncTypeBasedRetry and RetryStrategy for handling database errors
  • Creates ORM models for rollouts, attempts, resources, and spans with proper type conversions
  • Adds test fixtures supporting both in-memory and database stores via environment variable

Reviewed Changes

Copilot reviewed 15 out of 16 changed files in this pull request and generated 24 comments.

Show a summary per file
File Description
pyproject.toml Added SQLAlchemy, aiosqlite, and tenacity dependencies; changed PyPI mirror URL
agentlightning/store/database/dbstore.py Core DatabaseLightningStore implementation with background task support
agentlightning/store/database/orm/*.py SQLAlchemy ORM models and custom type converters for database schema
agentlightning/store/database/retry_helper.py Configurable async retry decorator based on exception types
tests/store/conftest.py Updated test fixtures to support database store testing
agentlightning/store/init.py Exports DatabaseLightningStore
agentlightning/types/core.py Added blank line for formatting
.gitignore Added patterns for temporary and backup files

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

)

def __post_init__(self):
if self.status not in ["queuing", "running", "succeeded", "failed", "requeuing"]:
Copy link

Copilot AI Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The post_init validation is missing valid statuses 'preparing' and 'cancelled' which are defined in RolloutStatus (see types/core.py lines 97-105). This will cause validation errors for rollouts in these states.

Suggested change
if self.status not in ["queuing", "running", "succeeded", "failed", "requeuing"]:
# Validate against all valid statuses from RolloutStatus
valid_statuses = [e.value for e in RolloutStatus]
if self.status not in valid_statuses:

Copilot uses AI. Check for mistakes.
from sqlalchemy import Float, Integer, String, JSON
from sqlalchemy import update
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession
Copy link

Copilot AI Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'AsyncSession' is not used.

Suggested change
from sqlalchemy.ext.asyncio import AsyncSession

Copilot uses AI. Check for mistakes.
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm.exc import StaleDataError
Copy link

Copilot AI Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'StaleDataError' is not used.

Suggested change
from sqlalchemy.orm.exc import StaleDataError

Copilot uses AI. Check for mistakes.
from sqlalchemy.orm.exc import StaleDataError
from typing import Any, Dict, Optional, List

import time
Copy link

Copilot AI Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'time' is not used.

Suggested change
import time

Copilot uses AI. Check for mistakes.
from agentlightning.types.tracer import Span, SpanContext, TraceStatus, Attributes, Event, Link, OtelResource, AttributeValue

from .base import SqlAlchemyBase, PydanticInDB, NamedDictBase, PydanticListInDB
from .rollout import RolloutInDB
Copy link

Copilot AI Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'RolloutInDB' is not used.

Suggested change
from .rollout import RolloutInDB

Copilot uses AI. Check for mistakes.

from .base import SqlAlchemyBase, PydanticInDB, NamedDictBase, PydanticListInDB
from .rollout import RolloutInDB
from .attempt import AttemptInDB
Copy link

Copilot AI Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'AttemptInDB' is not used.

Suggested change
from .attempt import AttemptInDB

Copilot uses AI. Check for mistakes.
@microsoft microsoft deleted a comment from Copilot AI Nov 5, 2025
…source management features

- Updated type hints to include `Union` for rollouts and attempted rollouts across multiple files.
- Improved `query_rollouts` and `get_rollout_by_id` methods to return `AttemptedRollout` where applicable.
- Added `query_resources` method for resource management in `DatabaseLightningStore`.
- Refactored `as_attempt` and `as_rollout` methods to utilize `model_dump` for better serialization. (Serialization cost is reduced)
- Updated tests to validate status transitions and ensure correct behavior with new rollout logic.
operations with SQLAlchemy.
- Refactored existing tests to utilize the new SqlLightningStore, ensuring compatibility with the new implementation.
- Adjusted timeout behavior in tests to align with the new polling intervals.
@mydmdm mydmdm requested a review from Copilot November 6, 2025 07:13
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 17 out of 18 changed files in this pull request and generated 13 comments.

Comments suppressed due to low confidence (4)

agentlightning/store/database/orm/base.py:1

  • Calling logging.basicConfig() in a library module can interfere with the application's logging configuration. Remove this line and let the application configure logging. Libraries should only create loggers, not configure the root logger.
# Copyright (c) Microsoft. All rights reserved.

agentlightning/store/database/orm/rollout.py:1

  • Corrected spelling of 'udpate' to 'update'.
# Copyright (c) Microsoft. All rights reserved.

agentlightning/store/database/orm/base.py:1

  • Missing space around the union operator in method signature at line 119. Should be 'Type[BaseException] | None' with spaces around the pipe character.
# Copyright (c) Microsoft. All rights reserved.

agentlightning/store/database/orm/rollout.py:84

  • This statement is unreachable.
        return Rollout(
            rollout_id=self.rollout_id,
            input=self.input,
            start_time=self.start_time,
            end_time=self.end_time,
            mode=self.mode, # type: ignore
            resources_id=self.resources_id,
            status=self.status, # type: ignore
            config=self.config if self.config is not None else RolloutConfig(),
            metadata=self.rollout_metadata if self.rollout_metadata is not None else {},
        )

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 74 to 84
return Rollout(
rollout_id=self.rollout_id,
input=self.input,
start_time=self.start_time,
end_time=self.end_time,
mode=self.mode, # type: ignore
resources_id=self.resources_id,
status=self.status, # type: ignore
config=self.config if self.config is not None else RolloutConfig(),
metadata=self.rollout_metadata if self.rollout_metadata is not None else {},
)
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function has two consecutive return statements. The second return statement (lines 74-84) is unreachable code. Remove lines 74-84 as they are dead code.

Suggested change
return Rollout(
rollout_id=self.rollout_id,
input=self.input,
start_time=self.start_time,
end_time=self.end_time,
mode=self.mode, # type: ignore
resources_id=self.resources_id,
status=self.status, # type: ignore
config=self.config if self.config is not None else RolloutConfig(),
metadata=self.rollout_metadata if self.rollout_metadata is not None else {},
)

Copilot uses AI. Check for mistakes.
if not hasattr(self, config.method):
raise ValueError(f"Periodic task method {config.method} is not defined in SqlLightningStore.")
if config.is_async:
func = lambda: asyncio.run(getattr(self, config.method)())
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using asyncio.run() inside a lambda that runs in a background thread can cause issues if the main event loop is already running. This creates a new event loop for each background task execution, which may lead to concurrency problems. Consider using asyncio.create_task() or run_coroutine_threadsafe() instead, or run the scheduler in its own thread with a dedicated event loop.

Copilot uses AI. Check for mistakes.
Comment on lines 27 to +33
"""Placeholder fixture for SQL store implementation. Returns None until SQL store is ready."""
return None
"""Helper generator to create a SqlLightningStore using a SQLite file for testing."""
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function has two consecutive docstrings. Only the first one will be recognized as the actual docstring. Remove line 32 as it's now outdated and line 33 contains the correct description.

Copilot uses AI. Check for mistakes.
"""
async with session_factory() as session:
async with session.begin():
conditions :list[Any] = []
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra space before colon in type annotation. Should be 'conditions: list[Any]' without space before the colon.

Suggested change
conditions :list[Any] = []
conditions: list[Any] = []

Copilot uses AI. Check for mistakes.
self,
database_url: Optional[str] = None,
*,
retry_for_waiting: Optional[dict[str, Any]|RetryStrategy] = None,
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space around the union operator. Should be 'dict[str, Any] | RetryStrategy' with spaces around the pipe character for consistency with PEP 8 style guidelines.

Suggested change
retry_for_waiting: Optional[dict[str, Any]|RetryStrategy] = None,
retry_for_waiting: Optional[dict[str, Any] | RetryStrategy] = None,

Copilot uses AI. Check for mistakes.
async def get_next_span_sequence_id(self, rollout_id: str, attempt_id: str) -> int:
return await SpanSeqIdInDB.get_next_sequence_id(self._async_session, rollout_id, attempt_id)

async def wait_for_rollouts(self, *, rollout_ids: List[str], timeout: Optional[float] = None) -> List[Rollout]:
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mixing implicit and explicit returns may indicate an error, as implicit returns always return None.

Copilot uses AI. Check for mistakes.
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.hybrid import hybrid_property
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'hybrid_property' is not used.

Suggested change
from sqlalchemy.ext.hybrid import hybrid_property

Copilot uses AI. Check for mistakes.
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.hybrid import hybrid_property

from sqlalchemy import select, and_, case
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'case' is not used.

Suggested change
from sqlalchemy import select, and_, case
from sqlalchemy import select, and_

Copilot uses AI. Check for mistakes.

from sqlalchemy import select, and_, case

from agentlightning.types import Rollout, RolloutConfig, RolloutStatus, AttemptStatus, AttemptedRollout
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'AttemptStatus' is not used.

Suggested change
from agentlightning.types import Rollout, RolloutConfig, RolloutStatus, AttemptStatus, AttemptedRollout
from agentlightning.types import Rollout, RolloutConfig, RolloutStatus, AttemptedRollout

Copilot uses AI. Check for mistakes.
Comment on lines +18 to +19
pass

Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary 'pass' statement.

Suggested change
pass

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants