12 changes: 6 additions & 6 deletions docs/source/providers/agents/index.md
@@ -4,12 +4,12 @@

Agents API for creating and interacting with agentic systems.

-Main functionalities provided by this API:
-- Create agents with specific instructions and ability to use tools.
-- Interactions with agents are grouped into sessions ("threads"), and each interaction is called a "turn".
-- Agents can be provided with various tools (see the ToolGroups and ToolRuntime APIs for more details).
-- Agents can be provided with various shields (see the Safety API for more details).
-- Agents can also use Memory to retrieve information from knowledge bases. See the RAG Tool and Vector IO APIs for more details.
+Main functionalities provided by this API:
+- Create agents with specific instructions and ability to use tools.
+- Interactions with agents are grouped into sessions ("threads"), and each interaction is called a "turn".
+- Agents can be provided with various tools (see the ToolGroups and ToolRuntime APIs for more details).
+- Agents can be provided with various shields (see the Safety API for more details).
+- Agents can also use Memory to retrieve information from knowledge bases. See the RAG Tool and Vector IO APIs for more details.

This section contains documentation for all available providers for the **agents** API.
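For orientation, here is a minimal sketch of the flow those bullets describe. It assumes the `llama-stack-client` Python SDK and its `Agent` helper, none of which is part of this diff:

```python
from llama_stack_client import LlamaStackClient
from llama_stack_client.lib.agents.agent import Agent

client = LlamaStackClient(base_url="http://localhost:8321")

# An agent pairs instructions with the tools and shields it is allowed to use.
agent = Agent(
    client,
    model="meta-llama/Llama-3.2-3B-Instruct",
    instructions="You are a helpful assistant.",
)

# Interactions are grouped into sessions ("threads"); each exchange is a "turn".
session_id = agent.create_session("demo-session")
turn = agent.create_turn(
    session_id=session_id,
    messages=[{"role": "user", "content": "Hello!"}],
    stream=False,
)
print(turn.output_message.content)
```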

6 changes: 3 additions & 3 deletions docs/source/providers/inference/index.md
@@ -4,9 +4,9 @@

Llama Stack Inference API for generating completions, chat completions, and embeddings.

-This API provides the raw interface to the underlying models. Two kinds of models are supported:
-- LLM models: these models generate "raw" and "chat" (conversational) completions.
-- Embedding models: these models generate embeddings to be used for semantic search.
+This API provides the raw interface to the underlying models. Two kinds of models are supported:
+- LLM models: these models generate "raw" and "chat" (conversational) completions.
+- Embedding models: these models generate embeddings to be used for semantic search.

This section contains documentation for all available providers for the **inference** API.
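As a quick sketch of the two model kinds (again assuming the `llama-stack-client` SDK, which this diff does not touch):

```python
from llama_stack_client import LlamaStackClient

client = LlamaStackClient(base_url="http://localhost:8321")

# LLM model: a "chat" (conversational) completion.
chat = client.inference.chat_completion(
    model_id="meta-llama/Llama-3.2-3B-Instruct",
    messages=[{"role": "user", "content": "Say hi in one word."}],
)

# Embedding model: vectors to be used for semantic search.
emb = client.inference.embeddings(
    model_id="all-MiniLM-L6-v2",
    contents=["first document", "second document"],
)
```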

85 changes: 83 additions & 2 deletions llama_stack/providers/remote/vector_io/chroma/chroma.py
@@ -4,6 +4,7 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import asyncio
import heapq
import json
from typing import Any
from urllib.parse import urlparse
@@ -30,6 +31,7 @@
EmbeddingIndex,
VectorDBWithIndex,
)
from llama_stack.providers.utils.vector_io.vector_utils import Reranker

from .config import ChromaVectorIOConfig as RemoteChromaVectorIOConfig

@@ -114,7 +116,38 @@ async def query_keyword(
k: int,
score_threshold: float,
) -> QueryChunksResponse:
raise NotImplementedError("Keyword search is not supported in Chroma")
results = await maybe_await(
self.collection.query(
query_texts=[query_string],
where_document={"$contains": query_string},
n_results=k,
include=["documents", "distances"],
)
)

distances = results["distances"][0] if results["distances"] else []
documents = results["documents"][0] if results["documents"] else []

chunks = []
scores = []

for dist, doc in zip(distances, documents, strict=False):
try:
doc_data = json.loads(doc)
chunk = Chunk(**doc_data)
except Exception:
log.exception(f"Failed to parse document: {doc}")
continue

score = 1.0 / (1.0 + float(dist)) if dist is not None else 1.0

if score < score_threshold:
continue

chunks.append(chunk)
scores.append(score)

return QueryChunksResponse(chunks=chunks, scores=scores)
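    # Note (illustrative, not part of the original diff): the 1 / (1 + distance)
    # conversion above maps Chroma's non-negative distances into scores in (0, 1]:
    # distance 0.0 -> 1.0, 1.0 -> 0.5, 4.0 -> 0.2, so closer documents score higher.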

async def delete_chunks(self, chunks_for_deletion: list[ChunkForDeletion]) -> None:
"""Delete a single chunk from the Chroma collection by its ID."""
@@ -130,7 +163,55 @@ async def query_hybrid(
reranker_type: str,
reranker_params: dict[str, Any] | None = None,
) -> QueryChunksResponse:
raise NotImplementedError("Hybrid search is not supported in Chroma")
"""
Hybrid search combining vector similarity and keyword search using configurable reranking.
Args:
embedding: The query embedding vector
query_string: The text query for keyword search
k: Number of results to return
score_threshold: Minimum similarity score threshold
reranker_type: Type of reranker to use ("rrf" or "weighted")
reranker_params: Parameters for the reranker
Returns:
QueryChunksResponse with combined results
"""
if reranker_params is None:
reranker_params = {}

# Get results from both search methods
vector_response = await self.query_vector(embedding, k, score_threshold)
keyword_response = await self.query_keyword(query_string, k, score_threshold)

# Convert responses to score dictionaries using chunk_id
vector_scores = {
chunk.chunk_id: score for chunk, score in zip(vector_response.chunks, vector_response.scores, strict=False)
}
keyword_scores = {
chunk.chunk_id: score
for chunk, score in zip(keyword_response.chunks, keyword_response.scores, strict=False)
}

# Combine scores using the reranking utility
combined_scores = Reranker.combine_search_results(vector_scores, keyword_scores, reranker_type, reranker_params)

        # heapq.nlargest is an efficient top-k selection: it only tracks the k best candidates seen so far
top_k_items = heapq.nlargest(k, combined_scores.items(), key=lambda x: x[1])

# Filter by score threshold
filtered_items = [(doc_id, score) for doc_id, score in top_k_items if score >= score_threshold]

# Create a map of chunk_id to chunk for both responses
chunk_map = {c.chunk_id: c for c in vector_response.chunks + keyword_response.chunks}

# Use the map to look up chunks by their IDs
chunks = []
scores = []
for doc_id, score in filtered_items:
if doc_id in chunk_map:
chunks.append(chunk_map[doc_id])
scores.append(score)

return QueryChunksResponse(chunks=chunks, scores=scores)


class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
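To make the new code path concrete, a minimal sketch of driving the hybrid search above; `index` and `query_vec` are assumed to exist (an initialized `ChromaIndex` and an embedding produced by the same model used at ingestion):

```python
# Sketch only: exercising ChromaIndex.query_hybrid with RRF reranking.
response = await index.query_hybrid(
    embedding=query_vec,
    query_string="deployment checklist",
    k=5,
    score_threshold=0.0,
    reranker_type="rrf",                      # or "weighted"
    reranker_params={"impact_factor": 60.0},  # {"alpha": 0.7} for "weighted"
)
for chunk, score in zip(response.chunks, response.scores, strict=False):
    print(f"{score:.4f}", chunk.chunk_id)
```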
111 changes: 111 additions & 0 deletions llama_stack/providers/utils/vector_io/vector_utils.py
Contributor comment: Please rebase your PR to exclude llama_stack/providers/utils/vector_io/vector_utils.py. Rebase from main since #3064 was merged.

Collaborator comment: +1
@@ -37,3 +37,114 @@ def sanitize_collection_name(name: str, weaviate_format=False) -> str:
else:
s = proper_case(re.sub(r"[^a-zA-Z0-9]", "", name))
return s


class Reranker:
@staticmethod
def _normalize_scores(scores: dict[str, float]) -> dict[str, float]:
"""
Normalize scores to 0-1 range using min-max normalization.
Args:
scores: dictionary of scores with document IDs as keys and scores as values
Returns:
Normalized scores with document IDs as keys and normalized scores as values
"""
if not scores:
return {}
min_score, max_score = min(scores.values()), max(scores.values())
score_range = max_score - min_score
if score_range > 0:
return {doc_id: (score - min_score) / score_range for doc_id, score in scores.items()}
return dict.fromkeys(scores, 1.0)
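    # Illustrative (not part of the original diff): _normalize_scores({"d1": 1.0, "d2": 3.0})
    # returns {"d1": 0.0, "d2": 1.0}; when all scores are equal, every doc maps to 1.0.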

@staticmethod
def weighted_rerank(
vector_scores: dict[str, float],
keyword_scores: dict[str, float],
alpha: float = 0.5,
) -> dict[str, float]:
"""
Rerank via weighted average of scores.
Args:
vector_scores: scores from vector search
keyword_scores: scores from keyword search
alpha: weight factor between 0 and 1 (default: 0.5)
0 = keyword only, 1 = vector only, 0.5 = equal weight
Returns:
All unique document IDs with weighted combined scores
"""
all_ids = set(vector_scores.keys()) | set(keyword_scores.keys())
normalized_vector_scores = Reranker._normalize_scores(vector_scores)
normalized_keyword_scores = Reranker._normalize_scores(keyword_scores)

# Weighted formula: score = (1-alpha) * keyword_score + alpha * vector_score
# alpha=0 means keyword only, alpha=1 means vector only
return {
doc_id: ((1 - alpha) * normalized_keyword_scores.get(doc_id, 0.0))
+ (alpha * normalized_vector_scores.get(doc_id, 0.0))
for doc_id in all_ids
}
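    # Illustrative (not part of the original diff):
    # weighted_rerank({"a": 0.9, "b": 0.1}, {"b": 5.0, "c": 1.0}, alpha=0.5)
    # min-max normalizes each set (a=1.0, b=0.0 and b=1.0, c=0.0), then blends:
    # {"a": 0.5, "b": 0.5, "c": 0.0}.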

@staticmethod
def rrf_rerank(
vector_scores: dict[str, float],
keyword_scores: dict[str, float],
impact_factor: float = 60.0,
) -> dict[str, float]:
"""
Rerank via Reciprocal Rank Fusion.
Args:
vector_scores: scores from vector search
keyword_scores: scores from keyword search
impact_factor: impact factor for RRF (default: 60.0)
Returns:
All unique document IDs with RRF combined scores
"""

# Convert scores to ranks
vector_ranks = {
doc_id: i + 1
for i, (doc_id, _) in enumerate(sorted(vector_scores.items(), key=lambda x: x[1], reverse=True))
}
keyword_ranks = {
doc_id: i + 1
for i, (doc_id, _) in enumerate(sorted(keyword_scores.items(), key=lambda x: x[1], reverse=True))
}

all_ids = set(vector_scores.keys()) | set(keyword_scores.keys())
rrf_scores = {}
for doc_id in all_ids:
vector_rank = vector_ranks.get(doc_id, float("inf"))
keyword_rank = keyword_ranks.get(doc_id, float("inf"))

# RRF formula: score = 1/(k + r) where k is impact_factor (default: 60.0) and r is the rank
rrf_scores[doc_id] = (1.0 / (impact_factor + vector_rank)) + (1.0 / (impact_factor + keyword_rank))
return rrf_scores
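    # Illustrative (not part of the original diff): with impact_factor=60, a doc
    # ranked 1st by vector search and 2nd by keyword search scores
    # 1/61 + 1/62 ~= 0.0325; a doc found only by vector search at rank 1 scores
    # 1/61 ~= 0.0164, since its missing keyword rank (inf) contributes 0.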

@staticmethod
def combine_search_results(
vector_scores: dict[str, float],
keyword_scores: dict[str, float],
reranker_type: str = "rrf",
reranker_params: dict[str, float] | None = None,
) -> dict[str, float]:
"""
Combine vector and keyword search results using specified reranking strategy.
Args:
vector_scores: scores from vector search
keyword_scores: scores from keyword search
            reranker_type: type of reranker to use (default: "rrf")
reranker_params: parameters for the reranker
Returns:
All unique document IDs with combined scores
"""
if reranker_params is None:
reranker_params = {}

if reranker_type == "weighted":
alpha = reranker_params.get("alpha", 0.5)
return Reranker.weighted_rerank(vector_scores, keyword_scores, alpha)
else:
            # Default to RRF for any unrecognized reranker type
impact_factor = reranker_params.get("impact_factor", 60.0)
return Reranker.rrf_rerank(vector_scores, keyword_scores, impact_factor)
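A short usage sketch of the combiner (the scores are hypothetical, not from the PR):

```python
vector_scores = {"doc1": 0.92, "doc2": 0.55}
keyword_scores = {"doc2": 12.0, "doc3": 7.5}

# RRF (the default): rank-based, robust to incomparable score scales.
rrf = Reranker.combine_search_results(vector_scores, keyword_scores)

# Weighted: min-max normalizes each score set, then blends with alpha.
weighted = Reranker.combine_search_results(
    vector_scores, keyword_scores, reranker_type="weighted", reranker_params={"alpha": 0.7}
)
print(rrf["doc2"], weighted["doc2"])
```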
124 changes: 124 additions & 0 deletions tests/unit/providers/vector_io/remote/test_chroma.py
@@ -0,0 +1,124 @@
Collaborator comment: As mentioned in #3264, I don't think we should have remote provider unit tests. I think we should leave the testing for remote providers to our integration tests.

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.

#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

import json
from unittest.mock import MagicMock, patch

import numpy as np
import pytest

from llama_stack.apis.vector_io import QueryChunksResponse

# Mock the entire chromadb module
chromadb_mock = MagicMock()
chromadb_mock.AsyncHttpClient = MagicMock
chromadb_mock.PersistentClient = MagicMock

# Apply the mock before importing ChromaIndex
with patch.dict("sys.modules", {"chromadb": chromadb_mock}):
from llama_stack.providers.remote.vector_io.chroma.chroma import ChromaIndex

# This test is a unit test for the ChromaIndex class. This should only contain
# tests which are specific to this class. More general (API-level) tests should be placed in
# tests/integration/vector_io/
#
# How to run this test:
#
# pytest tests/unit/providers/vector_io/remote/test_chroma.py \
#     -v -s --tb=short --disable-warnings --asyncio-mode=auto

CHROMA_PROVIDER = "chromadb"


@pytest.fixture
async def mock_chroma_collection() -> MagicMock:
"""Create a mock Chroma collection with common method behaviors."""
collection = MagicMock()
collection.name = "test_collection"

# Mock add operation
collection.add.return_value = None

# Mock query operation for vector search
collection.query.return_value = {
"distances": [[0.1, 0.2]],
"documents": [
[
json.dumps({"content": "mock chunk 1", "metadata": {"document_id": "doc1"}}),
json.dumps({"content": "mock chunk 2", "metadata": {"document_id": "doc2"}}),
]
],
}

# Mock delete operation
collection.delete.return_value = None

return collection


@pytest.fixture
async def mock_chroma_client(mock_chroma_collection):
"""Create a mock Chroma client with common method behaviors."""
client = MagicMock()

# Mock collection operations
client.get_or_create_collection.return_value = mock_chroma_collection
client.get_collection.return_value = mock_chroma_collection
client.delete_collection.return_value = None

return client


@pytest.fixture
async def chroma_index(mock_chroma_client, mock_chroma_collection):
"""Create a ChromaIndex with mocked client and collection."""
index = ChromaIndex(client=mock_chroma_client, collection=mock_chroma_collection)
yield index
# No real cleanup needed since we're using mocks


async def test_add_chunks(chroma_index, sample_chunks, sample_embeddings, mock_chroma_collection):
await chroma_index.add_chunks(sample_chunks, sample_embeddings)

# Verify data was inserted
mock_chroma_collection.add.assert_called_once()

# Verify the add call had the right number of chunks
add_call = mock_chroma_collection.add.call_args
assert len(add_call[1]["documents"]) == len(sample_chunks)


async def test_query_chunks_vector(
chroma_index, sample_chunks, sample_embeddings, embedding_dimension, mock_chroma_collection
):
# Setup: Add chunks first
await chroma_index.add_chunks(sample_chunks, sample_embeddings)

# Test vector search
query_embedding = np.random.rand(embedding_dimension).astype(np.float32)
response = await chroma_index.query_vector(query_embedding, k=2, score_threshold=0.0)

assert isinstance(response, QueryChunksResponse)
assert len(response.chunks) == 2
mock_chroma_collection.query.assert_called_once()


async def test_query_chunks_keyword_search(chroma_index, sample_chunks, sample_embeddings, mock_chroma_collection):
await chroma_index.add_chunks(sample_chunks, sample_embeddings)

# Test keyword search
query_string = "Sentence 5"
response = await chroma_index.query_keyword(query_string=query_string, k=2, score_threshold=0.0)

assert isinstance(response, QueryChunksResponse)
assert len(response.chunks) == 2


async def test_delete_collection(chroma_index, mock_chroma_client):
# Test collection deletion
await chroma_index.delete()

mock_chroma_client.delete_collection.assert_called_once_with(chroma_index.collection.name)