diff --git a/engine/base_client/upload.py b/engine/base_client/upload.py index 55ee4055d..29c126907 100644 --- a/engine/base_client/upload.py +++ b/engine/base_client/upload.py @@ -34,11 +34,11 @@ def upload( parallel = self.upload_params.get("parallel", 1) batch_size = self.upload_params.get("batch_size", 64) - self.init_client( - self.host, distance, self.connection_params, self.upload_params - ) - if parallel == 1: + # Initialize client in parent process for serial uploads + self.init_client( + self.host, distance, self.connection_params, self.upload_params + ) for batch in iter_batches(tqdm.tqdm(records), batch_size): latencies.append(self._upload_batch(batch)) else: @@ -59,6 +59,10 @@ def upload( iter_batches(tqdm.tqdm(records), batch_size), ) ) + # Initialize client in parent process for post-upload operations + self.init_client( + self.host, distance, self.connection_params, self.upload_params + ) upload_time = time.perf_counter() - start diff --git a/engine/clients/client_factory.py b/engine/clients/client_factory.py index a74df2ab4..5905f2455 100644 --- a/engine/clients/client_factory.py +++ b/engine/clients/client_factory.py @@ -24,6 +24,11 @@ PgVectorUploader, ) from engine.clients.qdrant import QdrantConfigurator, QdrantSearcher, QdrantUploader +from engine.clients.qdrant_native import ( + QdrantNativeConfigurator, + QdrantNativeSearcher, + QdrantNativeUploader, +) from engine.clients.redis import RedisConfigurator, RedisSearcher, RedisUploader from engine.clients.weaviate import ( WeaviateConfigurator, @@ -33,6 +38,7 @@ ENGINE_CONFIGURATORS = { "qdrant": QdrantConfigurator, + "qdrant_native": QdrantNativeConfigurator, "weaviate": WeaviateConfigurator, "milvus": MilvusConfigurator, "elasticsearch": ElasticConfigurator, @@ -43,6 +49,7 @@ ENGINE_UPLOADERS = { "qdrant": QdrantUploader, + "qdrant_native": QdrantNativeUploader, "weaviate": WeaviateUploader, "milvus": MilvusUploader, "elasticsearch": ElasticUploader, @@ -53,6 +60,7 @@ ENGINE_SEARCHERS = { "qdrant": QdrantSearcher, + "qdrant_native": QdrantNativeSearcher, "weaviate": WeaviateSearcher, "milvus": MilvusSearcher, "elasticsearch": ElasticSearcher, diff --git a/engine/clients/qdrant_native/__init__.py b/engine/clients/qdrant_native/__init__.py new file mode 100644 index 000000000..bcdaff6c6 --- /dev/null +++ b/engine/clients/qdrant_native/__init__.py @@ -0,0 +1,9 @@ +from .configure import QdrantNativeConfigurator +from .search import QdrantNativeSearcher +from .upload import QdrantNativeUploader + +__all__ = [ + "QdrantNativeConfigurator", + "QdrantNativeUploader", + "QdrantNativeSearcher", +] diff --git a/engine/clients/qdrant_native/config.py b/engine/clients/qdrant_native/config.py new file mode 100644 index 000000000..164613d62 --- /dev/null +++ b/engine/clients/qdrant_native/config.py @@ -0,0 +1,4 @@ +import os + +QDRANT_COLLECTION_NAME = os.getenv("QDRANT_COLLECTION_NAME", "benchmark") +QDRANT_API_KEY = os.getenv("QDRANT_API_KEY", None) diff --git a/engine/clients/qdrant_native/configure.py b/engine/clients/qdrant_native/configure.py new file mode 100644 index 000000000..58a7d0946 --- /dev/null +++ b/engine/clients/qdrant_native/configure.py @@ -0,0 +1,132 @@ +import httpx + +from benchmark.dataset import Dataset +from engine.base_client.configure import BaseConfigurator +from engine.base_client.distances import Distance +from engine.clients.qdrant_native.config import QDRANT_API_KEY, QDRANT_COLLECTION_NAME + + +class QdrantNativeConfigurator(BaseConfigurator): + SPARSE_VECTOR_SUPPORT = True + DISTANCE_MAPPING = { + Distance.L2: "Euclid", + Distance.COSINE: "Cosine", + Distance.DOT: "Dot", + } + INDEX_TYPE_MAPPING = { + "int": "integer", + "keyword": "keyword", + "text": "text", + "float": "float", + "geo": "geo", + } + + def __init__(self, host, collection_params: dict, connection_params: dict): + super().__init__(host, collection_params, connection_params) + + self.host = f"http://{host.rstrip('/')}:6333" + self.connection_params = connection_params + + self.headers = {"Content-Type": "application/json"} + if QDRANT_API_KEY: + self.headers["api-key"] = QDRANT_API_KEY + + timeout = connection_params.get("timeout", 30) + self.client = httpx.Client( + headers=self.headers, + timeout=httpx.Timeout(timeout=timeout), + ) + + def clean(self): + """Delete the collection""" + url = f"{self.host}/collections/{QDRANT_COLLECTION_NAME}" + response = self.client.delete(url) + # 404 is ok if collection doesn't exist + if response.status_code not in [200, 404]: + response.raise_for_status() + + def recreate(self, dataset: Dataset, collection_params): + """Create collection with proper configuration""" + url = f"{self.host}/collections/{QDRANT_COLLECTION_NAME}" + + # Build vectors configuration + if dataset.config.type == "sparse": + vectors_config = {} + sparse_vectors_config = { + "sparse": { + "index": { + "on_disk": False, + } + } + } + else: + is_vectors_on_disk = self.collection_params.get("vectors_config", {}).get( + "on_disk", False + ) + self.collection_params.pop("vectors_config", None) + + vectors_config = { + "size": dataset.config.vector_size, + "distance": self.DISTANCE_MAPPING.get(dataset.config.distance), + "on_disk": is_vectors_on_disk, + } + sparse_vectors_config = None + + payload_index_params = self.collection_params.pop("payload_index_params", {}) + if not set(payload_index_params.keys()).issubset(dataset.config.schema.keys()): + raise ValueError("payload_index_params are not found in dataset schema") + + # Set optimizers config - disable index building during upload by default + optimizers_config = self.collection_params.setdefault("optimizers_config", {}) + optimizers_config.setdefault("max_optimization_threads", 0) + + # Build the collection creation payload + payload = {} + if vectors_config: + payload["vectors"] = vectors_config + if sparse_vectors_config: + payload["sparse_vectors"] = sparse_vectors_config + + for key, value in self.collection_params.items(): + payload[key] = value + + response = self.client.put(url, json=payload) + response.raise_for_status() + + for field_name, field_type in dataset.config.schema.items(): + self._create_payload_index(field_name, field_type, payload_index_params) + + def _create_payload_index( + self, field_name: str, field_type: str, payload_index_params: dict + ): + """Create a payload index for a specific field""" + url = f"{self.host}/collections/{QDRANT_COLLECTION_NAME}/index" + + # Build the field schema based on type + if field_type in ["keyword", "uuid"]: + field_schema = { + "type": self.INDEX_TYPE_MAPPING.get(field_type, "keyword"), + } + + # Add optional parameters if provided + params = payload_index_params.get(field_name, {}) + if "is_tenant" in params and params["is_tenant"] is not None: + field_schema["is_tenant"] = params["is_tenant"] + if "on_disk" in params and params["on_disk"] is not None: + field_schema["on_disk"] = params["on_disk"] + else: + # For other types, just use the type string + field_schema = self.INDEX_TYPE_MAPPING.get(field_type, field_type) + + payload = { + "field_name": field_name, + "field_schema": field_schema, + } + + response = self.client.put(url, json=payload) + response.raise_for_status() + + def delete_client(self): + """Cleanup HTTP client""" + if hasattr(self, "client") and self.client is not None: + self.client.close() diff --git a/engine/clients/qdrant_native/parser.py b/engine/clients/qdrant_native/parser.py new file mode 100644 index 000000000..1b6721d7a --- /dev/null +++ b/engine/clients/qdrant_native/parser.py @@ -0,0 +1,70 @@ +from typing import Any, List, Optional + +from engine.base_client.parser import BaseConditionParser, FieldValue + + +class QdrantNativeConditionParser(BaseConditionParser): + """ + Parser that converts internal filter format to Qdrant REST API JSON format. + Returns plain dictionaries instead of Pydantic models. + """ + + def build_condition( + self, and_subfilters: Optional[List[Any]], or_subfilters: Optional[List[Any]] + ) -> Optional[Any]: + """Build a filter condition combining AND/OR subfilters""" + filter_dict = {} + + if and_subfilters: + filter_dict["must"] = and_subfilters + + if or_subfilters: + filter_dict["should"] = or_subfilters + + return filter_dict if filter_dict else None + + def build_exact_match_filter(self, field_name: str, value: FieldValue) -> Any: + """Build an exact match filter""" + return { + "key": field_name, + "match": {"value": value}, + } + + def build_range_filter( + self, + field_name: str, + lt: Optional[FieldValue], + gt: Optional[FieldValue], + lte: Optional[FieldValue], + gte: Optional[FieldValue], + ) -> Any: + """Build a range filter""" + range_dict = {} + if lt is not None: + range_dict["lt"] = lt + if gt is not None: + range_dict["gt"] = gt + if lte is not None: + range_dict["lte"] = lte + if gte is not None: + range_dict["gte"] = gte + + return { + "key": field_name, + "range": range_dict, + } + + def build_geo_filter( + self, field_name: str, lat: float, lon: float, radius: float + ) -> Any: + """Build a geo radius filter""" + return { + "key": field_name, + "geo_radius": { + "center": { + "lon": lon, + "lat": lat, + }, + "radius": radius, + }, + } diff --git a/engine/clients/qdrant_native/search.py b/engine/clients/qdrant_native/search.py new file mode 100644 index 000000000..c738dcb50 --- /dev/null +++ b/engine/clients/qdrant_native/search.py @@ -0,0 +1,100 @@ +from typing import List, Tuple + +import httpx + +from dataset_reader.base_reader import Query +from engine.base_client.search import BaseSearcher +from engine.clients.qdrant_native.config import QDRANT_API_KEY, QDRANT_COLLECTION_NAME +from engine.clients.qdrant_native.parser import QdrantNativeConditionParser + + +class QdrantNativeSearcher(BaseSearcher): + search_params = {} + client: httpx.Client = None + parser = QdrantNativeConditionParser() + host = None + headers = {} + + @classmethod + def init_client(cls, host, distance, connection_params: dict, search_params: dict): + cls.host = f"http://{host.rstrip('/')}:6333" + cls.search_params = search_params + + # Build headers + cls.headers = {"Content-Type": "application/json"} + if QDRANT_API_KEY: + cls.headers["api-key"] = QDRANT_API_KEY + + # Create HTTP client + # Use longer timeout for write operations to handle large query payloads + base_timeout = connection_params.get("timeout", 30) + cls.client = httpx.Client( + headers=cls.headers, + timeout=httpx.Timeout( + connect=base_timeout, + read=base_timeout, + write=base_timeout * 5, # 5x longer for writes + pool=base_timeout, + ), + limits=httpx.Limits(max_connections=None, max_keepalive_connections=0), + ) + + @classmethod + def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]: + """Execute a single search query using REST API""" + url = f"{cls.host}/collections/{QDRANT_COLLECTION_NAME}/points/query" + + if query.sparse_vector is None: + query_vector = query.vector + else: + # Convert numpy types to native Python types for JSON serialization + query_vector = { + "indices": [int(i) for i in query.sparse_vector.indices], + "values": [float(v) for v in query.sparse_vector.values], + } + + payload = { + "query": query_vector, + "limit": top, + } + + if query.sparse_vector is not None: + payload["using"] = "sparse" + + query_filter = cls.parser.parse(query.meta_conditions) + if query_filter: + payload["filter"] = query_filter + + search_config = cls.search_params.get("config", {}) + if search_config: + payload["params"] = search_config + + prefetch_config = cls.search_params.get("prefetch") + if prefetch_config: + prefetch = { + **prefetch_config, + "query": query_vector, + } + payload["prefetch"] = [prefetch] + + with_payload = cls.search_params.get("with_payload", False) + payload["with_payload"] = with_payload + + try: + response = cls.client.post(url, json=payload) + response.raise_for_status() + result = response.json() + + points = result["result"]["points"] + return [(point["id"], point["score"]) for point in points] + + except Exception as ex: + print(f"Something went wrong during search: {ex}") + raise ex + + @classmethod + def delete_client(cls): + """Cleanup HTTP client""" + if cls.client is not None: + cls.client.close() + cls.client = None diff --git a/engine/clients/qdrant_native/upload.py b/engine/clients/qdrant_native/upload.py new file mode 100644 index 000000000..6cf6c0429 --- /dev/null +++ b/engine/clients/qdrant_native/upload.py @@ -0,0 +1,147 @@ +import time +from typing import List + +import httpx + +from dataset_reader.base_reader import Record +from engine.base_client.upload import BaseUploader +from engine.clients.qdrant_native.config import QDRANT_API_KEY, QDRANT_COLLECTION_NAME + + +class QdrantNativeUploader(BaseUploader): + client = None + upload_params = {} + host = None + headers = {} + + @classmethod + def init_client(cls, host, distance, connection_params, upload_params): + cls.host = f"http://{host.rstrip('/')}:6333" + cls.upload_params = upload_params + + # Build headers + cls.headers = {"Content-Type": "application/json"} + if QDRANT_API_KEY: + cls.headers["api-key"] = QDRANT_API_KEY + + # Create HTTP client with connection pooling + # Use longer timeout for write operations to handle large payloads + base_timeout = connection_params.get("timeout", 30) + cls.client = httpx.Client( + headers=cls.headers, + timeout=httpx.Timeout( + connect=base_timeout, + read=base_timeout, + write=base_timeout * 10, # 10x longer for writes + pool=base_timeout, + ), + limits=httpx.Limits(max_connections=100, max_keepalive_connections=20), + ) + + @classmethod + def upload_batch(cls, batch: List[Record]): + """Upload a batch of records using REST API""" + # Qdrant has a 32MB JSON payload limit + # For large batches with dense high-dim vectors, split into smaller sub-batches + MAX_BATCH_SIZE = 512 + + if len(batch) > MAX_BATCH_SIZE: + # Split into smaller sub-batches + for i in range(0, len(batch), MAX_BATCH_SIZE): + cls.upload_batch(batch[i : i + MAX_BATCH_SIZE]) + return + + points = [] + for point in batch: + point_data = { + "id": point.id, + "payload": point.metadata or {}, + } + + # Handle vector (dense or sparse) + if point.sparse_vector is None: + point_data["vector"] = point.vector + else: + point_data["vector"] = { + "sparse": { + "indices": [int(i) for i in point.sparse_vector.indices], + "values": [float(v) for v in point.sparse_vector.values], + } + } + + points.append(point_data) + + url = f"{cls.host}/collections/{QDRANT_COLLECTION_NAME}/points" + payload = { + "points": points, + } + + response = cls.client.put(url, json=payload, params={"wait": "false"}) + response.raise_for_status() + + @classmethod + def post_upload(cls, _distance): + """ + Post-upload operations: + 1. Enable index optimization if it was disabled + 2. Wait for collection to become GREEN + """ + url = f"{cls.host}/collections/{QDRANT_COLLECTION_NAME}" + response = cls.client.get(url) + response.raise_for_status() + collection_info = response.json()["result"] + + # Check if optimization was disabled + max_optimization_threads = collection_info["config"]["optimizer_config"].get( + "max_optimization_threads", 1 + ) + + if max_optimization_threads == 0: + # Enable optimization + patch_url = f"{cls.host}/collections/{QDRANT_COLLECTION_NAME}" + patch_payload = { + "optimizers_config": { + "max_optimization_threads": 100_000, + } + } + response = cls.client.patch(patch_url, json=patch_payload) + response.raise_for_status() + + cls.wait_collection_green() + return {} + + @classmethod + def wait_collection_green(cls): + """Wait for collection status to be GREEN""" + wait_time = 5.0 + total = 0 + url = f"{cls.host}/collections/{QDRANT_COLLECTION_NAME}" + + while True: + time.sleep(wait_time) + total += wait_time + + response = cls.client.get(url) + response.raise_for_status() + collection_info = response.json()["result"] + + if collection_info["status"] != "green": + continue + + # Check twice to ensure stability + time.sleep(wait_time) + response = cls.client.get(url) + response.raise_for_status() + collection_info = response.json()["result"] + + if collection_info["status"] == "green": + break + + return total + + @classmethod + def delete_client(cls): + """Cleanup HTTP client""" + if cls.client is not None: + cls.client.close() + cls.client = None diff --git a/experiments/configurations/qdrant-native-single-node.json b/experiments/configurations/qdrant-native-single-node.json new file mode 100644 index 000000000..fd3cbddc5 --- /dev/null +++ b/experiments/configurations/qdrant-native-single-node.json @@ -0,0 +1,217 @@ +[ + { + "name": "qdrant-native-default", + "engine": "qdrant_native", + "connection_params": { "timeout": 30 }, + "collection_params": { + "optimizers_config": { "memmap_threshold": 10000000 } + }, + "search_params": [ + { "parallel": 8, "config": { "hnsw_ef": 128 } } + ], + "upload_params": { "parallel": 16, "batch_size": 1024 } + }, + { + "name": "qdrant-native-continuous-benchmark", + "engine": "qdrant_native", + "connection_params": { "timeout": 30 }, + "collection_params": { + "hnsw_config": { + "m": 32, + "ef_construct": 256 + }, + "quantization_config": { + "scalar": { + "type": "int8", + "quantile": 0.99 + } + }, + "optimizers_config": { + "max_segment_size": 1000000, + "default_segment_number": 3, + "memmap_threshold": 10000000, + "max_optimization_threads": null + } + }, + "search_params": [ + { + "parallel": 8, + "config": { + "hnsw_ef": 256, + "quantization": { + "oversampling": 2.0, + "rescore": true + } + } + } + ], + "upload_params": { "parallel": 16, "batch_size": 1024 } + }, + { + "name": "qdrant-native-continuous-benchmark-with-payload", + "engine": "qdrant_native", + "connection_params": { "timeout": 30 }, + "collection_params": { + "hnsw_config": { + "m": 32, + "ef_construct": 256 + }, + "quantization_config": { + "scalar": { + "type": "int8", + "quantile": 0.99 + } + }, + "optimizers_config": { + "max_segment_size": 1000000, + "default_segment_number": 3, + "memmap_threshold": 10000000, + "max_optimization_threads": null + } + }, + "search_params": [ + { + "parallel": 8, + "config": { + "hnsw_ef": 256, + "quantization": { + "oversampling": 2.0 + } + }, + "with_payload": true + } + ], + "upload_params": { "parallel": 16, "batch_size": 1024 } + }, + { + "name": "qdrant-native-continuous-benchmark-indexed-only", + "engine": "qdrant_native", + "connection_params": { "timeout": 30 }, + "collection_params": { + "hnsw_config": { + "m": 32, + "ef_construct": 256 + }, + "quantization_config": { + "scalar": { + "type": "int8", + "quantile": 0.99 + } + }, + "optimizers_config": { + "max_segment_size": 1000000, + "default_segment_number": 3, + "memmap_threshold": 10000000, + "max_optimization_threads": null + } + }, + "search_params": [ + { + "parallel": 8, + "config": { + "hnsw_ef": 256, + "quantization": { + "oversampling": 2.0, + "rescore": true + }, + "indexed_only": true + } + } + ], + "upload_params": { "parallel": 16, "batch_size": 1024 } + }, + { + "name": "qdrant-native-continuous-benchmark-indexed-only-with-payload", + "engine": "qdrant_native", + "connection_params": { "timeout": 30 }, + "collection_params": { + "hnsw_config": { + "m": 32, + "ef_construct": 256 + }, + "quantization_config": { + "scalar": { + "type": "int8", + "quantile": 0.99 + } + }, + "optimizers_config": { + "max_segment_size": 1000000, + "default_segment_number": 3, + "memmap_threshold": 10000000, + "max_optimization_threads": null + } + }, + "search_params": [ + { + "parallel": 8, + "config": { + "hnsw_ef": 256, + "quantization": { + "oversampling": 2.0 + }, + "indexed_only": true + }, + "with_payload": true + } + ], + "upload_params": { "parallel": 16, "batch_size": 1024 } + }, + { + "name": "qdrant-native-bq-continuous-benchmark", + "engine": "qdrant_native", + "connection_params": { "timeout": 30 }, + "collection_params": { + "hnsw_config": { + "m": 32, + "ef_construct": 256 + }, + "quantization_config": { "binary": {"always_ram": true} }, + "optimizers_config": { + "max_segment_size": 1000000, + "default_segment_number": 3, + "memmap_threshold": 10000000, + "max_optimization_threads": null + } + }, + "search_params": [ + { + "parallel": 8, + "config": { + "hnsw_ef": 256, + "quantization": { "rescore": true, "oversampling": 2.0 } + } + } + ], + "upload_params": { "parallel": 16, "batch_size": 1024 } + }, + { + "name": "qdrant-native-bq-continuous-benchmark-with-payload", + "engine": "qdrant_native", + "connection_params": { "timeout": 30 }, + "collection_params": { + "hnsw_config": { + "m": 32, + "ef_construct": 256 + }, + "quantization_config": { "binary": {"always_ram": true} }, + "optimizers_config": { + "max_segment_size": 1000000, + "default_segment_number": 3, + "memmap_threshold": 10000000, + "max_optimization_threads": null + } + }, + "search_params": [ + { + "parallel": 8, + "config": { + "hnsw_ef": 256, + "quantization": { "rescore": true, "oversampling": 2.0 } + }, + "with_payload": true + } + ], + "upload_params": { "parallel": 16, "batch_size": 1024 } + } +] \ No newline at end of file