Skip to content

Add Automated Monitoring and Alerting Guide #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions guides/automated_monitoring_alerting/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ZEROENTROPY_API_KEY = "your api key"
SLACK_WEBHOOK_URL="your webhook url"
49 changes: 49 additions & 0 deletions guides/automated_monitoring_alerting/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Automated Monitoring & Alerting with ZeroEntropy

This guide demonstrates how to set up an automated workflow that monitors a folder for new documents, indexes them with ZeroEntropy, and sends an alert when new, relevant content is detected based on semantic queries.

## Overview

- **Monitor** a folder for new or updated files (PDF, TXT, CSV, etc.)
- **Index** new documents automatically using the ZeroEntropy API
- **Query** the new content for matches to a set of important topics or keywords
- **Alert** your team via email or Slack if relevant content is found

Some use cases for this could be:
- Keeping teams up-to-date with new, relevant information
- Automating knowledge discovery in shared drives or document repositories
- Surfacing important updates as soon as they appear

## Prerequisites

- Python 3.8+
- ZeroEntropy API key ([Get yours here](https://dashboard.zeroentropy.dev))
- `zeroentropy`, `python-dotenv`, `watchdog`, `tqdm`, and `slack_sdk` Python packages
- (Optional) Email or Slack webhook for alerts

## Setup Instructions

1. **Install dependencies:**
   ```bash
   pip install zeroentropy python-dotenv watchdog tqdm slack_sdk
   ```
2. **Create a `.env` file** with your ZeroEntropy API key (and your Slack webhook URL if you use Slack alerts):
   ```bash
   ZEROENTROPY_API_KEY=your_api_key_here
   SLACK_WEBHOOK_URL=your_webhook_url_here
   ```
3. **Configure your alerting method** in the script (email or Slack)
4. **Run the script:**
```bash
python monitor_and_alert.py
```

## How it Works

- The script watches a specified folder for new or changed files.
- When a new file is detected, it is indexed with ZeroEntropy.
- The script runs a set of semantic queries against the new content.
- If any query returns a relevant result, an alert is sent to your team.

---

See `monitor_and_alert.py` for the implementation and configuration options.
92 changes: 92 additions & 0 deletions guides/automated_monitoring_alerting/index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from zeroentropy import AsyncZeroEntropy, ConflictError
import asyncio
from dotenv import load_dotenv
from tqdm.asyncio import tqdm
import os
import base64

load_dotenv()

zclient = AsyncZeroEntropy()
sem = asyncio.Semaphore(16)

async def index_document(document_path: str, collection_name: str) -> None:
response = None
if not os.path.exists(document_path):
raise FileNotFoundError(f"File {document_path} not found")

# for csv we will index each row as a separate document
if os.path.splitext(document_path)[1] == ".csv":
with open(document_path) as f:
documents = f.readlines()
for i, document in enumerate(documents):
async with sem:
for _retry in range(3):
try:
content = { "type": "text", "text": document }
response = await zclient.documents.add(
collection_name=collection_name,
path=f"{document_path}_{i}",
content=content,
metadata={"type": "csv"},
)
break
except ConflictError as e:
print(f"Document '{document_path}' already exists in collection '{collection_name}'")
# for pdf we need to specifify the type so we can use OCR
elif os.path.splitext(document_path)[1] == ".pdf":
with open(document_path, "rb") as f:
pdf_bytes = f.read()
pdf_base64 = base64.b64encode(pdf_bytes).decode("utf-8")
async with sem:
for _retry in range(3):
try:
content = { "type": "auto", "base64_data": pdf_base64 } #this will automatically OCR the PDF
response = await zclient.documents.add(
collection_name=collection_name,
path=document_path,
content=content,
metadata={"type": "pdf"},
)
break
except ConflictError as e:
print(f"Document '{document_path}' already exists in collection '{collection_name}'")
#for txt no need to use OCR
elif os.path.splitext(document_path)[1] == ".txt":
with open(document_path, "r", encoding="utf-8") as f:
text = f.read()
async with sem:
for _retry in range(3):
try:
content = { "type": "text", "text": text }
response = await zclient.documents.add(
collection_name=collection_name,
path=document_path,
content=content,
metadata={"type": "text"},
)
break
except ConflictError as e:
print(f"Document '{document_path}' already exists in collection '{collection_name}'")
else:
print(f"Unsupported file type: {os.path.splitext(document_path)[1]}")
return None
return response

async def main():
#list all files in the data folder

DATA_DIR = "./data"
COLLECTION_NAME = "default"

documents_path = [os.path.join(DATA_DIR, file) for file in os.listdir(DATA_DIR)]
try:
await zclient.collections.add(collection_name=COLLECTION_NAME)
except ConflictError:
print(f"Collection '{COLLECTION_NAME}' already exists")
print(f"Indexing {len(documents_path)} documents in collection '{COLLECTION_NAME}'")
await tqdm.gather(*[index_document(document_path, COLLECTION_NAME) for document_path in documents_path], desc="Indexing Documents")
print(f"Indexing completed for collection '{COLLECTION_NAME}'")

if __name__ == "__main__":
asyncio.run(main())
94 changes: 94 additions & 0 deletions guides/automated_monitoring_alerting/monitor_and_alert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import os
import time
import threading
import asyncio
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from zeroentropy import ZeroEntropy
from slack_sdk.webhook import WebhookClient
from index import index_document
from query import query_collection
from dotenv import load_dotenv
load_dotenv()


# --- Configuration ---
FOLDER_TO_WATCH = './example' # Folder to monitor
COLLECTION_NAME = 'example'
# Add queries depending on the use case
QUERIES = [
'security vulnerability'
]
# For now the implementation supports slack alerts.
ALERT_METHOD = 'slack'

SLACK_WEBHOOK_URL = os.getenv('SLACK_WEBHOOK_URL')
API_KEY = os.getenv('ZEROENTROPY_API_KEY')



zclient = ZeroEntropy(api_key=API_KEY)

# sending alert via slack
def send_slack_alert(message):
webhook = WebhookClient(SLACK_WEBHOOK_URL)
response = webhook.send(text=message)
if response.status_code != 200:
print(f"Failed to send Slack alert: {response.status_code}")
else:
print(f"Slack alert sent successfully: {response.status_code}")


# Indexing and querying the documents and sending alerts if relevant content is found.
def index_and_alert(filepath):
ext = os.path.splitext(filepath)[1].lower()
if ext not in ['.txt', '.pdf', '.csv']:
print(f'Skipping unsupported file type: {filepath}')
return

async def async_index_and_query():
try:
await index_document(filepath, COLLECTION_NAME)
print(f'Indexed: {filepath}')
for query in QUERIES:
results = await query_collection(COLLECTION_NAME, query, top_k_csv=1, top_k_txt=1)
if results and hasattr(results[0], 'score') and results[0].score > 0.2:
alert_msg = f'Relevant content found for query "{query}" in file {filepath}.'
send_slack_alert(alert_msg)
except Exception as e:
print(f'Error indexing or querying {filepath}: {e}')

# Run the async function in a new event loop in a thread
threading.Thread(target=lambda: asyncio.run(async_index_and_query())).start()

# Watchdog handler to monitor the folder for new files and modified files.
class NewFileHandler(FileSystemEventHandler):
def on_created(self, event):
if not event.is_directory:
threading.Thread(target=index_and_alert, args=(event.src_path,)).start()
def on_modified(self, event):
if not event.is_directory:
threading.Thread(target=index_and_alert, args=(event.src_path,)).start()

def main():
# check if collection exists and create it if it doesn't
try:
zclient.collections.add(collection_name=COLLECTION_NAME)
except Exception:
pass

# Start monitoring
event_handler = NewFileHandler()
observer = Observer()
observer.schedule(event_handler, FOLDER_TO_WATCH, recursive=False)
observer.start()
print(f'Watching folder: {FOLDER_TO_WATCH}')
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()

if __name__ == '__main__':
main()
53 changes: 53 additions & 0 deletions guides/automated_monitoring_alerting/query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from zeroentropy import AsyncZeroEntropy
import asyncio
from dotenv import load_dotenv

load_dotenv()

zclient = AsyncZeroEntropy()

async def query_collection(collection_name: str, query: str, top_k_csv: int = 5, top_k_txt: int = 10) -> None:
# get the top 5 rows of the csv
response_csv = await zclient.queries.top_documents(collection_name=collection_name,
k=top_k_csv,
query=query,
filter={
"type":
{"$eq":"csv"}
}
)
response_txt = await zclient.queries.top_snippets(collection_name=collection_name,
k=top_k_txt,
query=query,
precise_responses = True, # this controls the length of the snippets (around 200 chars or 2000 chars more or less)
filter={
"type":
{"$ne":"csv"}
}
)
# get the content of the documents csv (not included in the response for top documents)
final_response = []
for result in response_csv.results:
document_content = await zclient.documents.get_info(collection_name=collection_name, path=result.path, include_content=True)
response = {
"path": result.path,
"content": document_content.document.content,
"score": result.score,
"metadata": result.metadata
}
final_response.append(response)

# combine the response with the snippets
return final_response + response_txt.results

async def main():
COLLECTION_NAME = "default"
query = "This is a test query"
response = await query_collection(COLLECTION_NAME, query, top_k_csv=5, top_k_txt=10)
for i, result in enumerate(response):
print(f"Result {i+1}:")
print(result)
print("\n")

if __name__ == "__main__":
asyncio.run(main())