From d579d2118b896dd1fb30765a575b9b534290198b Mon Sep 17 00:00:00 2001 From: amedeorizzo Date: Thu, 9 Oct 2025 12:17:08 +0200 Subject: [PATCH 1/8] feat: Implement Typesense-based session service --- pyproject.toml | 59 +- src/google/adk/sessions/__init__.py | 10 + src/google/adk/sessions/_session_util.py | 51 ++ .../adk/sessions/database_session_service.py | 39 +- .../adk/sessions/typesense_session_service.py | 601 ++++++++++++++++++ 5 files changed, 700 insertions(+), 60 deletions(-) create mode 100644 src/google/adk/sessions/typesense_session_service.py diff --git a/pyproject.toml b/pyproject.toml index 95c69f1fa7..338bac90a6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,44 +25,45 @@ classifiers = [ # List of https://pypi.org/classifiers/ ] dependencies = [ # go/keep-sorted start - "PyYAML>=6.0.2, <7.0.0", # For APIHubToolset. - "absolufy-imports>=0.3.1, <1.0.0", # For Agent Engine deployment. - "anyio>=4.9.0, <5.0.0;python_version>='3.10'", # For MCP Session Manager - "authlib>=1.5.1, <2.0.0", # For RestAPI Tool - "click>=8.1.8, <9.0.0", # For CLI tools - "fastapi>=0.115.0, <1.0.0", # FastAPI framework - "google-api-python-client>=2.157.0, <3.0.0", # Google API client discovery - "google-cloud-aiplatform[agent_engines]>=1.112.0, <2.0.0",# For VertexAI integrations, e.g. example store. - "google-cloud-bigtable>=2.32.0", # For Bigtable database - "google-cloud-discoveryengine>=0.13.12, <0.14.0", # For Discovery Engine Search Tool - "google-cloud-secret-manager>=2.22.0, <3.0.0", # Fetching secrets in RestAPI Tool - "google-cloud-spanner>=3.56.0, <4.0.0", # For Spanner database - "google-cloud-speech>=2.30.0, <3.0.0", # For Audio Transcription - "google-cloud-storage>=2.18.0, <3.0.0", # For GCS Artifact service - "google-genai>=1.41.0, <2.0.0", # Google GenAI SDK - "graphviz>=0.20.2, <1.0.0", # Graphviz for graph rendering - "mcp>=1.8.0, <2.0.0;python_version>='3.10'", # For MCP Toolset - "opentelemetry-api>=1.37.0, <=1.37.0", # OpenTelemetry - limit upper version for sdk and api to not risk breaking changes from unstable _logs package. + "PyYAML>=6.0.2, <7.0.0", # For APIHubToolset. + "absolufy-imports>=0.3.1, <1.0.0", # For Agent Engine deployment. + "anyio>=4.9.0, <5.0.0;python_version>='3.10'", # For MCP Session Manager + "authlib>=1.5.1, <2.0.0", # For RestAPI Tool + "click>=8.1.8, <9.0.0", # For CLI tools + "fastapi>=0.115.0, <1.0.0", # FastAPI framework + "google-api-python-client>=2.157.0, <3.0.0", # Google API client discovery + "google-cloud-aiplatform[agent_engines]>=1.112.0, <2.0.0", # For VertexAI integrations, e.g. example store. + "google-cloud-bigtable>=2.32.0", # For Bigtable database + "google-cloud-discoveryengine>=0.13.12, <0.14.0", # For Discovery Engine Search Tool + "google-cloud-secret-manager>=2.22.0, <3.0.0", # Fetching secrets in RestAPI Tool + "google-cloud-spanner>=3.56.0, <4.0.0", # For Spanner database + "google-cloud-speech>=2.30.0, <3.0.0", # For Audio Transcription + "google-cloud-storage>=2.18.0, <3.0.0", # For GCS Artifact service + "google-genai>=1.41.0, <2.0.0", # Google GenAI SDK + "graphviz>=0.20.2, <1.0.0", # Graphviz for graph rendering + "mcp>=1.8.0, <2.0.0;python_version>='3.10'", # For MCP Toolset + "opentelemetry-api>=1.37.0, <=1.37.0", # OpenTelemetry - limit upper version for sdk and api to not risk breaking changes from unstable _logs package. 
"opentelemetry-exporter-gcp-logging>=1.9.0a0, <2.0.0", "opentelemetry-exporter-gcp-monitoring>=1.9.0a0, <2.0.0", "opentelemetry-exporter-gcp-trace>=1.9.0, <2.0.0", "opentelemetry-exporter-otlp-proto-http>=1.36.0", "opentelemetry-resourcedetector-gcp>=1.9.0a0, <2.0.0", "opentelemetry-sdk>=1.37.0, <=1.37.0", - "pydantic>=2.0, <3.0.0", # For data validation/models - "python-dateutil>=2.9.0.post0, <3.0.0", # For Vertext AI Session Service - "python-dotenv>=1.0.0, <2.0.0", # To manage environment variables + "pydantic>=2.0, <3.0.0", # For data validation/models + "python-dateutil>=2.9.0.post0, <3.0.0", # For Vertext AI Session Service + "python-dotenv>=1.0.0, <2.0.0", # To manage environment variables "requests>=2.32.4, <3.0.0", - "sqlalchemy-spanner>=1.14.0", # Spanner database session service - "sqlalchemy>=2.0, <3.0.0", # SQL database ORM - "starlette>=0.46.2, <1.0.0", # For FastAPI CLI - "tenacity>=8.0.0, <9.0.0", # For Retry management + "sqlalchemy-spanner>=1.14.0", # Spanner database session service + "sqlalchemy>=2.0, <3.0.0", # SQL database ORM + "starlette>=0.46.2, <1.0.0", # For FastAPI CLI + "tenacity>=8.0.0, <9.0.0", # For Retry management "typing-extensions>=4.5, <5", - "tzlocal>=5.3, <6.0", # Time zone utilities - "uvicorn>=0.34.0, <1.0.0", # ASGI server for FastAPI - "watchdog>=6.0.0, <7.0.0", # For file change detection and hot reload - "websockets>=15.0.1, <16.0.0", # For BaseLlmFlow + "tzlocal>=5.3, <6.0", # Time zone utilities + "uvicorn>=0.34.0, <1.0.0", # ASGI server for FastAPI + "watchdog>=6.0.0, <7.0.0", # For file change detection and hot reload + "websockets>=15.0.1, <16.0.0", # For BaseLlmFlow # go/keep-sorted end + "typesense>=1.1.1", ] dynamic = ["version"] diff --git a/src/google/adk/sessions/__init__.py b/src/google/adk/sessions/__init__.py index 5583ac4361..4b0d5bb45a 100644 --- a/src/google/adk/sessions/__init__.py +++ b/src/google/adk/sessions/__init__.py @@ -39,3 +39,13 @@ 'DatabaseSessionService require sqlalchemy>=2.0, please ensure it is' ' installed correctly.' ) + +try: + from .typesense_session_service import TypesenseSessionService + + __all__.append('TypesenseSessionService') +except ImportError: + logger.debug( + 'TypesenseSessionService requires typesense>=1.1.1, please ensure it is' + ' installed correctly.' + ) diff --git a/src/google/adk/sessions/_session_util.py b/src/google/adk/sessions/_session_util.py index 2cc65949cb..084b9d52dd 100644 --- a/src/google/adk/sessions/_session_util.py +++ b/src/google/adk/sessions/_session_util.py @@ -14,11 +14,14 @@ """Utility functions for session service.""" from __future__ import annotations +import copy from typing import Any from typing import Optional from google.genai import types +from .state import State + def decode_content( content: Optional[dict[str, Any]], @@ -36,3 +39,51 @@ def decode_grounding_metadata( if not grounding_metadata: return None return types.GroundingMetadata.model_validate(grounding_metadata) + + +def extract_state_delta( + state: dict[str, Any], +) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]: + """Extracts state deltas for app, user, and session scopes. + + Args: + state: The state dictionary containing mixed scopes. + + Returns: + A tuple of (app_state_delta, user_state_delta, session_state_delta). 
+ """ + app_state_delta = {} + user_state_delta = {} + session_state_delta = {} + if state: + for key in state.keys(): + if key.startswith(State.APP_PREFIX): + app_state_delta[key.removeprefix(State.APP_PREFIX)] = state[key] + elif key.startswith(State.USER_PREFIX): + user_state_delta[key.removeprefix(State.USER_PREFIX)] = state[key] + elif not key.startswith(State.TEMP_PREFIX): + session_state_delta[key] = state[key] + return app_state_delta, user_state_delta, session_state_delta + + +def merge_state( + app_state: dict[str, Any], + user_state: dict[str, Any], + session_state: dict[str, Any], +) -> dict[str, Any]: + """Merges app, user, and session states into a single state dictionary. + + Args: + app_state: The app-level state. + user_state: The user-level state. + session_state: The session-level state. + + Returns: + A merged state dictionary with appropriate prefixes. + """ + merged_state = copy.deepcopy(session_state) + for key in app_state.keys(): + merged_state[State.APP_PREFIX + key] = app_state[key] + for key in user_state.keys(): + merged_state[State.USER_PREFIX + key] = user_state[key] + return merged_state diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index 959524c689..622cdce112 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -53,6 +53,8 @@ from . import _session_util from ..events.event import Event +from ._session_util import extract_state_delta +from ._session_util import merge_state from .base_session_service import BaseSessionService from .base_session_service import GetSessionConfig from .base_session_service import ListSessionsResponse @@ -413,7 +415,7 @@ def __init__(self, db_url: str, **kwargs: Any): local_timezone = get_localzone() logger.info("Local timezone: %s", local_timezone) - self.db_engine: Engine = db_engine + self.db_engine = db_engine self.metadata: MetaData = MetaData() self.inspector = inspect(self.db_engine) @@ -463,7 +465,7 @@ async def create_session( sql_session.add(storage_user_state) # Extract state deltas - app_state_delta, user_state_delta, session_state = _extract_state_delta( + app_state_delta, user_state_delta, session_state = extract_state_delta( state ) @@ -490,7 +492,7 @@ async def create_session( sql_session.refresh(storage_session) # Merge states for response - merged_state = _merge_state(app_state, user_state, session_state) + merged_state = merge_state(app_state, user_state, session_state) session = storage_session.to_session(state=merged_state) return session @@ -545,7 +547,7 @@ async def get_session( session_state = storage_session.state # Merge states - merged_state = _merge_state(app_state, user_state, session_state) + merged_state = merge_state(app_state, user_state, session_state) # Convert storage session to session events = [e.to_event() for e in reversed(storage_events)] @@ -576,7 +578,7 @@ async def list_sessions( sessions = [] for storage_session in results: session_state = storage_session.state - merged_state = _merge_state(app_state, user_state, session_state) + merged_state = merge_state(app_state, user_state, session_state) sessions.append(storage_session.to_session(state=merged_state)) return ListSessionsResponse(sessions=sessions) @@ -633,7 +635,7 @@ async def append_event(self, session: Session, event: Event) -> Event: if event.actions: if event.actions.state_delta: app_state_delta, user_state_delta, session_state_delta = ( - 
_extract_state_delta(event.actions.state_delta) + extract_state_delta(event.actions.state_delta) ) # Merge state and update storage @@ -658,28 +660,3 @@ async def append_event(self, session: Session, event: Event) -> Event: # Also update the in-memory session await super().append_event(session=session, event=event) return event - - -def _extract_state_delta(state: dict[str, Any]): - app_state_delta = {} - user_state_delta = {} - session_state_delta = {} - if state: - for key in state.keys(): - if key.startswith(State.APP_PREFIX): - app_state_delta[key.removeprefix(State.APP_PREFIX)] = state[key] - elif key.startswith(State.USER_PREFIX): - user_state_delta[key.removeprefix(State.USER_PREFIX)] = state[key] - elif not key.startswith(State.TEMP_PREFIX): - session_state_delta[key] = state[key] - return app_state_delta, user_state_delta, session_state_delta - - -def _merge_state(app_state, user_state, session_state): - # Merge states for response - merged_state = copy.deepcopy(session_state) - for key in app_state.keys(): - merged_state[State.APP_PREFIX + key] = app_state[key] - for key in user_state.keys(): - merged_state[State.USER_PREFIX + key] = user_state[key] - return merged_state diff --git a/src/google/adk/sessions/typesense_session_service.py b/src/google/adk/sessions/typesense_session_service.py new file mode 100644 index 0000000000..641b3107a7 --- /dev/null +++ b/src/google/adk/sessions/typesense_session_service.py @@ -0,0 +1,601 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Typesense-based session service implementation.""" +from __future__ import annotations + +import base64 +from datetime import datetime +import logging +import pickle +from typing import Any +from typing import Optional +import uuid + +import typesense +from typing_extensions import override + +from . import _session_util +from ..events.event import Event +from ._session_util import extract_state_delta +from ._session_util import merge_state +from .base_session_service import BaseSessionService +from .base_session_service import GetSessionConfig +from .base_session_service import ListSessionsResponse +from .session import Session + +logger = logging.getLogger("google_adk." 
+ __name__) + +# Typesense collection schemas +SESSIONS_SCHEMA = { + "name": "sessions", + "enable_nested_fields": True, + "fields": [ + {"name": "app_name", "type": "string"}, + {"name": "user_id", "type": "string"}, + {"name": "session_id", "type": "string"}, + {"name": "composite_key", "type": "string"}, # Unique document ID + {"name": "state", "type": "object"}, + {"name": "create_time", "type": "int64"}, # Unix timestamp microseconds + {"name": "update_time", "type": "int64"}, # Unix timestamp microseconds + ], +} + +EVENTS_SCHEMA = { + "name": "events", + "enable_nested_fields": True, + "fields": [ + {"name": "id", "type": "string"}, + {"name": "app_name", "type": "string"}, + {"name": "user_id", "type": "string"}, + {"name": "session_id", "type": "string"}, + {"name": "composite_key", "type": "string"}, # Unique document ID + {"name": "invocation_id", "type": "string"}, + {"name": "author", "type": "string"}, + {"name": "actions", "type": "string"}, # Pickled and base64 encoded + { + "name": "long_running_tool_ids_json", + "type": "string", + "optional": True, + }, + {"name": "branch", "type": "string", "optional": True}, + {"name": "timestamp", "type": "int64"}, # Unix timestamp microseconds + {"name": "content", "type": "object", "optional": True}, + {"name": "grounding_metadata", "type": "object", "optional": True}, + {"name": "custom_metadata", "type": "object", "optional": True}, + {"name": "partial", "type": "bool", "optional": True}, + {"name": "turn_complete", "type": "bool", "optional": True}, + {"name": "error_code", "type": "string", "optional": True}, + {"name": "error_message", "type": "string", "optional": True}, + {"name": "interrupted", "type": "bool", "optional": True}, + ], +} + +APP_STATES_SCHEMA = { + "name": "app_states", + "enable_nested_fields": True, + "fields": [ + {"name": "app_name", "type": "string"}, # Unique document ID + {"name": "state", "type": "object"}, + {"name": "update_time", "type": "int64"}, + ], +} + +USER_STATES_SCHEMA = { + "name": "user_states", + "enable_nested_fields": True, + "fields": [ + {"name": "app_name", "type": "string"}, + {"name": "user_id", "type": "string"}, + {"name": "composite_key", "type": "string"}, # Unique document ID + {"name": "state", "type": "object"}, + {"name": "update_time", "type": "int64"}, + ], +} + + +class TypesenseSessionService(BaseSessionService): + """A session service that uses Typesense for storage.""" + + def __init__( + self, + host: str, + api_key: str, + port: int = 8108, + protocol: str = "http", + **kwargs: Any, + ): + """Initializes the Typesense session service. + + Args: + host: The Typesense server hostname (e.g., "localhost" or + "api.typesense.cloud"). + api_key: The Typesense API key for authentication. + port: The Typesense server port. Defaults to 8108 for HTTP, use 443 for + HTTPS. + protocol: The protocol to use, either "http" or "https". Defaults to + "http". + **kwargs: Additional arguments (unused for Typesense). + + Raises: + ValueError: If the protocol is invalid or initialization fails. + """ + try: + # Validate protocol + if protocol not in ("http", "https"): + raise ValueError( + f"Invalid protocol '{protocol}'. Expected 'http' or 'https'." 
+ ) + + self.client = typesense.Client({ + "nodes": [{"host": host, "port": port, "protocol": protocol}], + "api_key": api_key, + "connection_timeout_seconds": 2, + }) + + # Initialize collections + self._initialize_collections() + + logger.info( + "Typesense session service initialized with %s://%s:%s", + protocol, + host, + port, + ) + + except Exception as e: + raise ValueError( + f"Failed to initialize Typesense session service: {e}" + ) from e + + def _initialize_collections(self): + """Creates Typesense collections if they don't exist.""" + for schema in [ + SESSIONS_SCHEMA, + EVENTS_SCHEMA, + APP_STATES_SCHEMA, + USER_STATES_SCHEMA, + ]: + collection_name = schema["name"] + try: + self.client.collections[collection_name].retrieve() + logger.debug("Collection '%s' already exists", collection_name) + except typesense.exceptions.ObjectNotFound: + self.client.collections.create(schema) + logger.info("Created collection '%s'", collection_name) + + def _make_composite_key(self, *parts: str) -> str: + """Creates a composite key for Typesense document ID.""" + return ":".join(parts) + + def _to_microseconds(self, timestamp: float) -> int: + """Converts Unix timestamp to microseconds.""" + return int(timestamp * 1_000_000) + + def _from_microseconds(self, microseconds: int) -> float: + """Converts microseconds to Unix timestamp.""" + return microseconds / 1_000_000 + + def _serialize_actions(self, actions: Any) -> str: + """Pickles and base64 encodes actions.""" + return base64.b64encode(pickle.dumps(actions)).decode("ascii") + + def _deserialize_actions(self, actions_str: str) -> Any: + """Base64 decodes and unpickles actions.""" + return pickle.loads(base64.b64decode(actions_str)) + + def _get_app_state(self, app_name: str) -> dict[str, Any]: + """Fetches app state from Typesense.""" + try: + search_results = self.client.collections["app_states"].documents.search({ + "q": "*", + "filter_by": f"app_name:={app_name}", + "per_page": 1, + }) + if search_results["found"] > 0: + return search_results["hits"][0]["document"]["state"] + return {} + except typesense.exceptions.ObjectNotFound: + return {} + + def _get_user_state(self, app_name: str, user_id: str) -> dict[str, Any]: + """Fetches user state from Typesense.""" + try: + composite_key = self._make_composite_key(app_name, user_id) + search_results = self.client.collections["user_states"].documents.search({ + "q": "*", + "filter_by": f"composite_key:={composite_key}", + "per_page": 1, + }) + if search_results["found"] > 0: + return search_results["hits"][0]["document"]["state"] + return {} + except typesense.exceptions.ObjectNotFound: + return {} + + def _upsert_app_state(self, app_name: str, state: dict[str, Any]): + """Updates or inserts app state in Typesense.""" + now = self._to_microseconds(datetime.now().timestamp()) + document = { + "id": app_name, + "app_name": app_name, + "state": state, + "update_time": now, + } + self.client.collections["app_states"].documents.upsert(document) + + def _upsert_user_state( + self, app_name: str, user_id: str, state: dict[str, Any] + ): + """Updates or inserts user state in Typesense.""" + now = self._to_microseconds(datetime.now().timestamp()) + composite_key = self._make_composite_key(app_name, user_id) + document = { + "id": composite_key, + "app_name": app_name, + "user_id": user_id, + "composite_key": composite_key, + "state": state, + "update_time": now, + } + self.client.collections["user_states"].documents.upsert(document) + + @override + async def create_session( + self, + *, + app_name: 
str, + user_id: str, + state: Optional[dict[str, Any]] = None, + session_id: Optional[str] = None, + ) -> Session: + """Creates a new session.""" + # Fetch existing states + app_state = self._get_app_state(app_name) + user_state = self._get_user_state(app_name, user_id) + + # Extract state deltas + app_state_delta, user_state_delta, session_state = extract_state_delta( + state if state else {} + ) + + # Apply state deltas + app_state.update(app_state_delta) + user_state.update(user_state_delta) + + # Update app and user states if there are deltas + if app_state_delta: + self._upsert_app_state(app_name, app_state) + if user_state_delta: + self._upsert_user_state(app_name, user_id, user_state) + + # Generate session ID if not provided + if not session_id: + session_id = str(uuid.uuid4()) + + # Create session document + now = datetime.now().timestamp() + now_micro = self._to_microseconds(now) + composite_key = self._make_composite_key(app_name, user_id, session_id) + + session_doc = { + "id": composite_key, + "app_name": app_name, + "user_id": user_id, + "session_id": session_id, + "composite_key": composite_key, + "state": session_state, + "create_time": now_micro, + "update_time": now_micro, + } + + self.client.collections["sessions"].documents.create(session_doc) + + # Merge states for response + merged_state = merge_state(app_state, user_state, session_state) + + return Session( + app_name=app_name, + user_id=user_id, + id=session_id, + state=merged_state, + events=[], + last_update_time=now, + ) + + @override + async def get_session( + self, + *, + app_name: str, + user_id: str, + session_id: str, + config: Optional[GetSessionConfig] = None, + ) -> Optional[Session]: + """Gets a session.""" + # Search for session + composite_key = self._make_composite_key(app_name, user_id, session_id) + try: + search_results = self.client.collections["sessions"].documents.search({ + "q": "*", + "filter_by": f"composite_key:={composite_key}", + "per_page": 1, + }) + if search_results["found"] == 0: + return None + + session_doc = search_results["hits"][0]["document"] + except typesense.exceptions.ObjectNotFound: + return None + + # Build event filter + event_filter = ( + f"app_name:={app_name} && user_id:={user_id} &&" + f" session_id:={session_id}" + ) + if config and config.after_timestamp: + after_micro = self._to_microseconds(config.after_timestamp) + event_filter += f" && timestamp:>={after_micro}" + + # Search for events + event_search_params = { + "q": "*", + "filter_by": event_filter, + "sort_by": "timestamp:desc", + } + if config and config.num_recent_events: + event_search_params["per_page"] = config.num_recent_events + + event_results = self.client.collections["events"].documents.search( + event_search_params + ) + + # Convert events + events = [] + for hit in reversed(event_results["hits"]): + event_doc = hit["document"] + events.append(self._document_to_event(event_doc)) + + # Fetch states + app_state = self._get_app_state(app_name) + user_state = self._get_user_state(app_name, user_id) + session_state = session_doc["state"] + + # Merge states + merged_state = merge_state(app_state, user_state, session_state) + + return Session( + app_name=app_name, + user_id=user_id, + id=session_id, + state=merged_state, + events=events, + last_update_time=self._from_microseconds(session_doc["update_time"]), + ) + + @override + async def list_sessions( + self, *, app_name: str, user_id: str + ) -> ListSessionsResponse: + """Lists all sessions for a user.""" + # Search for sessions + search_results = 
self.client.collections["sessions"].documents.search({ + "q": "*", + "filter_by": f"app_name:={app_name} && user_id:={user_id}", + "per_page": 250, # Typesense default max + }) + + # Fetch states + app_state = self._get_app_state(app_name) + user_state = self._get_user_state(app_name, user_id) + + sessions = [] + for hit in search_results["hits"]: + session_doc = hit["document"] + session_state = session_doc["state"] + merged_state = merge_state(app_state, user_state, session_state) + + sessions.append( + Session( + app_name=app_name, + user_id=user_id, + id=session_doc["session_id"], + state=merged_state, + events=[], + last_update_time=self._from_microseconds( + session_doc["update_time"] + ), + ) + ) + + return ListSessionsResponse(sessions=sessions) + + @override + async def delete_session( + self, app_name: str, user_id: str, session_id: str + ) -> None: + """Deletes a session and all its events.""" + composite_key = self._make_composite_key(app_name, user_id, session_id) + + # Delete session + try: + self.client.collections["sessions"].documents[composite_key].delete() + except typesense.exceptions.ObjectNotFound: + pass # Session already deleted + + # Delete all events for this session + event_filter = ( + f"app_name:={app_name} && user_id:={user_id} &&" + f" session_id:={session_id}" + ) + try: + self.client.collections["events"].documents.delete( + {"filter_by": event_filter} + ) + except typesense.exceptions.ObjectNotFound: + pass # No events to delete + + @override + async def append_event(self, session: Session, event: Event) -> Event: + """Appends an event to a session.""" + if event.partial: + return event + + # Get current session to check staleness + composite_key = self._make_composite_key( + session.app_name, session.user_id, session.id + ) + try: + search_results = self.client.collections["sessions"].documents.search({ + "q": "*", + "filter_by": f"composite_key:={composite_key}", + "per_page": 1, + }) + if search_results["found"] == 0: + raise ValueError(f"Session {session.id} not found") + + session_doc = search_results["hits"][0]["document"] + stored_update_time = self._from_microseconds(session_doc["update_time"]) + + if stored_update_time > session.last_update_time: + raise ValueError( + "The last_update_time provided in the session object" + f" {datetime.fromtimestamp(session.last_update_time):%Y-%m-%d %H:%M:%S} is" + " earlier than the update_time in storage" + f" {datetime.fromtimestamp(stored_update_time):%Y-%m-%d %H:%M:%S}." + " Please check if it is a stale session." 
+ ) + except typesense.exceptions.ObjectNotFound: + raise ValueError(f"Session {session.id} not found") + + # Fetch states + app_state = self._get_app_state(session.app_name) + user_state = self._get_user_state(session.app_name, session.user_id) + session_state = session_doc["state"] + + # Extract state delta from event + app_state_delta = {} + user_state_delta = {} + session_state_delta = {} + if event.actions and event.actions.state_delta: + app_state_delta, user_state_delta, session_state_delta = ( + extract_state_delta(event.actions.state_delta) + ) + + # Merge state and update storage + if app_state_delta: + app_state.update(app_state_delta) + self._upsert_app_state(session.app_name, app_state) + if user_state_delta: + user_state.update(user_state_delta) + self._upsert_user_state(session.app_name, session.user_id, user_state) + if session_state_delta: + session_state.update(session_state_delta) + + # Create event document + event_composite_key = self._make_composite_key( + event.id, session.app_name, session.user_id, session.id + ) + event_doc = self._event_to_document(session, event, event_composite_key) + self.client.collections["events"].documents.create(event_doc) + + # Update session's update_time and state + now = datetime.now().timestamp() + now_micro = self._to_microseconds(now) + session_doc["state"] = session_state + session_doc["update_time"] = now_micro + self.client.collections["sessions"].documents.upsert(session_doc) + + # Update session object + session.last_update_time = now + + # Also update the in-memory session + await super().append_event(session=session, event=event) + return event + + def _event_to_document( + self, session: Session, event: Event, composite_key: str + ) -> dict[str, Any]: + """Converts an Event object to a Typesense document.""" + doc = { + "id": composite_key, + "composite_key": composite_key, + "app_name": session.app_name, + "user_id": session.user_id, + "session_id": session.id, + "invocation_id": event.invocation_id, + "author": event.author, + "actions": self._serialize_actions(event.actions), + "timestamp": self._to_microseconds(event.timestamp), + } + + # Add optional fields + if event.long_running_tool_ids: + import json + + doc["long_running_tool_ids_json"] = json.dumps( + list(event.long_running_tool_ids) + ) + if event.branch: + doc["branch"] = event.branch + if event.content: + doc["content"] = event.content.model_dump(exclude_none=True, mode="json") + if event.grounding_metadata: + doc["grounding_metadata"] = event.grounding_metadata.model_dump( + exclude_none=True, mode="json" + ) + if event.custom_metadata: + doc["custom_metadata"] = event.custom_metadata + if event.partial is not None: + doc["partial"] = event.partial + if event.turn_complete is not None: + doc["turn_complete"] = event.turn_complete + if event.error_code: + doc["error_code"] = event.error_code + if event.error_message: + doc["error_message"] = event.error_message + if event.interrupted is not None: + doc["interrupted"] = event.interrupted + + return doc + + def _document_to_event(self, doc: dict[str, Any]) -> Event: + """Converts a Typesense document to an Event object.""" + import json + + long_running_tool_ids = set() + if ( + "long_running_tool_ids_json" in doc + and doc["long_running_tool_ids_json"] + ): + long_running_tool_ids = set(json.loads(doc["long_running_tool_ids_json"])) + + return Event( + id=doc["id"].split(":")[0], # Extract event ID from composite key + invocation_id=doc["invocation_id"], + author=doc["author"], + branch=doc.get("branch"), + 
actions=self._deserialize_actions(doc["actions"]), + timestamp=self._from_microseconds(doc["timestamp"]), + content=_session_util.decode_content(doc.get("content")), + long_running_tool_ids=long_running_tool_ids, + partial=doc.get("partial"), + turn_complete=doc.get("turn_complete"), + error_code=doc.get("error_code"), + error_message=doc.get("error_message"), + interrupted=doc.get("interrupted"), + grounding_metadata=_session_util.decode_grounding_metadata( + doc.get("grounding_metadata") + ), + custom_metadata=doc.get("custom_metadata"), + ) From 002f549509d7a96a871cf55f4d7a13b70a24dbd9 Mon Sep 17 00:00:00 2001 From: Amedeo Giuseppe Rizzo <44553685+amedeorizzo@users.noreply.github.com> Date: Thu, 9 Oct 2025 12:31:00 +0200 Subject: [PATCH 2/8] Update src/google/adk/sessions/typesense_session_service.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../adk/sessions/typesense_session_service.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/google/adk/sessions/typesense_session_service.py b/src/google/adk/sessions/typesense_session_service.py index 641b3107a7..d013b61865 100644 --- a/src/google/adk/sessions/typesense_session_service.py +++ b/src/google/adk/sessions/typesense_session_service.py @@ -186,13 +186,15 @@ def _from_microseconds(self, microseconds: int) -> float: """Converts microseconds to Unix timestamp.""" return microseconds / 1_000_000 - def _serialize_actions(self, actions: Any) -> str: - """Pickles and base64 encodes actions.""" - return base64.b64encode(pickle.dumps(actions)).decode("ascii") + def _serialize_actions(self, actions: Optional[EventActions]) -> Optional[str]: + """Serializes EventActions to a JSON string.""" + return actions.model_dump_json() if actions else None - def _deserialize_actions(self, actions_str: str) -> Any: - """Base64 decodes and unpickles actions.""" - return pickle.loads(base64.b64decode(actions_str)) + def _deserialize_actions(self, actions_str: Optional[str]) -> Optional[EventActions]: + """Deserializes EventActions from a JSON string.""" + if not actions_str: + return None + return EventActions.model_validate_json(actions_str) def _get_app_state(self, app_name: str) -> dict[str, Any]: """Fetches app state from Typesense.""" From e67a3f6b35b19532223a6244f7efd3058befbcbc Mon Sep 17 00:00:00 2001 From: Amedeo Giuseppe Rizzo <44553685+amedeorizzo@users.noreply.github.com> Date: Thu, 9 Oct 2025 12:31:14 +0200 Subject: [PATCH 3/8] Update src/google/adk/sessions/typesense_session_service.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- src/google/adk/sessions/typesense_session_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/google/adk/sessions/typesense_session_service.py b/src/google/adk/sessions/typesense_session_service.py index d013b61865..fd2f0fae9d 100644 --- a/src/google/adk/sessions/typesense_session_service.py +++ b/src/google/adk/sessions/typesense_session_service.py @@ -62,7 +62,7 @@ {"name": "composite_key", "type": "string"}, # Unique document ID {"name": "invocation_id", "type": "string"}, {"name": "author", "type": "string"}, - {"name": "actions", "type": "string"}, # Pickled and base64 encoded + {"name": "actions", "type": "string", "optional": True}, # Pickled and base64 encoded { "name": "long_running_tool_ids_json", "type": "string", From 35147d4faa22abdd9425194128c96bdd89da2040 Mon Sep 17 00:00:00 2001 From: Amedeo Giuseppe Rizzo 
<44553685+amedeorizzo@users.noreply.github.com> Date: Thu, 9 Oct 2025 12:31:40 +0200 Subject: [PATCH 4/8] Update src/google/adk/sessions/database_session_service.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- src/google/adk/sessions/database_session_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index 622cdce112..f69a63d4fa 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -415,7 +415,7 @@ def __init__(self, db_url: str, **kwargs: Any): local_timezone = get_localzone() logger.info("Local timezone: %s", local_timezone) - self.db_engine = db_engine + self.db_engine: Engine = db_engine self.metadata: MetaData = MetaData() self.inspector = inspect(self.db_engine) From cced0f5b296bfa2f2b106ab58cda88138243e773 Mon Sep 17 00:00:00 2001 From: amedeorizzo Date: Thu, 9 Oct 2025 12:37:45 +0200 Subject: [PATCH 5/8] feat: Enhance Typesense session service with pagination and event ID handling --- .../adk/sessions/typesense_session_service.py | 90 ++++++++++++------- 1 file changed, 58 insertions(+), 32 deletions(-) diff --git a/src/google/adk/sessions/typesense_session_service.py b/src/google/adk/sessions/typesense_session_service.py index fd2f0fae9d..255c485ca7 100644 --- a/src/google/adk/sessions/typesense_session_service.py +++ b/src/google/adk/sessions/typesense_session_service.py @@ -27,6 +27,7 @@ from . import _session_util from ..events.event import Event +from ..events.event_actions import EventActions from ._session_util import extract_state_delta from ._session_util import merge_state from .base_session_service import BaseSessionService @@ -55,14 +56,18 @@ "name": "events", "enable_nested_fields": True, "fields": [ - {"name": "id", "type": "string"}, + {"name": "event_id", "type": "string"}, # Dedicated event ID field {"name": "app_name", "type": "string"}, {"name": "user_id", "type": "string"}, {"name": "session_id", "type": "string"}, {"name": "composite_key", "type": "string"}, # Unique document ID {"name": "invocation_id", "type": "string"}, {"name": "author", "type": "string"}, - {"name": "actions", "type": "string", "optional": True}, # Pickled and base64 encoded + { + "name": "actions", + "type": "string", + "optional": True, + }, # Pickled and base64 encoded { "name": "long_running_tool_ids_json", "type": "string", @@ -186,11 +191,15 @@ def _from_microseconds(self, microseconds: int) -> float: """Converts microseconds to Unix timestamp.""" return microseconds / 1_000_000 - def _serialize_actions(self, actions: Optional[EventActions]) -> Optional[str]: + def _serialize_actions( + self, actions: Optional[EventActions] + ) -> Optional[str]: """Serializes EventActions to a JSON string.""" return actions.model_dump_json() if actions else None - def _deserialize_actions(self, actions_str: Optional[str]) -> Optional[EventActions]: + def _deserialize_actions( + self, actions_str: Optional[str] + ) -> Optional[EventActions]: """Deserializes EventActions from a JSON string.""" if not actions_str: return None @@ -389,36 +398,52 @@ async def get_session( async def list_sessions( self, *, app_name: str, user_id: str ) -> ListSessionsResponse: - """Lists all sessions for a user.""" - # Search for sessions - search_results = self.client.collections["sessions"].documents.search({ - "q": "*", - "filter_by": f"app_name:={app_name} && 
user_id:={user_id}", - "per_page": 250, # Typesense default max - }) - - # Fetch states + """Lists all sessions for a user with pagination support.""" + # Fetch states once app_state = self._get_app_state(app_name) user_state = self._get_user_state(app_name, user_id) sessions = [] - for hit in search_results["hits"]: - session_doc = hit["document"] - session_state = session_doc["state"] - merged_state = merge_state(app_state, user_state, session_state) - - sessions.append( - Session( - app_name=app_name, - user_id=user_id, - id=session_doc["session_id"], - state=merged_state, - events=[], - last_update_time=self._from_microseconds( - session_doc["update_time"] - ), - ) - ) + page = 1 + per_page = 250 # Typesense max + + while True: + # Search for sessions with pagination + search_results = self.client.collections["sessions"].documents.search({ + "q": "*", + "filter_by": f"app_name:={app_name} && user_id:={user_id}", + "per_page": per_page, + "page": page, + }) + + # Break if no results found + if not search_results["hits"]: + break + + # Process results + for hit in search_results["hits"]: + session_doc = hit["document"] + session_state = session_doc["state"] + merged_state = merge_state(app_state, user_state, session_state) + + sessions.append( + Session( + app_name=app_name, + user_id=user_id, + id=session_doc["session_id"], + state=merged_state, + events=[], + last_update_time=self._from_microseconds( + session_doc["update_time"] + ), + ) + ) + + # Check if there are more pages + if len(search_results["hits"]) < per_page: + break + + page += 1 return ListSessionsResponse(sessions=sessions) @@ -531,6 +556,7 @@ def _event_to_document( """Converts an Event object to a Typesense document.""" doc = { "id": composite_key, + "event_id": event.id, # Store event ID explicitly "composite_key": composite_key, "app_name": session.app_name, "user_id": session.user_id, @@ -583,11 +609,11 @@ def _document_to_event(self, doc: dict[str, Any]) -> Event: long_running_tool_ids = set(json.loads(doc["long_running_tool_ids_json"])) return Event( - id=doc["id"].split(":")[0], # Extract event ID from composite key + id=doc["event_id"], # Read from dedicated event_id field invocation_id=doc["invocation_id"], author=doc["author"], branch=doc.get("branch"), - actions=self._deserialize_actions(doc["actions"]), + actions=self._deserialize_actions(doc.get("actions")), timestamp=self._from_microseconds(doc["timestamp"]), content=_session_util.decode_content(doc.get("content")), long_running_tool_ids=long_running_tool_ids, From a16fcc4c5242d8a60164e85052aaaca098ea9b14 Mon Sep 17 00:00:00 2001 From: amedeorizzo Date: Thu, 9 Oct 2025 12:39:12 +0200 Subject: [PATCH 6/8] refactor: Remove redundant json import in TypesenseSessionService --- src/google/adk/sessions/typesense_session_service.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/google/adk/sessions/typesense_session_service.py b/src/google/adk/sessions/typesense_session_service.py index 255c485ca7..facabf008c 100644 --- a/src/google/adk/sessions/typesense_session_service.py +++ b/src/google/adk/sessions/typesense_session_service.py @@ -21,6 +21,7 @@ from typing import Any from typing import Optional import uuid +import json import typesense from typing_extensions import override @@ -569,7 +570,6 @@ def _event_to_document( # Add optional fields if event.long_running_tool_ids: - import json doc["long_running_tool_ids_json"] = json.dumps( list(event.long_running_tool_ids) @@ -599,7 +599,6 @@ def _event_to_document( def 
_document_to_event(self, doc: dict[str, Any]) -> Event:
     """Converts a Typesense document to an Event object."""
-    import json
 
     long_running_tool_ids = set()
     if (
         "long_running_tool_ids_json" in doc
         and doc["long_running_tool_ids_json"]
     ):
       long_running_tool_ids = set(json.loads(doc["long_running_tool_ids_json"]))

From 65f28f996d2dd385ad194864de3c8ad275dfd13f Mon Sep 17 00:00:00 2001
From: amedeorizzo
Date: Mon, 20 Oct 2025 10:30:16 +0200
Subject: [PATCH 7/8] feat: add migrate_to_typesense.py CLI and
 MIGRATION_GUIDE.md

---
 MIGRATION_GUIDE.md      | 148 ++++++++++++
 migrate_to_typesense.py | 520 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 668 insertions(+)
 create mode 100644 MIGRATION_GUIDE.md
 create mode 100755 migrate_to_typesense.py

diff --git a/MIGRATION_GUIDE.md b/MIGRATION_GUIDE.md
new file mode 100644
index 0000000000..c8d15b79b4
--- /dev/null
+++ b/MIGRATION_GUIDE.md
@@ -0,0 +1,148 @@
+# PostgreSQL to Typesense Migration Guide
+
+This guide explains how to migrate your existing session data from PostgreSQL (DatabaseSessionService) to Typesense (TypesenseSessionService).
+
+## Prerequisites
+
+1. **Install dependencies**:
+   ```bash
+   pip install sqlalchemy psycopg2-binary typesense
+   ```
+
+2. **Typesense collections**: The migration script will automatically create missing collections with the correct schema.
+
+   Alternatively, you can initialize TypesenseSessionService before migration:
+   ```python
+   from google.adk.sessions import TypesenseSessionService
+
+   # This will create collections automatically
+   service = TypesenseSessionService(
+       host="localhost",
+       api_key="your_api_key",
+       port=8108,
+       protocol="http"
+   )
+   ```
+
+## Usage
+
+### Basic Migration
+
+```bash
+python migrate_to_typesense.py \
+  --db-url postgresql://user:password@localhost:5432/your_database \
+  --typesense-host localhost \
+  --typesense-api-key your_api_key \
+  --typesense-port 8108 \
+  --typesense-protocol http
+```
+
+### Dry Run (Preview What Will Be Migrated)
+
+Test the migration without actually moving data:
+
+```bash
+python migrate_to_typesense.py \
+  --db-url postgresql://user:password@localhost:5432/your_database \
+  --typesense-host localhost \
+  --typesense-api-key your_api_key \
+  --dry-run
+```
+
+### Using Environment Variables
+
+```bash
+# Export credentials
+export DB_URL="postgresql://user:password@localhost:5432/your_database"
+export TYPESENSE_HOST="localhost"
+export TYPESENSE_API_KEY="your_secret_api_key"
+
+# Run migration
+python migrate_to_typesense.py \
+  --db-url "$DB_URL" \
+  --typesense-host "$TYPESENSE_HOST" \
+  --typesense-api-key "$TYPESENSE_API_KEY"
+```
+
+## What Gets Migrated
+
+The script migrates all rows from these PostgreSQL tables to Typesense collections. (These are the `__tablename__` values used by DatabaseSessionService; the ORM classes are called `StorageSession`, `StorageEvent`, etc., but the underlying tables carry no `storage_` prefix.)
+
+| PostgreSQL Table | → | Typesense Collection | Description |
+|------------------|---|----------------------|-------------|
+| `sessions`       | → | `sessions`           | Session metadata and state |
+| `events`         | → | `events`             | Events within sessions |
+| `app_states`     | → | `app_states`         | App-level state |
+| `user_states`    | → | `user_states`        | User-level state |
+
+## Migration Process
+
+The script performs these steps:
+
+1. **Connects to PostgreSQL** - Validates that the connection works and the tables exist
+2. **Connects to Typesense** - Validates that the connection works and the collections exist
+3. **Verifies Collections** - Checks that Typesense collections have the correct schema (including `enable_nested_fields`)
+4. **Migrates Data** in this order:
+   - App states (foundational data)
+   - User states (depends on apps)
+   - Sessions (depends on users)
+   - Events (depends on sessions)
+5. 
**Reports Summary** - Shows counts and duration + +## Advanced Options + +### Automatic Collection Creation + +By default, the script automatically creates missing Typesense collections. To disable this: + +```bash +python migrate_to_typesense.py \ + --db-url "$DB_URL" \ + --typesense-host "$TYPESENSE_HOST" \ + --typesense-api-key "$TYPESENSE_API_KEY" \ + --no-auto-create +``` + +With `--no-auto-create`, the script will fail if collections don't exist. + +### Skip Collection Verification + +If you're confident collections exist and are correct: +```bash +python migrate_to_typesense.py \ + --db-url "$DB_URL" \ + --typesense-host "$TYPESENSE_HOST" \ + --typesense-api-key "$TYPESENSE_API_KEY" \ + --skip-verification +``` + +### Migrate Specific Tables Only + +Modify the script and comment out unwanted migrations in the `main()` function. + +## Example: Complete Migration Workflow + +```bash + +# 1. Test with dry run (collections will be auto-created during verification) +python migrate_to_typesense.py \ + --db-url postgresql://user:pass@localhost:5432/mydb \ + --typesense-host localhost \ + --typesense-api-key xyz \ + --dry-run + +# 2. Perform actual migration (collections already exist from dry run) +python migrate_to_typesense.py \ + --db-url postgresql://user:pass@localhost:5432/mydb \ + --typesense-host localhost \ + --typesense-api-key xyz + +# 3. Verify +python -c " +import typesense +client = typesense.Client({'nodes': [{'host': 'localhost', 'port': 8108, 'protocol': 'http'}], 'api_key': 'xyz'}) +for c in ['sessions', 'events', 'app_states', 'user_states']: + info = client.collections[c].retrieve() + print(f'{c}: {info[\"num_documents\"]} documents') +" +``` \ No newline at end of file diff --git a/migrate_to_typesense.py b/migrate_to_typesense.py new file mode 100755 index 0000000000..e976275373 --- /dev/null +++ b/migrate_to_typesense.py @@ -0,0 +1,520 @@ +#!/usr/bin/env python3 +""" +Migration script to move session data from PostgreSQL to Typesense. + +This script reads data from the PostgreSQL tables used by DatabaseSessionService +and migrates it to Typesense collections used by TypesenseSessionService. 
+ +Features: +- Automatically creates missing Typesense collections with correct schema +- Dry-run mode to preview migration without moving data +- Handles all data type conversions (timestamps, actions, nested objects) +- Error handling and detailed progress reporting + +Usage: + python migrate_to_typesense.py --db-url postgresql://user:pass@host:5432/dbname \ + --typesense-host localhost \ + --typesense-api-key your_api_key \ + --typesense-port 8108 \ + --typesense-protocol http + + Optional flags: + --dry-run Preview what would be migrated + --no-auto-create Don't create missing collections automatically + --skip-verification Skip collection verification step +""" + +import argparse +import base64 +import pickle +import sys +from datetime import datetime +from typing import Any + +import typesense +from sqlalchemy import create_engine, text +from sqlalchemy.orm import sessionmaker + +# Typesense collection schemas (must match TypesenseSessionService) +SESSIONS_SCHEMA = { + "name": "sessions", + "enable_nested_fields": True, + "fields": [ + {"name": "app_name", "type": "string"}, + {"name": "user_id", "type": "string"}, + {"name": "session_id", "type": "string"}, + {"name": "composite_key", "type": "string"}, + {"name": "state", "type": "object"}, + {"name": "create_time", "type": "int64"}, + {"name": "update_time", "type": "int64"}, + ], +} + +EVENTS_SCHEMA = { + "name": "events", + "enable_nested_fields": True, + "fields": [ + {"name": "event_id", "type": "string"}, + {"name": "app_name", "type": "string"}, + {"name": "user_id", "type": "string"}, + {"name": "session_id", "type": "string"}, + {"name": "composite_key", "type": "string"}, + {"name": "invocation_id", "type": "string"}, + {"name": "author", "type": "string"}, + {"name": "actions", "type": "string", "optional": True}, + {"name": "long_running_tool_ids_json", "type": "string", "optional": True}, + {"name": "branch", "type": "string", "optional": True}, + {"name": "timestamp", "type": "int64"}, + {"name": "content", "type": "object", "optional": True}, + {"name": "grounding_metadata", "type": "object", "optional": True}, + {"name": "custom_metadata", "type": "object", "optional": True}, + {"name": "partial", "type": "bool", "optional": True}, + {"name": "turn_complete", "type": "bool", "optional": True}, + {"name": "error_code", "type": "string", "optional": True}, + {"name": "error_message", "type": "string", "optional": True}, + {"name": "interrupted", "type": "bool", "optional": True}, + ], +} + +APP_STATES_SCHEMA = { + "name": "app_states", + "enable_nested_fields": True, + "fields": [ + {"name": "app_name", "type": "string"}, + {"name": "state", "type": "object"}, + {"name": "update_time", "type": "int64"}, + ], +} + +USER_STATES_SCHEMA = { + "name": "user_states", + "enable_nested_fields": True, + "fields": [ + {"name": "app_name", "type": "string"}, + {"name": "user_id", "type": "string"}, + {"name": "composite_key", "type": "string"}, + {"name": "state", "type": "object"}, + {"name": "update_time", "type": "int64"}, + ], +} + + +def to_microseconds(timestamp) -> int: + """Convert timestamp to microseconds. 
+
+  Args:
+    timestamp: Either a datetime object or Unix timestamp (float)
+
+  Returns:
+    Microseconds since Unix epoch as int64
+  """
+  if isinstance(timestamp, datetime):
+    # Convert datetime to Unix timestamp first
+    return int(timestamp.timestamp() * 1_000_000)
+  else:
+    # Assume it's already a Unix timestamp (float)
+    return int(timestamp * 1_000_000)
+
+
+def make_composite_key(*parts: str) -> str:
+  """Create composite key for Typesense document ID."""
+  return ":".join(parts)
+
+
+def serialize_actions(actions) -> str:
+  """Convert stored actions to the JSON string format used by Typesense.
+
+  DatabaseSessionService stores actions with SQLAlchemy's PickleType, so rows
+  arrive as pickled bytes (a memoryview under PostgreSQL BYTEA). Since
+  TypesenseSessionService deserializes actions with
+  EventActions.model_validate_json, the value written here must be JSON,
+  not base64-encoded pickle.
+  """
+  if isinstance(actions, memoryview):
+    # PostgreSQL BYTEA returns memoryview, convert to bytes first
+    actions = actions.tobytes()
+  if isinstance(actions, bytes):
+    # Unpickling restores the original EventActions instance
+    actions = pickle.loads(actions)
+  # EventActions is a pydantic model; serialize it the same way
+  # TypesenseSessionService._serialize_actions does
+  return actions.model_dump_json()
+
+
+def migrate_sessions(pg_session, ts_client, dry_run=False):
+  """Migrate sessions from PostgreSQL to Typesense."""
+  print("\n📦 Migrating sessions...")
+
+  query = text("""
+      SELECT app_name, user_id, id, state, create_time, update_time
+      FROM sessions
+      ORDER BY create_time
+  """)
+
+  result = pg_session.execute(query)
+  sessions = result.fetchall()
+
+  print(f"Found {len(sessions)} sessions to migrate")
+
+  migrated = 0
+  for row in sessions:
+    app_name, user_id, session_id, state, create_time, update_time = row
+
+    composite_key = make_composite_key(app_name, user_id, session_id)
+
+    doc = {
+        "id": composite_key,
+        "app_name": app_name,
+        "user_id": user_id,
+        "session_id": session_id,
+        "composite_key": composite_key,
+        "state": state or {},
+        "create_time": to_microseconds(create_time),
+        "update_time": to_microseconds(update_time),
+    }
+
+    if not dry_run:
+      try:
+        ts_client.collections["sessions"].documents.upsert(doc)
+        migrated += 1
+      except Exception as e:
+        print(f"❌ Error migrating session {composite_key}: {e}")
+    else:
+      print(f"  Would migrate: {composite_key}")
+      migrated += 1
+
+  print(f"✅ Migrated {migrated}/{len(sessions)} sessions")
+  return migrated
+
+
+def migrate_events(pg_session, ts_client, dry_run=False):
+  """Migrate events from PostgreSQL to Typesense."""
+  print("\n📦 Migrating events...")
+
+  query = text("""
+      SELECT id, app_name, user_id, session_id, invocation_id, author,
+             actions, long_running_tool_ids_json, branch, timestamp,
+             content, grounding_metadata, custom_metadata, partial,
+             turn_complete, error_code, error_message, interrupted
+      FROM events
+      ORDER BY timestamp
+  """)
+
+  result = pg_session.execute(query)
+  events = result.fetchall()
+
+  print(f"Found {len(events)} events to migrate")
+
+  migrated = 0
+  for row in events:
+    (event_id, app_name, user_id, session_id, invocation_id, author,
+     actions, long_running_tool_ids_json, branch, timestamp,
+     content, grounding_metadata, custom_metadata, partial,
+     turn_complete, error_code, error_message, interrupted) = row
+
+    composite_key = make_composite_key(event_id, app_name, user_id, session_id)
+
+    doc = {
+        "id": composite_key,
+        "event_id": event_id,
+        "composite_key": composite_key,
+        "app_name": app_name,
+        "user_id": user_id,
+        "session_id": session_id,
+        "invocation_id": invocation_id,
+        "author": author,
+        "timestamp": to_microseconds(timestamp),
+    }
+
+    # Handle actions (pickled in the DB; converted to the JSON string the
+    # Typesense service expects)
+    if actions:
+      doc["actions"] = serialize_actions(actions)
+
+    # Add optional fields
+    if 
long_running_tool_ids_json: + doc["long_running_tool_ids_json"] = long_running_tool_ids_json + if branch: + doc["branch"] = branch + if content: + doc["content"] = content + if grounding_metadata: + doc["grounding_metadata"] = grounding_metadata + if custom_metadata: + doc["custom_metadata"] = custom_metadata + if partial is not None: + doc["partial"] = partial + if turn_complete is not None: + doc["turn_complete"] = turn_complete + if error_code: + doc["error_code"] = error_code + if error_message: + doc["error_message"] = error_message + if interrupted is not None: + doc["interrupted"] = interrupted + + if not dry_run: + try: + ts_client.collections["events"].documents.upsert(doc) + migrated += 1 + except Exception as e: + print(f"❌ Error migrating event {composite_key}: {e}") + else: + print(f" Would migrate: {composite_key}") + migrated += 1 + + print(f"✅ Migrated {migrated}/{len(events)} events") + return migrated + + +def migrate_app_states(pg_session, ts_client, dry_run=False): + """Migrate app states from PostgreSQL to Typesense.""" + print("\n📦 Migrating app states...") + + query = text(""" + SELECT app_name, state, update_time + FROM app_states + """) + + result = pg_session.execute(query) + app_states = result.fetchall() + + print(f"Found {len(app_states)} app states to migrate") + + migrated = 0 + for row in app_states: + app_name, state, update_time = row + + doc = { + "id": app_name, + "app_name": app_name, + "state": state or {}, + "update_time": to_microseconds(update_time), + } + + if not dry_run: + try: + ts_client.collections["app_states"].documents.upsert(doc) + migrated += 1 + except Exception as e: + print(f"❌ Error migrating app state {app_name}: {e}") + else: + print(f" Would migrate: {app_name}") + migrated += 1 + + print(f"✅ Migrated {migrated}/{len(app_states)} app states") + return migrated + + +def migrate_user_states(pg_session, ts_client, dry_run=False): + """Migrate user states from PostgreSQL to Typesense.""" + print("\n📦 Migrating user states...") + + query = text(""" + SELECT app_name, user_id, state, update_time + FROM user_states + """) + + result = pg_session.execute(query) + user_states = result.fetchall() + + print(f"Found {len(user_states)} user states to migrate") + + migrated = 0 + for row in user_states: + app_name, user_id, state, update_time = row + + composite_key = make_composite_key(app_name, user_id) + + doc = { + "id": composite_key, + "app_name": app_name, + "user_id": user_id, + "composite_key": composite_key, + "state": state or {}, + "update_time": to_microseconds(update_time), + } + + if not dry_run: + try: + ts_client.collections["user_states"].documents.upsert(doc) + migrated += 1 + except Exception as e: + print(f"❌ Error migrating user state {composite_key}: {e}") + else: + print(f" Would migrate: {composite_key}") + migrated += 1 + + print(f"✅ Migrated {migrated}/{len(user_states)} user states") + return migrated + + +def verify_collections(ts_client, auto_create=True): + """Verify that Typesense collections exist with correct schema. 
+ + Args: + ts_client: Typesense client + auto_create: If True, automatically create missing collections + + Returns: + True if all collections exist or were created successfully + """ + print("\n🔍 Verifying Typesense collections...") + + schemas = { + "sessions": SESSIONS_SCHEMA, + "events": EVENTS_SCHEMA, + "app_states": APP_STATES_SCHEMA, + "user_states": USER_STATES_SCHEMA, + } + + for collection_name, schema in schemas.items(): + try: + collection = ts_client.collections[collection_name].retrieve() + print(f" ✅ Collection '{collection_name}' exists") + + # Check for enable_nested_fields + if not collection.get("enable_nested_fields"): + print(f" ⚠️ WARNING: '{collection_name}' doesn't have enable_nested_fields=true") + print(f" This may cause issues with nested objects") + print(f" Consider deleting and recreating: ts_client.collections['{collection_name}'].delete()") + except typesense.exceptions.ObjectNotFound: + if auto_create: + print(f" 📦 Collection '{collection_name}' not found, creating...") + try: + ts_client.collections.create(schema) + print(f" ✅ Collection '{collection_name}' created successfully") + except Exception as e: + print(f" ❌ Failed to create collection '{collection_name}': {e}") + return False + else: + print(f" ❌ Collection '{collection_name}' not found!") + print(f" Run with auto-create enabled or initialize TypesenseSessionService") + return False + + return True + + +def main(): + parser = argparse.ArgumentParser( + description="Migrate session data from PostgreSQL to Typesense" + ) + parser.add_argument( + "--db-url", + required=True, + help="PostgreSQL connection URL (e.g., postgresql://user:pass@host:5432/dbname)", + ) + parser.add_argument( + "--typesense-host", + required=True, + help="Typesense host (e.g., localhost)", + ) + parser.add_argument( + "--typesense-api-key", + required=True, + help="Typesense API key", + ) + parser.add_argument( + "--typesense-port", + type=int, + default=8108, + help="Typesense port (default: 8108)", + ) + parser.add_argument( + "--typesense-protocol", + choices=["http", "https"], + default="http", + help="Typesense protocol (default: http)", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Show what would be migrated without actually migrating", + ) + parser.add_argument( + "--skip-verification", + action="store_true", + help="Skip Typesense collection verification", + ) + parser.add_argument( + "--no-auto-create", + action="store_true", + help="Don't automatically create missing Typesense collections", + ) + + args = parser.parse_args() + + print("=" * 60) + print("PostgreSQL to Typesense Migration") + print("=" * 60) + + if args.dry_run: + print("\n⚠️ DRY RUN MODE - No data will be migrated\n") + + # Connect to PostgreSQL + print(f"\n🔌 Connecting to PostgreSQL: {args.db_url.split('@')[1] if '@' in args.db_url else args.db_url}") + try: + engine = create_engine(args.db_url) + Session = sessionmaker(bind=engine) + pg_session = Session() + + # Test connection + pg_session.execute(text("SELECT 1")) + print("✅ PostgreSQL connection successful") + except Exception as e: + print(f"❌ Failed to connect to PostgreSQL: {e}") + return 1 + + # Connect to Typesense + print(f"\n🔌 Connecting to Typesense: {args.typesense_protocol}://{args.typesense_host}:{args.typesense_port}") + try: + ts_client = typesense.Client({ + "nodes": [{ + "host": args.typesense_host, + "port": args.typesense_port, + "protocol": args.typesense_protocol, + }], + "api_key": args.typesense_api_key, + "connection_timeout_seconds": 5, + }) + 
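+    # Note: "nodes" accepts a list; additional hosts can be listed here to
+    # target a multi-node Typesense cluster.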
+    # Test connection
+    ts_client.collections.retrieve()
+    print("✅ Typesense connection successful")
+  except Exception as e:
+    print(f"❌ Failed to connect to Typesense: {e}")
+    return 1
+
+  # Verify collections
+  if not args.skip_verification:
+    auto_create = not args.no_auto_create
+    if not verify_collections(ts_client, auto_create=auto_create):
+      print("\n❌ Collection verification failed. Please fix the issues above.")
+      return 1
+
+  # Perform migration
+  print("\n" + "=" * 60)
+  print("Starting Migration")
+  print("=" * 60)
+
+  start_time = datetime.now()
+
+  total_migrated = 0
+  total_migrated += migrate_app_states(pg_session, ts_client, args.dry_run)
+  total_migrated += migrate_user_states(pg_session, ts_client, args.dry_run)
+  total_migrated += migrate_sessions(pg_session, ts_client, args.dry_run)
+  total_migrated += migrate_events(pg_session, ts_client, args.dry_run)
+
+  end_time = datetime.now()
+  duration = (end_time - start_time).total_seconds()
+
+  print("\n" + "=" * 60)
+  print("Migration Summary")
+  print("=" * 60)
+  print(f"Total documents migrated: {total_migrated}")
+  print(f"Duration: {duration:.2f} seconds")
+
+  if args.dry_run:
+    print("\n⚠️ This was a DRY RUN - no data was actually migrated")
+    print("Remove the --dry-run flag to perform the actual migration")
+  else:
+    print("\n✅ Migration completed successfully!")
+
+  pg_session.close()
+  return 0
+
+
+if __name__ == "__main__":
+  sys.exit(main())
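Before wiring this script into a real cutover, the collection check can be
exercised on its own. A minimal sketch, assuming the script is importable as
migrate_pg_to_typesense (a hypothetical module name; the real file path is not
shown in this hunk) and that a local single-node Typesense instance is
reachable with a placeholder API key:

    import typesense

    # Hypothetical module name for the migration script above.
    from migrate_pg_to_typesense import verify_collections

    ts_client = typesense.Client({
        "nodes": [{"host": "localhost", "port": 8108, "protocol": "http"}],
        "api_key": "xyz",  # placeholder API key
        "connection_timeout_seconds": 5,
    })

    # Report-only pass: flag missing collections instead of creating them.
    ok = verify_collections(ts_client, auto_create=False)
    print("collections ready:", ok)

Running with auto_create=True (the script's default unless --no-auto-create is
passed) would instead create any missing collections from the bundled schemas.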
From 2e62d2f9a2a0ea016780025f3c2b432ec1ee185d Mon Sep 17 00:00:00 2001
From: amedeorizzo
Date: Mon, 20 Oct 2025 10:43:00 +0200
Subject: [PATCH 8/8] fix: resolve merge conflicts and unify state extraction/merge in DB session service; remove duplicate _merge_state

---
 src/google/adk/sessions/_session_util.py     |  9 +++
 .../adk/sessions/database_session_service.py | 56 +------------------
 2 files changed, 12 insertions(+), 53 deletions(-)

diff --git a/src/google/adk/sessions/_session_util.py b/src/google/adk/sessions/_session_util.py
index 54ea7eafd7..14272807c2 100644
--- a/src/google/adk/sessions/_session_util.py
+++ b/src/google/adk/sessions/_session_util.py
@@ -48,3 +48,12 @@
     elif not key.startswith(State.TEMP_PREFIX):
       deltas["session"][key] = state[key]
   return deltas
+
+def merge_state(app_state, user_state, session_state):
+  """Merges app and user state into the session state for responses."""
+  merged_state = copy.deepcopy(session_state)
+  for key in app_state:
+    merged_state[State.APP_PREFIX + key] = app_state[key]
+  for key in user_state:
+    merged_state[State.USER_PREFIX + key] = user_state[key]
+  return merged_state
diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py
index 9b965059e1..203a2ee965 100644
--- a/src/google/adk/sessions/database_session_service.py
+++ b/src/google/adk/sessions/database_session_service.py
@@ -482,16 +482,10 @@ async def create_session(
       sql_session.add(storage_user_state)

       # Extract state deltas
-<<<<<<< HEAD
-      app_state_delta, user_state_delta, session_state = extract_state_delta(
-          state
-      )
-=======
-      state_deltas = _session_util.extract_state_delta(state)
+      state_deltas = extract_state_delta(state)
       app_state_delta = state_deltas["app"]
       user_state_delta = state_deltas["user"]
       session_state = state_deltas["session"]
->>>>>>> upstream/main

       # Apply state delta
       if app_state_delta:
@@ -512,13 +506,9 @@
       sql_session.refresh(storage_session)

       # Merge states for response
-<<<<<<< HEAD
-      merged_state = merge_state(app_state, user_state, session_state)
-=======
-      merged_state = _merge_state(
+      merged_state = merge_state(
           storage_app_state.state, storage_user_state.state, session_state
       )
->>>>>>> upstream/main

       session = storage_session.to_session(state=merged_state)
       return session
@@ -615,13 +605,8 @@ async def list_sessions(
       sessions = []
       for storage_session in results:
         session_state = storage_session.state
-<<<<<<< HEAD
-        merged_state = merge_state(app_state, user_state, session_state)
-
-=======
         user_state = user_states_map.get(storage_session.user_id, {})
-        merged_state = _merge_state(app_state, user_state, session_state)
->>>>>>> upstream/main
+        merged_state = merge_state(app_state, user_state, session_state)
         sessions.append(storage_session.to_session(state=merged_state))
       return ListSessionsResponse(sessions=sessions)

@@ -670,27 +655,6 @@ async def append_event(self, session: Session, event: Event) -> Event:
       )
       # Extract state delta
-<<<<<<< HEAD
-      app_state_delta = {}
-      user_state_delta = {}
-      session_state_delta = {}
-      if event.actions:
-        if event.actions.state_delta:
-          app_state_delta, user_state_delta, session_state_delta = (
-              extract_state_delta(event.actions.state_delta)
-          )
-
-      # Merge state and update storage
-      if app_state_delta:
-        app_state.update(app_state_delta)
-        storage_app_state.state = app_state
-      if user_state_delta:
-        user_state.update(user_state_delta)
-        storage_user_state.state = user_state
-      if session_state_delta:
-        session_state.update(session_state_delta)
-        storage_session.state = session_state
-=======
       if event.actions and event.actions.state_delta:
         state_deltas = _session_util.extract_state_delta(
             event.actions.state_delta
         )
@@ -705,7 +669,6 @@ async def append_event(self, session: Session, event: Event) -> Event:
           storage_user_state.state = storage_user_state.state | user_state_delta
         if session_state_delta:
           storage_session.state = storage_session.state | session_state_delta
->>>>>>> upstream/main

       sql_session.add(StorageEvent.from_event(session, event))
@@ -718,16 +681,3 @@ async def append_event(self, session: Session, event: Event) -> Event:
     # Also update the in-memory session
     await super().append_event(session=session, event=event)
     return event
-<<<<<<< HEAD
-=======
-
-
-def _merge_state(app_state, user_state, session_state):
-  # Merge states for response
-  merged_state = copy.deepcopy(session_state)
-  for key in app_state.keys():
-    merged_state[State.APP_PREFIX + key] = app_state[key]
-  for key in user_state.keys():
-    merged_state[State.USER_PREFIX + key] = user_state[key]
-  return merged_state
->>>>>>> upstream/main
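With the helpers unified, the intended contract is easy to spot-check with a
round trip. A minimal sketch, assuming the helper lands in _session_util as
merge_state (matching the call sites above) and that extract_state_delta
strips the scope prefixes that merge_state re-adds, while keys under
State.TEMP_PREFIX are dropped by design:

    from google.adk.sessions._session_util import extract_state_delta
    from google.adk.sessions._session_util import merge_state
    from google.adk.sessions.state import State

    mixed = {
        State.APP_PREFIX + "theme": "dark",  # app-scoped
        State.USER_PREFIX + "lang": "it",  # user-scoped
        State.TEMP_PREFIX + "scratch": 1,  # temp keys are never persisted
        "step": 3,  # unprefixed keys stay session-scoped
    }

    deltas = extract_state_delta(mixed)
    merged = merge_state(deltas["app"], deltas["user"], deltas["session"])

    # Everything except the temp-scoped key should round-trip unchanged.
    expected = {
        k: v for k, v in mixed.items() if not k.startswith(State.TEMP_PREFIX)
    }
    assert merged == expected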