From 010cbfe7cf6c2df08416c3bccb8d48a8873f70dc Mon Sep 17 00:00:00 2001 From: tellet-q Date: Fri, 31 Oct 2025 11:50:22 +0100 Subject: [PATCH 1/6] Add httpx engine for qdrant --- engine/clients/client_factory.py | 8 + engine/clients/qdrant_native/__init__.py | 9 + engine/clients/qdrant_native/config.py | 4 + engine/clients/qdrant_native/configure.py | 136 +++++++++++ engine/clients/qdrant_native/parser.py | 70 ++++++ engine/clients/qdrant_native/search.py | 103 +++++++++ engine/clients/qdrant_native/upload.py | 137 +++++++++++ .../qdrant-native-single-node.json | 217 ++++++++++++++++++ 8 files changed, 684 insertions(+) create mode 100644 engine/clients/qdrant_native/__init__.py create mode 100644 engine/clients/qdrant_native/config.py create mode 100644 engine/clients/qdrant_native/configure.py create mode 100644 engine/clients/qdrant_native/parser.py create mode 100644 engine/clients/qdrant_native/search.py create mode 100644 engine/clients/qdrant_native/upload.py create mode 100644 experiments/configurations/qdrant-native-single-node.json diff --git a/engine/clients/client_factory.py b/engine/clients/client_factory.py index a74df2ab4..5905f2455 100644 --- a/engine/clients/client_factory.py +++ b/engine/clients/client_factory.py @@ -24,6 +24,11 @@ PgVectorUploader, ) from engine.clients.qdrant import QdrantConfigurator, QdrantSearcher, QdrantUploader +from engine.clients.qdrant_native import ( + QdrantNativeConfigurator, + QdrantNativeSearcher, + QdrantNativeUploader, +) from engine.clients.redis import RedisConfigurator, RedisSearcher, RedisUploader from engine.clients.weaviate import ( WeaviateConfigurator, @@ -33,6 +38,7 @@ ENGINE_CONFIGURATORS = { "qdrant": QdrantConfigurator, + "qdrant_native": QdrantNativeConfigurator, "weaviate": WeaviateConfigurator, "milvus": MilvusConfigurator, "elasticsearch": ElasticConfigurator, @@ -43,6 +49,7 @@ ENGINE_UPLOADERS = { "qdrant": QdrantUploader, + "qdrant_native": QdrantNativeUploader, "weaviate": WeaviateUploader, "milvus": MilvusUploader, "elasticsearch": ElasticUploader, @@ -53,6 +60,7 @@ ENGINE_SEARCHERS = { "qdrant": QdrantSearcher, + "qdrant_native": QdrantNativeSearcher, "weaviate": WeaviateSearcher, "milvus": MilvusSearcher, "elasticsearch": ElasticSearcher, diff --git a/engine/clients/qdrant_native/__init__.py b/engine/clients/qdrant_native/__init__.py new file mode 100644 index 000000000..1a5c1ddea --- /dev/null +++ b/engine/clients/qdrant_native/__init__.py @@ -0,0 +1,9 @@ +from .configure import QdrantNativeConfigurator +from .search import QdrantNativeSearcher +from .upload import QdrantNativeUploader + +__all__ = [ + "QdrantNativeConfigurator", + "QdrantNativeUploader", + "QdrantNativeSearcher", +] \ No newline at end of file diff --git a/engine/clients/qdrant_native/config.py b/engine/clients/qdrant_native/config.py new file mode 100644 index 000000000..8f9ea6991 --- /dev/null +++ b/engine/clients/qdrant_native/config.py @@ -0,0 +1,4 @@ +import os + +QDRANT_COLLECTION_NAME = os.getenv("QDRANT_COLLECTION_NAME", "benchmark") +QDRANT_API_KEY = os.getenv("QDRANT_API_KEY", None) \ No newline at end of file diff --git a/engine/clients/qdrant_native/configure.py b/engine/clients/qdrant_native/configure.py new file mode 100644 index 000000000..20cf09baa --- /dev/null +++ b/engine/clients/qdrant_native/configure.py @@ -0,0 +1,136 @@ +import httpx + +from benchmark.dataset import Dataset +from engine.base_client.configure import BaseConfigurator +from engine.base_client.distances import Distance +from engine.clients.qdrant_native.config import QDRANT_API_KEY, QDRANT_COLLECTION_NAME + + +class QdrantNativeConfigurator(BaseConfigurator): + SPARSE_VECTOR_SUPPORT = True + DISTANCE_MAPPING = { + Distance.L2: "Euclid", + Distance.COSINE: "Cosine", + Distance.DOT: "Dot", + } + INDEX_TYPE_MAPPING = { + "int": "integer", + "keyword": "keyword", + "text": "text", + "float": "float", + "geo": "geo", + } + + def __init__(self, host, collection_params: dict, connection_params: dict): + super().__init__(host, collection_params, connection_params) + + self.host = host.rstrip('/') + self.connection_params = connection_params + + # Build headers + self.headers = {"Content-Type": "application/json"} + if QDRANT_API_KEY: + self.headers["api-key"] = QDRANT_API_KEY + + # Create HTTP client + timeout = connection_params.get("timeout", 30) + self.client = httpx.Client( + headers=self.headers, + timeout=httpx.Timeout(timeout=timeout), + ) + + def clean(self): + """Delete the collection""" + url = f"{self.host}/collections/{QDRANT_COLLECTION_NAME}" + response = self.client.delete(url) + # 404 is ok if collection doesn't exist + if response.status_code not in [200, 404]: + response.raise_for_status() + + def recreate(self, dataset: Dataset, collection_params): + """Create collection with proper configuration""" + url = f"{self.host}/collections/{QDRANT_COLLECTION_NAME}" + + # Build vectors configuration + if dataset.config.type == "sparse": + vectors_config = {} + sparse_vectors_config = { + "sparse": { + "index": { + "on_disk": False, + } + } + } + else: + is_vectors_on_disk = self.collection_params.get("vectors_config", {}).get( + "on_disk", False + ) + self.collection_params.pop("vectors_config", None) + + vectors_config = { + "size": dataset.config.vector_size, + "distance": self.DISTANCE_MAPPING.get(dataset.config.distance), + "on_disk": is_vectors_on_disk, + } + sparse_vectors_config = None + + # Extract payload index params + payload_index_params = self.collection_params.pop("payload_index_params", {}) + if not set(payload_index_params.keys()).issubset(dataset.config.schema.keys()): + raise ValueError("payload_index_params are not found in dataset schema") + + # Set optimizers config - disable index building during upload by default + optimizers_config = self.collection_params.setdefault("optimizers_config", {}) + optimizers_config.setdefault("max_optimization_threads", 0) + + # Build the collection creation payload + payload = {} + if vectors_config: + payload["vectors"] = vectors_config + if sparse_vectors_config: + payload["sparse_vectors"] = sparse_vectors_config + + # Add other collection params + for key, value in self.collection_params.items(): + payload[key] = value + + # Create the collection + response = self.client.put(url, json=payload) + response.raise_for_status() + + # Create payload indices for each field in the schema + for field_name, field_type in dataset.config.schema.items(): + self._create_payload_index(field_name, field_type, payload_index_params) + + def _create_payload_index(self, field_name: str, field_type: str, payload_index_params: dict): + """Create a payload index for a specific field""" + url = f"{self.host}/collections/{QDRANT_COLLECTION_NAME}/index" + + # Build the field schema based on type + if field_type in ["keyword", "uuid"]: + field_schema = { + "type": self.INDEX_TYPE_MAPPING.get(field_type, "keyword"), + } + + # Add optional parameters if provided + params = payload_index_params.get(field_name, {}) + if "is_tenant" in params and params["is_tenant"] is not None: + field_schema["is_tenant"] = params["is_tenant"] + if "on_disk" in params and params["on_disk"] is not None: + field_schema["on_disk"] = params["on_disk"] + else: + # For other types, just use the type string + field_schema = self.INDEX_TYPE_MAPPING.get(field_type, field_type) + + payload = { + "field_name": field_name, + "field_schema": field_schema, + } + + response = self.client.put(url, json=payload) + response.raise_for_status() + + def delete_client(self): + """Cleanup HTTP client""" + if hasattr(self, 'client') and self.client is not None: + self.client.close() \ No newline at end of file diff --git a/engine/clients/qdrant_native/parser.py b/engine/clients/qdrant_native/parser.py new file mode 100644 index 000000000..b9370e3b1 --- /dev/null +++ b/engine/clients/qdrant_native/parser.py @@ -0,0 +1,70 @@ +from typing import Any, List, Optional + +from engine.base_client.parser import BaseConditionParser, FieldValue + + +class QdrantNativeConditionParser(BaseConditionParser): + """ + Parser that converts internal filter format to Qdrant REST API JSON format. + Returns plain dictionaries instead of Pydantic models. + """ + + def build_condition( + self, and_subfilters: Optional[List[Any]], or_subfilters: Optional[List[Any]] + ) -> Optional[Any]: + """Build a filter condition combining AND/OR subfilters""" + filter_dict = {} + + if and_subfilters: + filter_dict["must"] = and_subfilters + + if or_subfilters: + filter_dict["should"] = or_subfilters + + return filter_dict if filter_dict else None + + def build_exact_match_filter(self, field_name: str, value: FieldValue) -> Any: + """Build an exact match filter""" + return { + "key": field_name, + "match": {"value": value}, + } + + def build_range_filter( + self, + field_name: str, + lt: Optional[FieldValue], + gt: Optional[FieldValue], + lte: Optional[FieldValue], + gte: Optional[FieldValue], + ) -> Any: + """Build a range filter""" + range_dict = {} + if lt is not None: + range_dict["lt"] = lt + if gt is not None: + range_dict["gt"] = gt + if lte is not None: + range_dict["lte"] = lte + if gte is not None: + range_dict["gte"] = gte + + return { + "key": field_name, + "range": range_dict, + } + + def build_geo_filter( + self, field_name: str, lat: float, lon: float, radius: float + ) -> Any: + """Build a geo radius filter""" + return { + "key": field_name, + "geo_radius": { + "center": { + "lon": lon, + "lat": lat, + }, + "radius": radius, + }, + } \ No newline at end of file diff --git a/engine/clients/qdrant_native/search.py b/engine/clients/qdrant_native/search.py new file mode 100644 index 000000000..d723a5859 --- /dev/null +++ b/engine/clients/qdrant_native/search.py @@ -0,0 +1,103 @@ +from typing import List, Tuple + +import httpx + +from dataset_reader.base_reader import Query +from engine.base_client.search import BaseSearcher +from engine.clients.qdrant_native.config import QDRANT_API_KEY, QDRANT_COLLECTION_NAME +from engine.clients.qdrant_native.parser import QdrantNativeConditionParser + + +class QdrantNativeSearcher(BaseSearcher): + search_params = {} + client: httpx.Client = None + parser = QdrantNativeConditionParser() + host = None + headers = {} + + @classmethod + def init_client(cls, host, distance, connection_params: dict, search_params: dict): + cls.host = host.rstrip('/') + cls.search_params = search_params + + # Build headers + cls.headers = {"Content-Type": "application/json"} + if QDRANT_API_KEY: + cls.headers["api-key"] = QDRANT_API_KEY + + # Create HTTP client + timeout = connection_params.get("timeout", 30) + cls.client = httpx.Client( + headers=cls.headers, + timeout=httpx.Timeout(timeout=timeout), + limits=httpx.Limits(max_connections=None, max_keepalive_connections=0), + ) + + @classmethod + def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]: + """Execute a single search query using REST API""" + url = f"{cls.host}/collections/{QDRANT_COLLECTION_NAME}/points/query" + + # Build the query vector + if query.sparse_vector is None: + # Dense vector query + query_vector = query.vector + else: + # Sparse vector query + query_vector = { + "indices": query.sparse_vector.indices, + "values": query.sparse_vector.values, + } + + # Build the request payload + payload = { + "query": query_vector, + "limit": top, + } + + # Add 'using' parameter for sparse vectors + if query.sparse_vector is not None: + payload["using"] = "sparse" + + # Add filter if present + query_filter = cls.parser.parse(query.meta_conditions) + if query_filter: + payload["filter"] = query_filter + + # Add search params configuration + search_config = cls.search_params.get("config", {}) + if search_config: + payload["params"] = search_config + + # Handle prefetch (for hybrid search) + prefetch_config = cls.search_params.get("prefetch") + if prefetch_config: + prefetch = { + **prefetch_config, + "query": query_vector, + } + payload["prefetch"] = [prefetch] + + # Add with_payload option + with_payload = cls.search_params.get("with_payload", False) + payload["with_payload"] = with_payload + + try: + response = cls.client.post(url, json=payload) + response.raise_for_status() + result = response.json() + + # Extract results from response + points = result["result"]["points"] + return [(point["id"], point["score"]) for point in points] + + except Exception as ex: + print(f"Something went wrong during search: {ex}") + raise ex + + @classmethod + def delete_client(cls): + """Cleanup HTTP client""" + if cls.client is not None: + cls.client.close() + cls.client = None \ No newline at end of file diff --git a/engine/clients/qdrant_native/upload.py b/engine/clients/qdrant_native/upload.py new file mode 100644 index 000000000..5adbd2bdd --- /dev/null +++ b/engine/clients/qdrant_native/upload.py @@ -0,0 +1,137 @@ +import time +from typing import List + +import httpx + +from dataset_reader.base_reader import Record +from engine.base_client.upload import BaseUploader +from engine.clients.qdrant_native.config import QDRANT_API_KEY, QDRANT_COLLECTION_NAME + + +class QdrantNativeUploader(BaseUploader): + client = None + upload_params = {} + host = None + headers = {} + + @classmethod + def init_client(cls, host, distance, connection_params, upload_params): + cls.host = host.rstrip('/') + cls.upload_params = upload_params + + # Build headers + cls.headers = {"Content-Type": "application/json"} + if QDRANT_API_KEY: + cls.headers["api-key"] = QDRANT_API_KEY + + # Create HTTP client with connection pooling + timeout = connection_params.get("timeout", 30) + cls.client = httpx.Client( + headers=cls.headers, + timeout=httpx.Timeout(timeout=timeout), + limits=httpx.Limits(max_connections=100, max_keepalive_connections=20), + ) + + @classmethod + def upload_batch(cls, batch: List[Record]): + """Upload a batch of records using REST API""" + # Build the batch payload + points = [] + for point in batch: + point_data = { + "id": point.id, + "payload": point.metadata or {}, + } + + # Handle vector (dense or sparse) + if point.sparse_vector is None: + # Dense vector + point_data["vector"] = point.vector + else: + # Sparse vector + point_data["vector"] = { + "sparse": { + "indices": point.sparse_vector.indices, + "values": point.sparse_vector.values, + } + } + + points.append(point_data) + + # Upsert the batch + url = f"{cls.host}/collections/{QDRANT_COLLECTION_NAME}/points" + payload = { + "points": points, + } + + response = cls.client.put(url, json=payload, params={"wait": "false"}) + response.raise_for_status() + + @classmethod + def post_upload(cls, _distance): + """ + Post-upload operations: + 1. Enable index optimization if it was disabled + 2. Wait for collection to become GREEN + """ + # Get current collection info + url = f"{cls.host}/collections/{QDRANT_COLLECTION_NAME}" + response = cls.client.get(url) + response.raise_for_status() + collection_info = response.json()["result"] + + # Check if optimization was disabled + max_optimization_threads = collection_info["config"]["optimizer_config"].get( + "max_optimization_threads", 1 + ) + + if max_optimization_threads == 0: + # Enable optimization + patch_url = f"{cls.host}/collections/{QDRANT_COLLECTION_NAME}" + patch_payload = { + "optimizers_config": { + "max_optimization_threads": 100_000, + } + } + response = cls.client.patch(patch_url, json=patch_payload) + response.raise_for_status() + + # Wait for collection to become GREEN + cls.wait_collection_green() + return {} + + @classmethod + def wait_collection_green(cls): + """Wait for collection status to be GREEN""" + wait_time = 5.0 + total = 0 + url = f"{cls.host}/collections/{QDRANT_COLLECTION_NAME}" + + while True: + time.sleep(wait_time) + total += wait_time + + response = cls.client.get(url) + response.raise_for_status() + collection_info = response.json()["result"] + + if collection_info["status"] != "green": + continue + + # Check twice to ensure stability + time.sleep(wait_time) + response = cls.client.get(url) + response.raise_for_status() + collection_info = response.json()["result"] + + if collection_info["status"] == "green": + break + + return total + + @classmethod + def delete_client(cls): + """Cleanup HTTP client""" + if cls.client is not None: + cls.client.close() + cls.client = None \ No newline at end of file diff --git a/experiments/configurations/qdrant-native-single-node.json b/experiments/configurations/qdrant-native-single-node.json new file mode 100644 index 000000000..15acaf8ee --- /dev/null +++ b/experiments/configurations/qdrant-native-single-node.json @@ -0,0 +1,217 @@ +[ + { + "name": "qdrant-native-default", + "engine": "qdrant_native", + "connection_params": { "timeout": 30 }, + "collection_params": { + "optimizers_config": { "memmap_threshold": 10000000 } + }, + "search_params": [ + { "parallel": 8, "config": { "hnsw_ef": 128 } } + ], + "upload_params": { "parallel": 16, "batch_size": 1024 } + }, + { + "name": "qdrant-native-continuous-benchmark", + "engine": "qdrant_native", + "connection_params": { "timeout": 30 }, + "collection_params": { + "hnsw_config": { + "m": 32, + "ef_construct": 256 + }, + "quantization_config": { + "scalar": { + "type": "int8", + "quantile": 0.99 + } + }, + "optimizers_config": { + "max_segment_size": 1000000, + "default_segment_number": 3, + "memmap_threshold": 10000000, + "max_optimization_threads": null + } + }, + "search_params": [ + { + "parallel": 8, + "config": { + "hnsw_ef": 256, + "quantization": { + "oversampling": 2.0, + "rescore": true + } + } + } + ], + "upload_params": { "parallel": 16, "batch_size": 1024 } + }, + { + "name": "qdrant-continuous-benchmark-with-payload", + "engine": "qdrant_native", + "connection_params": { "timeout": 30 }, + "collection_params": { + "hnsw_config": { + "m": 32, + "ef_construct": 256 + }, + "quantization_config": { + "scalar": { + "type": "int8", + "quantile": 0.99 + } + }, + "optimizers_config": { + "max_segment_size": 1000000, + "default_segment_number": 3, + "memmap_threshold": 10000000, + "max_optimization_threads": null + } + }, + "search_params": [ + { + "parallel": 8, + "config": { + "hnsw_ef": 256, + "quantization": { + "oversampling": 2.0 + } + }, + "with_payload": true + } + ], + "upload_params": { "parallel": 16, "batch_size": 1024 } + }, + { + "name": "qdrant-continuous-benchmark-indexed-only", + "engine": "qdrant_native", + "connection_params": { "timeout": 30 }, + "collection_params": { + "hnsw_config": { + "m": 32, + "ef_construct": 256 + }, + "quantization_config": { + "scalar": { + "type": "int8", + "quantile": 0.99 + } + }, + "optimizers_config": { + "max_segment_size": 1000000, + "default_segment_number": 3, + "memmap_threshold": 10000000, + "max_optimization_threads": null + } + }, + "search_params": [ + { + "parallel": 8, + "config": { + "hnsw_ef": 256, + "quantization": { + "oversampling": 2.0, + "rescore": true + }, + "indexed_only": true + } + } + ], + "upload_params": { "parallel": 16, "batch_size": 1024 } + }, + { + "name": "qdrant-continuous-benchmark-indexed-only-with-payload", + "engine": "qdrant_native", + "connection_params": { "timeout": 30 }, + "collection_params": { + "hnsw_config": { + "m": 32, + "ef_construct": 256 + }, + "quantization_config": { + "scalar": { + "type": "int8", + "quantile": 0.99 + } + }, + "optimizers_config": { + "max_segment_size": 1000000, + "default_segment_number": 3, + "memmap_threshold": 10000000, + "max_optimization_threads": null + } + }, + "search_params": [ + { + "parallel": 8, + "config": { + "hnsw_ef": 256, + "quantization": { + "oversampling": 2.0 + }, + "indexed_only": true + }, + "with_payload": true + } + ], + "upload_params": { "parallel": 16, "batch_size": 1024 } + }, + { + "name": "qdrant-bq-continuous-benchmark", + "engine": "qdrant_native", + "connection_params": { "timeout": 30 }, + "collection_params": { + "hnsw_config": { + "m": 32, + "ef_construct": 256 + }, + "quantization_config": { "binary": {"always_ram": true} }, + "optimizers_config": { + "max_segment_size": 1000000, + "default_segment_number": 3, + "memmap_threshold": 10000000, + "max_optimization_threads": null + } + }, + "search_params": [ + { + "parallel": 8, + "config": { + "hnsw_ef": 256, + "quantization": { "rescore": true, "oversampling": 2.0 } + } + } + ], + "upload_params": { "parallel": 16, "batch_size": 1024 } + }, + { + "name": "qdrant-bq-continuous-benchmark-with-payload", + "engine": "qdrant_native", + "connection_params": { "timeout": 30 }, + "collection_params": { + "hnsw_config": { + "m": 32, + "ef_construct": 256 + }, + "quantization_config": { "binary": {"always_ram": true} }, + "optimizers_config": { + "max_segment_size": 1000000, + "default_segment_number": 3, + "memmap_threshold": 10000000, + "max_optimization_threads": null + } + }, + "search_params": [ + { + "parallel": 8, + "config": { + "hnsw_ef": 256, + "quantization": { "rescore": true, "oversampling": 2.0 } + }, + "with_payload": true + } + ], + "upload_params": { "parallel": 16, "batch_size": 1024 } + } +] \ No newline at end of file From 7f7523d590b5bdaef2746c4d45afdcc0cfb4bc5f Mon Sep 17 00:00:00 2001 From: tellet-q Date: Fri, 31 Oct 2025 11:59:52 +0100 Subject: [PATCH 2/6] Fix host --- engine/clients/qdrant_native/configure.py | 2 +- engine/clients/qdrant_native/search.py | 2 +- engine/clients/qdrant_native/upload.py | 2 +- .../configurations/qdrant-native-single-node.json | 10 +++++----- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/engine/clients/qdrant_native/configure.py b/engine/clients/qdrant_native/configure.py index 20cf09baa..5a3bd40bc 100644 --- a/engine/clients/qdrant_native/configure.py +++ b/engine/clients/qdrant_native/configure.py @@ -24,7 +24,7 @@ class QdrantNativeConfigurator(BaseConfigurator): def __init__(self, host, collection_params: dict, connection_params: dict): super().__init__(host, collection_params, connection_params) - self.host = host.rstrip('/') + self.host = f"http://{host.rstrip('/')}:6333" self.connection_params = connection_params # Build headers diff --git a/engine/clients/qdrant_native/search.py b/engine/clients/qdrant_native/search.py index d723a5859..811e5d2da 100644 --- a/engine/clients/qdrant_native/search.py +++ b/engine/clients/qdrant_native/search.py @@ -17,7 +17,7 @@ class QdrantNativeSearcher(BaseSearcher): @classmethod def init_client(cls, host, distance, connection_params: dict, search_params: dict): - cls.host = host.rstrip('/') + cls.host = f"http://{host.rstrip('/')}:6333" cls.search_params = search_params # Build headers diff --git a/engine/clients/qdrant_native/upload.py b/engine/clients/qdrant_native/upload.py index 5adbd2bdd..ad948f7ef 100644 --- a/engine/clients/qdrant_native/upload.py +++ b/engine/clients/qdrant_native/upload.py @@ -16,7 +16,7 @@ class QdrantNativeUploader(BaseUploader): @classmethod def init_client(cls, host, distance, connection_params, upload_params): - cls.host = host.rstrip('/') + cls.host = f"http://{host.rstrip('/')}:6333" cls.upload_params = upload_params # Build headers diff --git a/experiments/configurations/qdrant-native-single-node.json b/experiments/configurations/qdrant-native-single-node.json index 15acaf8ee..fd3cbddc5 100644 --- a/experiments/configurations/qdrant-native-single-node.json +++ b/experiments/configurations/qdrant-native-single-node.json @@ -48,7 +48,7 @@ "upload_params": { "parallel": 16, "batch_size": 1024 } }, { - "name": "qdrant-continuous-benchmark-with-payload", + "name": "qdrant-native-continuous-benchmark-with-payload", "engine": "qdrant_native", "connection_params": { "timeout": 30 }, "collection_params": { @@ -84,7 +84,7 @@ "upload_params": { "parallel": 16, "batch_size": 1024 } }, { - "name": "qdrant-continuous-benchmark-indexed-only", + "name": "qdrant-native-continuous-benchmark-indexed-only", "engine": "qdrant_native", "connection_params": { "timeout": 30 }, "collection_params": { @@ -121,7 +121,7 @@ "upload_params": { "parallel": 16, "batch_size": 1024 } }, { - "name": "qdrant-continuous-benchmark-indexed-only-with-payload", + "name": "qdrant-native-continuous-benchmark-indexed-only-with-payload", "engine": "qdrant_native", "connection_params": { "timeout": 30 }, "collection_params": { @@ -158,7 +158,7 @@ "upload_params": { "parallel": 16, "batch_size": 1024 } }, { - "name": "qdrant-bq-continuous-benchmark", + "name": "qdrant-native-bq-continuous-benchmark", "engine": "qdrant_native", "connection_params": { "timeout": 30 }, "collection_params": { @@ -186,7 +186,7 @@ "upload_params": { "parallel": 16, "batch_size": 1024 } }, { - "name": "qdrant-bq-continuous-benchmark-with-payload", + "name": "qdrant-native-bq-continuous-benchmark-with-payload", "engine": "qdrant_native", "connection_params": { "timeout": 30 }, "collection_params": { From f4d1c8fb180fcf5e018381c200b5b70bf9345119 Mon Sep 17 00:00:00 2001 From: tellet-q Date: Fri, 31 Oct 2025 12:20:19 +0100 Subject: [PATCH 3/6] Reduce comments --- engine/clients/qdrant_native/configure.py | 6 ------ engine/clients/qdrant_native/search.py | 10 ---------- engine/clients/qdrant_native/upload.py | 6 ------ 3 files changed, 22 deletions(-) diff --git a/engine/clients/qdrant_native/configure.py b/engine/clients/qdrant_native/configure.py index 5a3bd40bc..47ce86ca7 100644 --- a/engine/clients/qdrant_native/configure.py +++ b/engine/clients/qdrant_native/configure.py @@ -27,12 +27,10 @@ def __init__(self, host, collection_params: dict, connection_params: dict): self.host = f"http://{host.rstrip('/')}:6333" self.connection_params = connection_params - # Build headers self.headers = {"Content-Type": "application/json"} if QDRANT_API_KEY: self.headers["api-key"] = QDRANT_API_KEY - # Create HTTP client timeout = connection_params.get("timeout", 30) self.client = httpx.Client( headers=self.headers, @@ -74,7 +72,6 @@ def recreate(self, dataset: Dataset, collection_params): } sparse_vectors_config = None - # Extract payload index params payload_index_params = self.collection_params.pop("payload_index_params", {}) if not set(payload_index_params.keys()).issubset(dataset.config.schema.keys()): raise ValueError("payload_index_params are not found in dataset schema") @@ -90,15 +87,12 @@ def recreate(self, dataset: Dataset, collection_params): if sparse_vectors_config: payload["sparse_vectors"] = sparse_vectors_config - # Add other collection params for key, value in self.collection_params.items(): payload[key] = value - # Create the collection response = self.client.put(url, json=payload) response.raise_for_status() - # Create payload indices for each field in the schema for field_name, field_type in dataset.config.schema.items(): self._create_payload_index(field_name, field_type, payload_index_params) diff --git a/engine/clients/qdrant_native/search.py b/engine/clients/qdrant_native/search.py index 811e5d2da..2cb610b59 100644 --- a/engine/clients/qdrant_native/search.py +++ b/engine/clients/qdrant_native/search.py @@ -38,38 +38,30 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]: """Execute a single search query using REST API""" url = f"{cls.host}/collections/{QDRANT_COLLECTION_NAME}/points/query" - # Build the query vector if query.sparse_vector is None: - # Dense vector query query_vector = query.vector else: - # Sparse vector query query_vector = { "indices": query.sparse_vector.indices, "values": query.sparse_vector.values, } - # Build the request payload payload = { "query": query_vector, "limit": top, } - # Add 'using' parameter for sparse vectors if query.sparse_vector is not None: payload["using"] = "sparse" - # Add filter if present query_filter = cls.parser.parse(query.meta_conditions) if query_filter: payload["filter"] = query_filter - # Add search params configuration search_config = cls.search_params.get("config", {}) if search_config: payload["params"] = search_config - # Handle prefetch (for hybrid search) prefetch_config = cls.search_params.get("prefetch") if prefetch_config: prefetch = { @@ -78,7 +70,6 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]: } payload["prefetch"] = [prefetch] - # Add with_payload option with_payload = cls.search_params.get("with_payload", False) payload["with_payload"] = with_payload @@ -87,7 +78,6 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]: response.raise_for_status() result = response.json() - # Extract results from response points = result["result"]["points"] return [(point["id"], point["score"]) for point in points] diff --git a/engine/clients/qdrant_native/upload.py b/engine/clients/qdrant_native/upload.py index ad948f7ef..acdadd087 100644 --- a/engine/clients/qdrant_native/upload.py +++ b/engine/clients/qdrant_native/upload.py @@ -35,7 +35,6 @@ def init_client(cls, host, distance, connection_params, upload_params): @classmethod def upload_batch(cls, batch: List[Record]): """Upload a batch of records using REST API""" - # Build the batch payload points = [] for point in batch: point_data = { @@ -45,10 +44,8 @@ def upload_batch(cls, batch: List[Record]): # Handle vector (dense or sparse) if point.sparse_vector is None: - # Dense vector point_data["vector"] = point.vector else: - # Sparse vector point_data["vector"] = { "sparse": { "indices": point.sparse_vector.indices, @@ -58,7 +55,6 @@ def upload_batch(cls, batch: List[Record]): points.append(point_data) - # Upsert the batch url = f"{cls.host}/collections/{QDRANT_COLLECTION_NAME}/points" payload = { "points": points, @@ -74,7 +70,6 @@ def post_upload(cls, _distance): 1. Enable index optimization if it was disabled 2. Wait for collection to become GREEN """ - # Get current collection info url = f"{cls.host}/collections/{QDRANT_COLLECTION_NAME}" response = cls.client.get(url) response.raise_for_status() @@ -96,7 +91,6 @@ def post_upload(cls, _distance): response = cls.client.patch(patch_url, json=patch_payload) response.raise_for_status() - # Wait for collection to become GREEN cls.wait_collection_green() return {} From 1008085ac8104d15d47e0791f511759a4f05d6f2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 31 Oct 2025 11:26:40 +0000 Subject: [PATCH 4/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- engine/clients/qdrant_native/__init__.py | 2 +- engine/clients/qdrant_native/config.py | 2 +- engine/clients/qdrant_native/configure.py | 8 +++++--- engine/clients/qdrant_native/parser.py | 2 +- engine/clients/qdrant_native/search.py | 2 +- engine/clients/qdrant_native/upload.py | 2 +- 6 files changed, 10 insertions(+), 8 deletions(-) diff --git a/engine/clients/qdrant_native/__init__.py b/engine/clients/qdrant_native/__init__.py index 1a5c1ddea..bcdaff6c6 100644 --- a/engine/clients/qdrant_native/__init__.py +++ b/engine/clients/qdrant_native/__init__.py @@ -6,4 +6,4 @@ "QdrantNativeConfigurator", "QdrantNativeUploader", "QdrantNativeSearcher", -] \ No newline at end of file +] diff --git a/engine/clients/qdrant_native/config.py b/engine/clients/qdrant_native/config.py index 8f9ea6991..164613d62 100644 --- a/engine/clients/qdrant_native/config.py +++ b/engine/clients/qdrant_native/config.py @@ -1,4 +1,4 @@ import os QDRANT_COLLECTION_NAME = os.getenv("QDRANT_COLLECTION_NAME", "benchmark") -QDRANT_API_KEY = os.getenv("QDRANT_API_KEY", None) \ No newline at end of file +QDRANT_API_KEY = os.getenv("QDRANT_API_KEY", None) diff --git a/engine/clients/qdrant_native/configure.py b/engine/clients/qdrant_native/configure.py index 47ce86ca7..58a7d0946 100644 --- a/engine/clients/qdrant_native/configure.py +++ b/engine/clients/qdrant_native/configure.py @@ -96,7 +96,9 @@ def recreate(self, dataset: Dataset, collection_params): for field_name, field_type in dataset.config.schema.items(): self._create_payload_index(field_name, field_type, payload_index_params) - def _create_payload_index(self, field_name: str, field_type: str, payload_index_params: dict): + def _create_payload_index( + self, field_name: str, field_type: str, payload_index_params: dict + ): """Create a payload index for a specific field""" url = f"{self.host}/collections/{QDRANT_COLLECTION_NAME}/index" @@ -126,5 +128,5 @@ def _create_payload_index(self, field_name: str, field_type: str, payload_index_ def delete_client(self): """Cleanup HTTP client""" - if hasattr(self, 'client') and self.client is not None: - self.client.close() \ No newline at end of file + if hasattr(self, "client") and self.client is not None: + self.client.close() diff --git a/engine/clients/qdrant_native/parser.py b/engine/clients/qdrant_native/parser.py index b9370e3b1..1b6721d7a 100644 --- a/engine/clients/qdrant_native/parser.py +++ b/engine/clients/qdrant_native/parser.py @@ -67,4 +67,4 @@ def build_geo_filter( }, "radius": radius, }, - } \ No newline at end of file + } diff --git a/engine/clients/qdrant_native/search.py b/engine/clients/qdrant_native/search.py index 2cb610b59..729401810 100644 --- a/engine/clients/qdrant_native/search.py +++ b/engine/clients/qdrant_native/search.py @@ -90,4 +90,4 @@ def delete_client(cls): """Cleanup HTTP client""" if cls.client is not None: cls.client.close() - cls.client = None \ No newline at end of file + cls.client = None diff --git a/engine/clients/qdrant_native/upload.py b/engine/clients/qdrant_native/upload.py index acdadd087..519fe24a4 100644 --- a/engine/clients/qdrant_native/upload.py +++ b/engine/clients/qdrant_native/upload.py @@ -128,4 +128,4 @@ def delete_client(cls): """Cleanup HTTP client""" if cls.client is not None: cls.client.close() - cls.client = None \ No newline at end of file + cls.client = None From e4e1f1fe16f8c1063b26c3cdf97ded2afe6817c5 Mon Sep 17 00:00:00 2001 From: tellet-q Date: Thu, 13 Nov 2025 12:01:02 +0100 Subject: [PATCH 5/6] Fix bigger datasets --- engine/base_client/upload.py | 12 ++++++++---- engine/clients/qdrant_native/search.py | 15 +++++++++++---- engine/clients/qdrant_native/upload.py | 24 ++++++++++++++++++++---- 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/engine/base_client/upload.py b/engine/base_client/upload.py index 55ee4055d..29c126907 100644 --- a/engine/base_client/upload.py +++ b/engine/base_client/upload.py @@ -34,11 +34,11 @@ def upload( parallel = self.upload_params.get("parallel", 1) batch_size = self.upload_params.get("batch_size", 64) - self.init_client( - self.host, distance, self.connection_params, self.upload_params - ) - if parallel == 1: + # Initialize client in parent process for serial uploads + self.init_client( + self.host, distance, self.connection_params, self.upload_params + ) for batch in iter_batches(tqdm.tqdm(records), batch_size): latencies.append(self._upload_batch(batch)) else: @@ -59,6 +59,10 @@ def upload( iter_batches(tqdm.tqdm(records), batch_size), ) ) + # Initialize client in parent process for post-upload operations + self.init_client( + self.host, distance, self.connection_params, self.upload_params + ) upload_time = time.perf_counter() - start diff --git a/engine/clients/qdrant_native/search.py b/engine/clients/qdrant_native/search.py index 729401810..c738dcb50 100644 --- a/engine/clients/qdrant_native/search.py +++ b/engine/clients/qdrant_native/search.py @@ -26,10 +26,16 @@ def init_client(cls, host, distance, connection_params: dict, search_params: dic cls.headers["api-key"] = QDRANT_API_KEY # Create HTTP client - timeout = connection_params.get("timeout", 30) + # Use longer timeout for write operations to handle large query payloads + base_timeout = connection_params.get("timeout", 30) cls.client = httpx.Client( headers=cls.headers, - timeout=httpx.Timeout(timeout=timeout), + timeout=httpx.Timeout( + connect=base_timeout, + read=base_timeout, + write=base_timeout * 5, # 5x longer for writes + pool=base_timeout, + ), limits=httpx.Limits(max_connections=None, max_keepalive_connections=0), ) @@ -41,9 +47,10 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]: if query.sparse_vector is None: query_vector = query.vector else: + # Convert numpy types to native Python types for JSON serialization query_vector = { - "indices": query.sparse_vector.indices, - "values": query.sparse_vector.values, + "indices": [int(i) for i in query.sparse_vector.indices], + "values": [float(v) for v in query.sparse_vector.values], } payload = { diff --git a/engine/clients/qdrant_native/upload.py b/engine/clients/qdrant_native/upload.py index 519fe24a4..f89a24652 100644 --- a/engine/clients/qdrant_native/upload.py +++ b/engine/clients/qdrant_native/upload.py @@ -25,16 +25,32 @@ def init_client(cls, host, distance, connection_params, upload_params): cls.headers["api-key"] = QDRANT_API_KEY # Create HTTP client with connection pooling - timeout = connection_params.get("timeout", 30) + # Use longer timeout for write operations to handle large payloads + base_timeout = connection_params.get("timeout", 30) cls.client = httpx.Client( headers=cls.headers, - timeout=httpx.Timeout(timeout=timeout), + timeout=httpx.Timeout( + connect=base_timeout, + read=base_timeout, + write=base_timeout * 10, # 10x longer for writes + pool=base_timeout, + ), limits=httpx.Limits(max_connections=100, max_keepalive_connections=20), ) @classmethod def upload_batch(cls, batch: List[Record]): """Upload a batch of records using REST API""" + # Qdrant has a 32MB JSON payload limit + # For large batches with dense high-dim vectors, split into smaller sub-batches + MAX_BATCH_SIZE = 512 + + if len(batch) > MAX_BATCH_SIZE: + # Split into smaller sub-batches + for i in range(0, len(batch), MAX_BATCH_SIZE): + cls.upload_batch(batch[i:i + MAX_BATCH_SIZE]) + return + points = [] for point in batch: point_data = { @@ -48,8 +64,8 @@ def upload_batch(cls, batch: List[Record]): else: point_data["vector"] = { "sparse": { - "indices": point.sparse_vector.indices, - "values": point.sparse_vector.values, + "indices": [int(i) for i in point.sparse_vector.indices], + "values": [float(v) for v in point.sparse_vector.values], } } From a18a54b0eaf7f3cbe229842add4a938af240fc76 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 13 Nov 2025 11:01:30 +0000 Subject: [PATCH 6/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- engine/clients/qdrant_native/upload.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/clients/qdrant_native/upload.py b/engine/clients/qdrant_native/upload.py index f89a24652..6cf6c0429 100644 --- a/engine/clients/qdrant_native/upload.py +++ b/engine/clients/qdrant_native/upload.py @@ -48,7 +48,7 @@ def upload_batch(cls, batch: List[Record]): if len(batch) > MAX_BATCH_SIZE: # Split into smaller sub-batches for i in range(0, len(batch), MAX_BATCH_SIZE): - cls.upload_batch(batch[i:i + MAX_BATCH_SIZE]) + cls.upload_batch(batch[i : i + MAX_BATCH_SIZE]) return points = []