Skip to content

Commit c4217c1

Browse files
feat: add delete by filter and update by filer to OpenSearchDocumentStore (#2407)
* add delete by filter and update by filer to OpenSearchDocumentStore * fix temp bug * metadata field exist * fix metadata --------- Co-authored-by: David S. Batista <[email protected]>
1 parent 07353b6 commit c4217c1

File tree

3 files changed

+234
-0
lines changed

3 files changed

+234
-0
lines changed

integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,132 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None
608608
msg = f"Failed to delete all documents from OpenSearch: {e!s}"
609609
raise DocumentStoreError(msg) from e
610610

611+
def delete_by_filter(self, filters: Dict[str, Any]) -> int:
612+
"""
613+
Deletes all documents that match the provided filters.
614+
615+
:param filters: The filters to apply to select documents for deletion.
616+
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
617+
:returns: The number of documents deleted.
618+
"""
619+
self._ensure_initialized()
620+
assert self._client is not None
621+
622+
try:
623+
normalized_filters = normalize_filters(filters)
624+
body = {"query": {"bool": {"filter": normalized_filters}}}
625+
result = self._client.delete_by_query(index=self._index, body=body)
626+
deleted_count = result.get("deleted", 0)
627+
logger.info(
628+
"Deleted {n_docs} documents from index '{index}' using filters.",
629+
n_docs=deleted_count,
630+
index=self._index,
631+
)
632+
return deleted_count
633+
except Exception as e:
634+
msg = f"Failed to delete documents by filter from OpenSearch: {e!s}"
635+
raise DocumentStoreError(msg) from e
636+
637+
async def delete_by_filter_async(self, filters: Dict[str, Any]) -> int:
638+
"""
639+
Asynchronously deletes all documents that match the provided filters.
640+
641+
:param filters: The filters to apply to select documents for deletion.
642+
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
643+
:returns: The number of documents deleted.
644+
"""
645+
self._ensure_initialized()
646+
assert self._async_client is not None
647+
648+
try:
649+
normalized_filters = normalize_filters(filters)
650+
body = {"query": {"bool": {"filter": normalized_filters}}}
651+
result = await self._async_client.delete_by_query(index=self._index, body=body)
652+
deleted_count = result.get("deleted", 0)
653+
logger.info(
654+
"Deleted {n_docs} documents from index '{index}' using filters.",
655+
n_docs=deleted_count,
656+
index=self._index,
657+
)
658+
return deleted_count
659+
except Exception as e:
660+
msg = f"Failed to delete documents by filter from OpenSearch: {e!s}"
661+
raise DocumentStoreError(msg) from e
662+
663+
def update_by_filter(self, filters: Dict[str, Any], meta: Dict[str, Any]) -> int:
664+
"""
665+
Updates the metadata of all documents that match the provided filters.
666+
667+
:param filters: The filters to apply to select documents for updating.
668+
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
669+
:param meta: The metadata fields to update.
670+
:returns: The number of documents updated.
671+
"""
672+
self._ensure_initialized()
673+
assert self._client is not None
674+
675+
try:
676+
normalized_filters = normalize_filters(filters)
677+
# Build the update script to modify metadata fields
678+
# Documents are stored with flattened metadata, so update fields directly in ctx._source
679+
update_script_lines = []
680+
for key in meta.keys():
681+
update_script_lines.append(f"ctx._source.{key} = params.{key};")
682+
update_script = " ".join(update_script_lines)
683+
684+
body = {
685+
"query": {"bool": {"filter": normalized_filters}},
686+
"script": {"source": update_script, "params": meta, "lang": "painless"},
687+
}
688+
result = self._client.update_by_query(index=self._index, body=body)
689+
updated_count = result.get("updated", 0)
690+
logger.info(
691+
"Updated {n_docs} documents in index '{index}' using filters.",
692+
n_docs=updated_count,
693+
index=self._index,
694+
)
695+
return updated_count
696+
except Exception as e:
697+
msg = f"Failed to update documents by filter in OpenSearch: {e!s}"
698+
raise DocumentStoreError(msg) from e
699+
700+
async def update_by_filter_async(self, filters: Dict[str, Any], meta: Dict[str, Any]) -> int:
701+
"""
702+
Asynchronously updates the metadata of all documents that match the provided filters.
703+
704+
:param filters: The filters to apply to select documents for updating.
705+
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
706+
:param meta: The metadata fields to update.
707+
:returns: The number of documents updated.
708+
"""
709+
self._ensure_initialized()
710+
assert self._async_client is not None
711+
712+
try:
713+
normalized_filters = normalize_filters(filters)
714+
# Build the update script to modify metadata fields
715+
# Documents are stored with flattened metadata, so update fields directly in ctx._source
716+
update_script_lines = []
717+
for key in meta.keys():
718+
update_script_lines.append(f"ctx._source.{key} = params.{key};")
719+
update_script = " ".join(update_script_lines)
720+
721+
body = {
722+
"query": {"bool": {"filter": normalized_filters}},
723+
"script": {"source": update_script, "params": meta, "lang": "painless"},
724+
}
725+
result = await self._async_client.update_by_query(index=self._index, body=body)
726+
updated_count = result.get("updated", 0)
727+
logger.info(
728+
"Updated {n_docs} documents in index '{index}' using filters.",
729+
n_docs=updated_count,
730+
index=self._index,
731+
)
732+
return updated_count
733+
except Exception as e:
734+
msg = f"Failed to update documents by filter in OpenSearch: {e!s}"
735+
raise DocumentStoreError(msg) from e
736+
611737
def _prepare_bm25_search_request(
612738
self,
613739
*,

integrations/opensearch/tests/test_document_store.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,3 +523,57 @@ def test_delete_all_documents_no_index_recreation(self, document_store: OpenSear
523523
results = document_store.filter_documents()
524524
assert len(results) == 1
525525
assert results[0].content == "New document after delete all"
526+
527+
def test_delete_by_filter(self, document_store: OpenSearchDocumentStore):
528+
docs = [
529+
Document(content="Doc 1", meta={"category": "A"}),
530+
Document(content="Doc 2", meta={"category": "B"}),
531+
Document(content="Doc 3", meta={"category": "A"}),
532+
]
533+
document_store.write_documents(docs)
534+
assert document_store.count_documents() == 3
535+
536+
# Delete documents with category="A"
537+
deleted_count = document_store.delete_by_filter(
538+
filters={"field": "meta.category", "operator": "==", "value": "A"}
539+
)
540+
time.sleep(2) # wait for deletion to be reflected
541+
assert deleted_count == 2
542+
assert document_store.count_documents() == 1
543+
544+
# Verify only category B remains
545+
remaining_docs = document_store.filter_documents()
546+
assert len(remaining_docs) == 1
547+
assert remaining_docs[0].meta["category"] == "B"
548+
549+
def test_update_by_filter(self, document_store: OpenSearchDocumentStore):
550+
docs = [
551+
Document(content="Doc 1", meta={"category": "A", "status": "draft"}),
552+
Document(content="Doc 2", meta={"category": "B", "status": "draft"}),
553+
Document(content="Doc 3", meta={"category": "A", "status": "draft"}),
554+
]
555+
document_store.write_documents(docs)
556+
assert document_store.count_documents() == 3
557+
558+
# Update status for category="A" documents
559+
updated_count = document_store.update_by_filter(
560+
filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"}
561+
)
562+
time.sleep(2) # wait for update to be reflected
563+
assert updated_count == 2
564+
565+
# Verify the updates
566+
published_docs = document_store.filter_documents(
567+
filters={"field": "meta.status", "operator": "==", "value": "published"}
568+
)
569+
assert len(published_docs) == 2
570+
for doc in published_docs:
571+
assert doc.meta["category"] == "A"
572+
assert doc.meta["status"] == "published"
573+
574+
# Verify category B still has draft status
575+
draft_docs = document_store.filter_documents(
576+
filters={"field": "meta.status", "operator": "==", "value": "draft"}
577+
)
578+
assert len(draft_docs) == 1
579+
assert draft_docs[0].meta["category"] == "B"

integrations/opensearch/tests/test_document_store_async.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,3 +302,57 @@ async def test_delete_all_documents_no_index_recreation(self, document_store: Op
302302
results = await document_store.filter_documents_async()
303303
assert len(results) == 1
304304
assert results[0].content == "New document after delete all"
305+
306+
async def test_delete_by_filter_async(self, document_store: OpenSearchDocumentStore):
307+
docs = [
308+
Document(content="Doc 1", meta={"category": "A"}),
309+
Document(content="Doc 2", meta={"category": "B"}),
310+
Document(content="Doc 3", meta={"category": "A"}),
311+
]
312+
await document_store.write_documents_async(docs)
313+
assert await document_store.count_documents_async() == 3
314+
315+
# Delete documents with category="A"
316+
deleted_count = await document_store.delete_by_filter_async(
317+
filters={"field": "meta.category", "operator": "==", "value": "A"}
318+
)
319+
time.sleep(2) # wait for deletion to be reflected
320+
assert deleted_count == 2
321+
assert await document_store.count_documents_async() == 1
322+
323+
# Verify only category B remains
324+
remaining_docs = await document_store.filter_documents_async()
325+
assert len(remaining_docs) == 1
326+
assert remaining_docs[0].meta["category"] == "B"
327+
328+
async def test_update_by_filter_async(self, document_store: OpenSearchDocumentStore):
329+
docs = [
330+
Document(content="Doc 1", meta={"category": "A", "status": "draft"}),
331+
Document(content="Doc 2", meta={"category": "B", "status": "draft"}),
332+
Document(content="Doc 3", meta={"category": "A", "status": "draft"}),
333+
]
334+
await document_store.write_documents_async(docs)
335+
assert await document_store.count_documents_async() == 3
336+
337+
# Update status for category="A" documents
338+
updated_count = await document_store.update_by_filter_async(
339+
filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"}
340+
)
341+
time.sleep(2) # wait for update to be reflected
342+
assert updated_count == 2
343+
344+
# Verify the updates
345+
published_docs = await document_store.filter_documents_async(
346+
filters={"field": "meta.status", "operator": "==", "value": "published"}
347+
)
348+
assert len(published_docs) == 2
349+
for doc in published_docs:
350+
assert doc.meta["category"] == "A"
351+
assert doc.meta["status"] == "published"
352+
353+
# Verify category B still has draft status
354+
draft_docs = await document_store.filter_documents_async(
355+
filters={"field": "meta.status", "operator": "==", "value": "draft"}
356+
)
357+
assert len(draft_docs) == 1
358+
assert draft_docs[0].meta["category"] == "B"

0 commit comments

Comments
 (0)