diff --git a/src/raglite/_insert.py b/src/raglite/_insert.py index bd63c29e..91a4c4fd 100644 --- a/src/raglite/_insert.py +++ b/src/raglite/_insert.py @@ -1,5 +1,6 @@ """Index documents.""" +import os from concurrent.futures import ThreadPoolExecutor, as_completed from contextlib import nullcontext from functools import partial @@ -89,7 +90,11 @@ def _create_chunk_records( return document, chunk_records, chunk_embedding_records_list -def insert_documents( # noqa: C901 +# Cap the number of worker threads to avoid excessive resource usage. +MAX_DEFAULT_WORKERS = 4 # Prevents oversubscription and high memory usage. + + +def insert_documents( # noqa: C901, PLR0912 documents: list[Document], *, max_workers: int | None = None, @@ -130,6 +135,11 @@ def insert_documents( # noqa: C901 documents = [doc for doc in documents if doc.id not in existing_doc_ids] if not documents: return + + # Heuristic based on cpu count, amount of documents, and a cap of 4. + if max_workers is None: + max_workers = min(os.cpu_count() or 1, len(documents), MAX_DEFAULT_WORKERS) + # For DuckDB databases, acquire a lock on the database. if engine.dialect.name == "duckdb": db_url = make_url(config.db_url) if isinstance(config.db_url, str) else config.db_url