diff --git a/.gitignore b/.gitignore
index e7c8dee..9f19a22 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
 # Python Exclusions
 .venv
+.venv*
 **__pycache__**
 
 # Helm Exclusions
diff --git a/docs/quickstart.md b/docs/quickstart.md
index bc4f53c..003bb3d 100644
--- a/docs/quickstart.md
+++ b/docs/quickstart.md
@@ -12,20 +12,21 @@ Use the following documentation to get started with the NVIDIA RAG Blueprint.
 - [Deploy With Helm Chart](#deploy-with-helm-chart)
 - [Data Ingestion](#data-ingestion)
 
+## Obtain API Keys
 
-## Obtain an API Key
+### Obtain NVIDIA API Key
 
 You need to obtain a single API key for accessing NIM services, to pull models on-prem, or to access models hosted in the NVIDIA API Catalog. Use one of the following methods to generate an API key:
 
-  - Option 1: Sign in to the [NVIDIA Build](https://build.nvidia.com/explore/discover?signin=true) portal with your email.
-    - Click any [model](https://build.nvidia.com/meta/llama-3_1-70b-instruct), then click **Get API Key**, and finally click **Generate Key**.
+- Option 1: Sign in to the [NVIDIA Build](https://build.nvidia.com/explore/discover?signin=true) portal with your email.
  - Click any [model](https://build.nvidia.com/meta/llama-3_1-70b-instruct), then click **Get API Key**, and finally click **Generate Key**.
 
-  - Option 2: Sign in to the [NVIDIA NGC](https://ngc.nvidia.com/) portal with your email.
-    - Select your organization from the dropdown menu after logging in. You must select an organization which has NVIDIA AI Enterprise (NVAIE) enabled.
-    - Click your account in the top right, and then select **Setup**.
-    - Click **Generate Personal Key**, and then click **+ Generate Personal Key** to create your API key.
-    - Later, you use this key in the `NVIDIA_API_KEY` environment variables.
+- Option 2: Sign in to the [NVIDIA NGC](https://ngc.nvidia.com/) portal with your email.
+  - Select your organization from the dropdown menu after logging in. You must select an organization which has NVIDIA AI Enterprise (NVAIE) enabled.
+  - Click your account in the top right, and then select **Setup**.
+  - Click **Generate Personal Key**, and then click **+ Generate Personal Key** to create your API key.
+  - Later, you use this key in the `NVIDIA_API_KEY` environment variable.
 
 Finally export your NVIDIA API key as an environment variable.
 
@@ -33,6 +34,13 @@ Finally export your NVIDIA API key as an environment variable.
 export NVIDIA_API_KEY="nvapi-..."
 ```
 
+### Obtain Pinecone API Key
+
+You will also need a Pinecone API key to use the vector database. You can obtain one by signing in to the [Pinecone console](https://app.pinecone.io).
+
+- Select the Organization and Project you are using, and then click **API Keys** on the left side of the screen.
+- Click **Create API Key**.
+- Be sure to save the key when it is created, because it will not be displayed again.
 
 ## Deploy With Docker Compose
 
@@ -66,7 +74,6 @@ For both retrieval and ingestion services, by default all the models are deploye
 
 5. Ensure you meet [the hardware requirements if you are deploying models on-prem](../README.md#hardware-requirements).
 
-
 ### Start using on-prem models
 
 Use the following procedure to start all containers needed for this blueprint. This launches the ingestion services followed by the rag services and all of its dependent NIMs on-prem.
@@ -223,7 +230,6 @@ If the NIMs are deployed in a different workstation or outside the nvidia-rag do
          NEXT_PUBLIC_VDB_BASE_URL: "http://ingestor-server:8082/v1"
    ```
 
-
 ### Start using nvidia hosted models
 
 1. 
Verify that you meet the [prerequisites](#prerequisites). @@ -305,12 +311,10 @@ If the NIMs are deployed in a different workstation or outside the nvidia-rag do 7. Open a web browser and access `http://localhost:8090` to use the RAG Playground. You can use the upload tab to ingest files into the server or follow [the notebooks](../notebooks/) to understand the API usage. - ## Deploy With Helm Chart Use these procedures to deploy with Helm Chart to deploy on a Kubernetes cluster. Alternatively, you can [Deploy With Docker Compose](#deploy-with-docker-compose) for a single node deployment. - ### Prerequisites - Verify that you meet the [prerequisites](#prerequisites). @@ -463,7 +467,6 @@ rag-zipkin-5dc8d6d977-nqvvc 1/1 Running 0 kubectl get events -n rag ``` - ##### List Services ```sh kubectl get svc -n rag @@ -702,7 +705,7 @@ To use a custom Milvus endpoint, you need to update the `APP_VECTORSTORE_URL` en If you have a plan to customize the RAG server deployment like LLM Model Change then please follow the steps to deploy the Frontend - - Build the new docker image with updated model name from docker compose +- Build the new docker image with updated model name from docker compose ``` cd ../deploy/compose @@ -748,7 +751,7 @@ To use a custom Milvus endpoint, you need to update the `APP_VECTORSTORE_URL` en Once docker image has been build to push the image to a docker a registry - - Run the following command to install the RAG server with the Ingestor Server and New Frontend with updated `` and ``: +- Run the following command to install the RAG server with the Ingestor Server and New Frontend with updated `` and ``: ```sh helm install rag -n rag rag-server/ \ @@ -767,7 +770,6 @@ For troubleshooting issues with Helm deployment, checkout the troubleshooting se [!IMPORTANT] Before you can use this procedure, you must deploy the blueprint by using [Deploy With Docker Compose](#deploy-with-docker-compose) or [Deploy With Helm Chart](#deploy-with-helm-chart). - 1. Download and install Git LFS by following the [installation instructions](https://git-lfs.com/). 2. Initialize Git LFS in your environment. @@ -798,8 +800,6 @@ Before you can use this procedure, you must deploy the blueprint by using [Deplo Follow the cells in the notebook to ingest the PDF files from the data/dataset folder into the vector store. - - ## Next Steps - [Change the Inference or Embedding Model](change-model.md) diff --git a/notebooks/ingestion_api_usage.ipynb b/notebooks/ingestion_api_usage.ipynb index db2f6b5..fe1a224 100644 --- a/notebooks/ingestion_api_usage.ipynb +++ b/notebooks/ingestion_api_usage.ipynb @@ -5,9 +5,11 @@ "id": "650fa9f0", "metadata": {}, "source": [ - "# Ingestion API Usage\n", + "# Ingestion API Usage (Pinecone)\n", "\n", - "This notebook demonstrates how to interact with the ingestion APIs to upload and index documents for retrieval-augmented generation (RAG) applications. It showcases the different APIs needed to create a collection, upload documents to the created collection using Milvus Vector DB. It also showcases different APIs to manage uploaded documents and existing collections effectively." + "This notebook demonstrates how to interact with the ingestion APIs to upload and index documents for retrieval-augmented generation (RAG) applications. 
It showcases the different APIs needed to create a collection (namespace), upload documents to the created collection using **Pinecone Vector DB**, and manage uploaded documents and existing collections effectively.\n", + "\n", + "**Note:** The backend must be configured to use Pinecone as the vector database. The term \"collection\" in the API maps to a \"namespace\" in Pinecone." ] }, { @@ -15,11 +17,10 @@ "id": "a5726313-f5ab-48fb-b747-c790ebaafe48", "metadata": {}, "source": [ - "\n", - "\n", - "- Ensure the ingestor-server container is running before executing the notebook by [following steps in the readme](../docs/quickstart.md#start-the-containers-for-ingestion-microservices).\n", + "- Ensure the ingestor-server container is running and configured for Pinecone before executing the notebook by [following steps in the readme](../docs/quickstart.md#start-the-containers-for-ingestion-microservices).\n", "- Replace `BASE_URL` with the actual server URL if the API is hosted on another system.\n", - "- You can customize the directory path (`../data/multimodal`) with the correct location of your dataset.\n" + "- You can customize the directory path (`../data/multimodal`) with the correct location of your dataset.\n", + "- If your backend requires Pinecone credentials as environment variables, set them below." ] }, { @@ -27,7 +28,7 @@ "id": "e58505d5-5436-449a-b316-b943a1a57797", "metadata": {}, "source": [ - "#### 1. Install Dependencies and import required modules" + "#### 1. Install Dependencies, Import Modules, and Set Pinecone Credentials" ] }, { @@ -37,10 +38,19 @@ "metadata": {}, "outputs": [], "source": [ - "!pip install aiohttp\n", + "!pip install aiohttp pinecone\n", "import aiohttp\n", "import os\n", - "import json" + "import json\n", + "import getpass\n", + "\n", + "# Prompt for Pinecone API key and environment if not set\n", + "if not os.getenv(\"PINECONE_API_KEY\"):\n", + " os.environ[\"PINECONE_API_KEY\"] = getpass.getpass(\"Enter your Pinecone API key: \")\n", + "if not os.getenv(\"PINECONE_REGION\"):\n", + " os.environ[\"PINECONE_REGION\"] = input(\"Enter your Pinecone region (e.g., 'us-east-1'): \")\n", + "if not os.getenv(\"PINECONE_CLOUD\"):\n", + " os.environ[\"PINECONE_CLOUD\"] = input(\"Enter your Pinecone cloud (e.g., 'aws'): \")\n" ] }, { @@ -58,10 +68,12 @@ "metadata": {}, "outputs": [], "source": [ - "IPADDRESS = \"localhost\" #Replace this with the correct IP address\n", + "IPADDRESS = \"localhost\" # Replace this with the correct IP address\n", "INGESTOR_SERVER_PORT = \"8082\"\n", "BASE_URL = f\"http://{IPADDRESS}:{INGESTOR_SERVER_PORT}\" # Replace with your server URL\n", "\n", + "# Note: The backend must be configured to use Pinecone. The API endpoints are backend-agnostic.\n", + "\n", "async def print_response(response):\n", " \"\"\"Helper to print API response.\"\"\"\n", " try:\n", @@ -105,10 +117,10 @@ "id": "2850cbb2", "metadata": {}, "source": [ - "#### 4. Create collection Endpoint\n", + "#### 4. Create Index Endpoint\n", "\n", "**Purpose:**\n", - "This endpoint is used to create a collection in the vector store. " + "This endpoint is used to create a index in the Pinecone vector store." 
 ]
 },
 {
@@ -118,29 +130,47 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "async def create_collections(\n",
-    "    collection_names: list = None,\n",
-    "    collection_type: str = \"text\",\n",
-    "    embedding_dimension: int = 2048\n",
-    "):\n",
+    "from pinecone import Pinecone, ServerlessSpec\n",
     "\n",
-    "    params = {\n",
-    "        \"collection_type\": collection_type,\n",
-    "        \"embedding_dimension\": embedding_dimension\n",
-    "    }\n",
-    "\n",
-    "    HEADERS = {\"Content-Type\": \"application/json\"}\n",
     "\n",
-    "    async with aiohttp.ClientSession() as session:\n",
-    "        try:\n",
-    "            async with session.post(f\"{BASE_URL}/v1/collections\", params=params, json=collection_names, headers=HEADERS) as response:\n",
-    "                await print_response(response)\n",
-    "        except aiohttp.ClientError as e:\n",
-    "            return 500, {\"error\": str(e)}\n",
-    "\n",
-    "await create_collections(collection_names=[\"multimodal_data\"])"
+    "async def create_index(\n",
+    "    index_name: str,\n",
+    "    dimension: int,\n",
+    "    spec: ServerlessSpec,\n",
+    "    metric: str = \"cosine\"\n",
+    "):\n",
+    "    pc = Pinecone(\n",
+    "        api_key=os.getenv(\"PINECONE_API_KEY\"),\n",
+    "        source_tag=\"nvidia-rag-blueprint\"\n",
+    "    )\n",
+    "    # Respect the caller-supplied name and only create the index if it does not already exist\n",
+    "    if not pc.has_index(index_name):\n",
+    "        pc.create_index(\n",
+    "            name=index_name,\n",
+    "            dimension=dimension,\n",
+    "            metric=metric,\n",
+    "            spec=spec\n",
+    "        )\n",
+    "\n",
+    "await create_index(\n",
+    "    index_name=os.getenv(\"PINECONE_INDEX_NAME\") or \"nvidia-rag-index\",\n",
+    "    dimension=1024,\n",
+    "    metric=\"cosine\",\n",
+    "    spec=ServerlessSpec(\n",
+    "        cloud=os.getenv(\"PINECONE_CLOUD\") or \"aws\",\n",
+    "        region=os.getenv(\"PINECONE_REGION\") or \"us-east-1\"\n",
+    "    )\n",
+    ")"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "7b9d434b",
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  },
  {
   "cell_type": "markdown",
   "id": "2334abfc-5832-4e39-8793-818b5265aa1d",
   "metadata": {},
   "source": [
@@ -150,9 +180,9 @@
    "\n",
    "**Purpose:**\n",
    "This endpoint uploads new documents to the vector store. \n",
-    "1. You can specify the collection name where the documents should be stored. \n",
-    "2. The collection to which the documents are being uploaded must exist in the vector database.\n",
-    "3. The documents which are uploaded must not exist in the collection. If the documents already exists, to reingest existing files in the provided collection, replace `session.post(...)` with `session.patch(...)`\n",
-    "4. To speed up the ingestion process, the multiple files can be passed in a single request as showcased below."
+    "1. You can specify the index name and namespace where the documents should be stored. \n",
+    "2. The index to which the documents are being uploaded must exist in the vector database.\n",
+    "3. The documents being uploaded must not already exist in the index. To reingest files that already exist in the provided index, replace `session.post(...)` with `session.patch(...)`.\n",
+    "4. To speed up the ingestion process, multiple files can be passed in a single request, as showcased below."
] }, @@ -168,7 +198,8 @@ " files = [os.path.join(DATA_DIR, f) for f in os.listdir(DATA_DIR) if os.path.isfile(os.path.join(DATA_DIR, f))]\n", "\n", " data = {\n", - " \"collection_name\": collection_name,\n", + " \"index_name\": index_name,\n", + " \"namespace\": namespace,\n", " \"extraction_options\": {\n", " \"extract_text\": True,\n", " \"extract_tables\": True,\n", @@ -196,7 +227,7 @@ " except aiohttp.ClientError as e:\n", " print(f\"Error: {e}\")\n", "\n", - "await upload_documents(collection_name=\"multimodal_data\")\n" + "await upload_documents(index_name=\"nvidia-rag-index\", namespace=\"multimodal_data\")\n" ] }, { @@ -219,17 +250,46 @@ }, "outputs": [], "source": [ - "async def fetch_documents(collection_name: str = \"\"):\n", - " url = f\"{BASE_URL}/v1/documents\"\n", - " params = {\"collection_name\": collection_name}\n", - " async with aiohttp.ClientSession() as session:\n", - " try:\n", - " async with session.get(url, params=params) as response:\n", - " await print_response(response)\n", - " except aiohttp.ClientError as e:\n", - " print(f\"Error: {e}\")\n", + "pc = Pinecone(\n", + " api_key=os.getenv(\"PINECONE_API_KEY\"),\n", + " source_tag=\"nvidia-rag-blueprint\"\n", + ")\n", + "\n", + "def fetch_documents_with_pagination(index_name: str, namespace: str = \"\", page_size: int = 100):\n", + " \"\"\"\n", + " Fetches all vector documents from a Pinecone namespace (collection) using pagination.\n", + " Prints progress and outputs all documents as a JSON array.\n", + " \"\"\"\n", + " pagination_token = None\n", + " all_documents = []\n", + " total = None\n", + " fetched_count = None\n", + "\n", + " index = pc.Index(index_name)\n", + " total = index.describe_index_stats(namespace=namespace).total_vector_count\n", + "\n", + " while True:\n", + " result = index.list_paginated(\n", + " namespace=namespace,\n", + " limit=page_size,\n", + " pagination_token=pagination_token\n", + " )\n", + " vectors = result.get('vectors', [])\n", + " all_documents.extend(vectors)\n", + " if fetched_count is None:\n", + " # Pinecone does not return total count, so we estimate as we go\n", + " fetched_count = len(vectors)\n", + " print(f\"Fetched {fetched_count} / {total} documents\")\n", + " pagination = result.get('pagination', {})\n", + " pagination_token = pagination.get('next')\n", + " if not pagination_token:\n", + " break\n", + "\n", + " import json\n", + " print(json.dumps(all_documents, indent=2))\n", "\n", - "await fetch_documents(collection_name=\"multimodal_data\")" + "# Example usage:\n", + "# fetch_documents_with_pagination(index, namespace=\"default\", page_size=100)" ] }, { @@ -240,7 +300,7 @@ "#### 6. Delete Documents Endpoint\n", "\n", "**Purpose:**\n", - "This endpoint deletes specified documents from the vector store. The documents are identified by its filename." + "This endpoint deletes specified documents from the vector store. The documents are identified by their vector IDs (which should be the same as the original filename)." 
] }, { @@ -252,17 +312,16 @@ "source": [ "from typing import List\n", "\n", - "async def delete_documents(collection_name: str = \"\", file_names: List[str] = []):\n", - " url = f\"{BASE_URL}/v1/documents\"\n", - " params = {\"collection_name\": collection_name}\n", - " async with aiohttp.ClientSession() as session:\n", - " try:\n", - " async with session.delete(url, params=params, json=file_names) as response:\n", - " await print_response(response)\n", - " except aiohttp.ClientError as e:\n", - " print(f\"Error: {e}\")\n", + "async def delete_documents(index: str, namespace: str = \"\", ids: List[str] = []):\n", + " index = pc.Index(index)\n", "\n", - "await delete_documents(collection_name=\"multimodal_data\", file_names=[\"embedded_table.pdf\", \"table_test.pdf\"])" + " # If no file names are provided, delete all documents in the namespace\n", + " if len(ids) == 0:\n", + " index.delete(namespace=namespace, delete_all=True)\n", + " else:\n", + " index.delete(namespace=namespace, ids=ids)\n", + "\n", + "await delete_documents(index=\"nvidia-rag-index\", namespace=\"multimodal_data\", ids=[\"embedded_table.pdf\", \"table_test.pdf\"])" ] }, { @@ -270,10 +329,10 @@ "id": "3ec9bfd5-b943-4eed-874b-9067fdfe06ca", "metadata": {}, "source": [ - "#### 7. Get Collections Endpoint\n", + "#### 7. Get Namespaces Endpoint\n", "\n", "**Purpose:**\n", - "This endpoint retrieves a list of all collection names available on the server. Collections are used to organize documents in the vector store." + "This endpoint retrieves a list of all namespaces available on the server. Namespaces are used to organize documents in the vector store." ] }, { @@ -283,16 +342,12 @@ "metadata": {}, "outputs": [], "source": [ - "async def fetch_collections():\n", - " url = f\"{BASE_URL}/v1/collections\"\n", - " async with aiohttp.ClientSession() as session:\n", - " try:\n", - " async with session.get(url) as response:\n", - " await print_response(response)\n", - " except aiohttp.ClientError as e:\n", - " print(f\"Error: {e}\")\n", + "async def fetch_namespaces(index: str):\n", + " index = pc.Index(index)\n", + " namespaces = index.describe_index_stats().namespaces\n", + " return namespaces\n", "\n", - "await fetch_collections()" + "await fetch_namespaces(index=\"nvidia-rag-index\")" ] }, { @@ -300,10 +355,10 @@ "id": "0b97e846", "metadata": {}, "source": [ - "#### 7. Delete Collections Endpoint\n", + "#### 7. Delete Namespaces Endpoint\n", "\n", "**Purpose:**\n", - "This endpoint deletes list of provided collection names available on the specified vector database server." + "This endpoint deletes list of provided namespaces available on the specified vector database server." 
] }, { @@ -315,16 +370,13 @@ "source": [ "from typing import List\n", "\n", - "async def delete_collections(collection_names: List[str] = \"\"):\n", - " url = f\"{BASE_URL}/v1/collections\"\n", - " async with aiohttp.ClientSession() as session:\n", - " try:\n", - " async with session.delete(url, json=collection_names) as response:\n", - " await print_response(response)\n", - " except aiohttp.ClientError as e:\n", - " print(f\"Error: {e}\")\n", + "async def delete_namespaces(index: str, namespaces: List[str] = []):\n", + " index = pc.Index(index)\n", + " for namespace in namespaces:\n", + " index.delete(namespace=namespace, delete_all=True)\n", + " return True\n", "\n", - "await delete_collections(collection_names=[\"multimodal_data\"])" + "await delete_namespaces(index=\"nvidia-rag-index\", namespaces=[\"multimodal_data\"])" ] } ], diff --git a/notebooks/launchable.ipynb b/notebooks/launchable.ipynb index 8b28d29..0b9ca0b 100644 --- a/notebooks/launchable.ipynb +++ b/notebooks/launchable.ipynb @@ -255,7 +255,7 @@ "metadata": {}, "outputs": [], "source": [ - "!git clone --branch v2.0.0 --depth 1 https://github.com/NVIDIA-AI-Blueprints/rag.git" + "!git clone --branch v2.0.0 --depth 1 https://github.com/pinecone-io/nvidia-pinecone-rag.git" ] }, { @@ -274,14 +274,16 @@ "metadata": {}, "outputs": [], "source": [ - "!pip install aiohttp" + "!pip install pinecone" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### 2.3 Getting your API key\n", + "### 2.3 Getting your API keys\n", + "\n", + "### 2.3.1 NVIDIA API key\n", "\n", "1. Sign in to the [NVIDIA Build](https://build.nvidia.com/explore/discover?signin=true) portal with your email\n", "\n", @@ -293,6 +295,17 @@ "\n", "5. Copy your key (it should start with `nvapi-`)\n", "\n", + "### 2.3.2 Pinecone API key\n", + "\n", + "1. Sign into the [Pinecone console](https://app.pinecone.io)\n", + "\n", + "2. Click API keys on the left side.\n", + "\n", + "3. Click Create New API Key.\n", + "\n", + "4. Copy and store the new key in a safe location; it will not be displayed again after creation.\n", + "\n", + "\n", "#### Setting Environment Variables\n", "\n", "This notebook requires certain environment variables to be configured in a `.env` file. We'll create this file automatically in the next step.\n", @@ -315,6 +328,12 @@ " os.environ[\"NVIDIA_API_KEY\"] = nvapi_key\n", " print(\"✅ API key has been set in notebook environment\")\n", "\n", + "if not os.environ.get(\"PINECONE_API_KEY\", \"\"):\n", + " pinecone_key = getpass.getpass(\"Enter your Pinecone API key: \")\n", + " assert pinecone_key.startswith(\"pcsk_\"), f\"{pinecone_key[:5]}... 
is not a valid key\"\n", + " os.environ[\"PINECONE_API_KEY\"] = pinecone_key\n", + " print(\"✅ API key has been set in notebook environment\")\n", + "\n", "# Step 2: Save API key to .env file\n", "env_dir = os.path.join(os.getcwd(), 'deploy/compose')\n", "print(env_dir)\n", @@ -329,7 +348,8 @@ "# Write the new .env file\n", "with open(env_path, 'w') as f:\n", " f.write(f'NVIDIA_API_KEY={os.environ[\"NVIDIA_API_KEY\"]}\\n')\n", - "print(\"✅ API key has been saved to .env file\")" + " f.write(f'PINECONE_API_KEY={os.environ[\"PINECONE_API_KEY\"]}\\n')\n", + "print(\"✅ API keys have been saved to .env file\")" ] }, { @@ -481,18 +501,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### 2.7 Start the vector database containers\n", - "Next, we'll start the vector database containers which will store and retrieve document embeddings:\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "%%capture\n", - "!docker compose -f deploy/compose/vectordb.yaml pull --quiet" + "### 2.7 Connect to Pinecone\n", + "\n", + "Now, we'll get our connection to the Pinecone vector store.\n" ] }, { @@ -501,7 +512,11 @@ "metadata": {}, "outputs": [], "source": [ - "!docker compose -f deploy/compose/vectordb.yaml up -d" + "from pinecone import Pinecone, ServerlessSpec\n", + "\n", + "pc = Pinecone(api_key=os.getenv(\"PINECONE_API_KEY\"))\n", + "\n", + "index = pc.Index(\"multimodal-data\")" ] }, { @@ -809,9 +824,9 @@ "source": [ "#### 3.2.2 Vector DB APIs Usage\n", "\n", - "##### 3.2.2.1 Create collection Endpoint\n", + "##### 3.2.2.1 Create index Endpoint\n", "**Purpose:**\n", - "This endpoint is used to create a collection in the vector store. " + "This endpoint is used to create an index in the vector store. 
" ] }, { @@ -820,52 +835,48 @@ "metadata": {}, "outputs": [], "source": [ - "async def create_collections(\n", - " collection_names: list = None,\n", - " collection_type: str = \"text\", \n", - " embedding_dimension: int = 2048\n", + "async def create_index(\n", + " index_name: str = \"multimodal-data\",\n", + " dimension: int = 1024,\n", + " metric: str = \"cosine\",\n", + " spec: ServerlessSpec = ServerlessSpec(\n", + " cloud=\"aws\",\n", + " region=\"us-east-1\"\n", + " )\n", "):\n", - " \"\"\"Create one or more collections in the vector store.\n", + " \"\"\"Create a Pinecone index.\n", " \n", " Args:\n", - " collection_names (list): List of collection names to create\n", - " collection_type (str): Type of collection, defaults to \"text\"\n", - " embedding_dimension (int): Dimension of embeddings, defaults to 2048\n", + " index_name (str): Name of the index to create\n", + " dimension (int): Dimension of the index\n", + " metric (str): Metric of the index\n", + " spec (ServerlessSpec): Specification of the index\n", " \n", " Returns:\n", " Response from the API endpoint or error details if request fails\n", " \"\"\"\n", - " # Parameters for creating collections\n", - " params = {\n", - " \"vdb_endpoint\": \"http://milvus:19530\", # Milvus vector DB endpoint\n", - " \"collection_type\": collection_type, # Type of collection\n", - " \"embedding_dimension\": embedding_dimension # Dimension of embeddings\n", - " }\n", + " pc = Pinecone(api_key=os.getenv(\"PINECONE_API_KEY\"))\n", + " pc.create_index(index_name, dimension, metric, spec)\n", "\n", - " HEADERS = {\"Content-Type\": \"application/json\"}\n", "\n", - " # Make API request to create collections\n", - " async with aiohttp.ClientSession() as session:\n", - " try:\n", - " async with session.post(f\"{INGESTOR_BASE_URL}/v1/collections\", \n", - " params=params,\n", - " json=collection_names, \n", - " headers=HEADERS) as response:\n", - " await print_raw_response(response)\n", - " except aiohttp.ClientError as e:\n", - " return 500, {\"error\": str(e)}\n", "\n", "# Create a collection named \"multimodal_data\" \n", - "await create_collections(collection_names=[\"multimodal_data\"])" + "await create_index(index_name=\"multimodal-data\", \n", + " dimension=1024, \n", + " metric=\"cosine\", \n", + " spec=ServerlessSpec(\n", + " cloud=\"aws\",\n", + " region=\"us-east-1\"\n", + " ))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "##### 3.2.2.2 Get collections Endpoint\n", + "##### 3.2.2.2 Get namespaces Endpoint\n", "**Purpose:**\n", - "This endpoint is used to get a list of collection names from the Milvus server. Returns a list of collection names.\n", + "This endpoint is used to get a list of namespaces from the Pinecone index. 
Returns a list of namespaces.\n", "\n" ] }, @@ -875,40 +886,33 @@ "metadata": {}, "outputs": [], "source": [ - "# First let's create another collection\n", - "await create_collections(collection_names=[\"multimodal_data1\"])\n", - "\n", - "# Now let's get the list of collections\n", - "async def fetch_collections():\n", - " \"\"\"Retrieve a list of all collections from the Milvus vector database.\n", + "# Now let's get the list of namespaces\n", + "async def fetch_namespaces():\n", + " \"\"\"Retrieve a list of all namespaces from the Pinecone index.\n", " \n", - " Makes a GET request to the ingestor API endpoint to fetch all collection names\n", - " from the specified Milvus server.\n", + " Makes a GET request to the ingestor API endpoint to fetch all namespace names\n", + " from the specified Pinecone index.\n", " \n", " Returns:\n", " Response from the API endpoint containing the list of collections,\n", " or prints error message if request fails.\n", " \"\"\"\n", - " url = f\"{INGESTOR_BASE_URL}/v1/collections\"\n", - " params = {\"vdb_endpoint\": \"http://milvus:19530\"}\n", - " async with aiohttp.ClientSession() as session:\n", - " try:\n", - " async with session.get(url, params=params) as response:\n", - " await print_raw_response(response)\n", - " except aiohttp.ClientError as e:\n", - " print(f\"Error: {e}\")\n", + " pc = Pinecone(api_key=os.getenv(\"PINECONE_API_KEY\"))\n", + " index = pc.Index(\"multimodal-data\")\n", + " namespaces = index.describe_index_stats().namespaces\n", + " print(namespaces)\n", "\n", - "await fetch_collections()" + "await fetch_namespaces()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "##### 3.2.2.3 Delete collections Endpoint\n", + "##### 3.2.2.3 Delete namespaces Endpoint\n", "\n", "**Purpose:**\n", - "This endpoint deletes list of provided collection names available on the specified vector database server.\n" + "This endpoint deletes list of provided namespaces on the specified vector database server.\n" ] }, { @@ -919,39 +923,36 @@ "source": [ "from typing import List\n", "\n", - "async def delete_collections(collection_names: List[str] = \"\") -> None:\n", - " \"\"\"Delete specified collections from the Milvus vector database.\n", + "async def delete_namespaces(namespaces: List[str] = \"\") -> None:\n", + " \"\"\"Delete specified namespaces from the Pinecone index.\n", " \n", " Makes a DELETE request to the ingestor API endpoint to remove the specified \n", - " collections from the Milvus server.\n", + " namespaces from the Pinecone index.\n", " \n", " Args:\n", - " collection_names (List[str]): List of collection names to delete. \n", + " namespaces (List[str]): List of namespaces to delete. \n", " Defaults to empty string.\n", " \n", " Returns:\n", " None. 
Prints response from API or error message if request fails.\n",
     "    \n",
     "    Example:\n",
-    "        await delete_collections(collection_names=[\"collection1\", \"collection2\"])\n",
+    "        await delete_namespaces(namespaces=[\"namespace1\", \"namespace2\"])\n",
     "    \"\"\"\n",
-    "    url = f\"{INGESTOR_BASE_URL}/v1/collections\"\n",
-    "    params = {\"vdb_endpoint\": \"http://milvus:19530\"}\n",
-    "    async with aiohttp.ClientSession() as session:\n",
-    "        try:\n",
-    "            async with session.delete(url, params=params, json=collection_names) as response:\n",
-    "                await print_raw_response(response)\n",
-    "        except aiohttp.ClientError as e:\n",
-    "            print(f\"Error: {e}\")\n",
+    "    pc = Pinecone(api_key=os.getenv(\"PINECONE_API_KEY\"))\n",
+    "    index = pc.Index(\"multimodal-data\")\n",
+    "    for namespace in namespaces:\n",
+    "        # namespace must be passed as a keyword; the first positional argument of delete() is ids\n",
+    "        index.delete(namespace=namespace, delete_all=True)\n",
     "\n",
-    "# Delete the collection from the previous section\n",
-    "print(\"\\nDeleting collection 'multimodal_data1'...\")\n",
-    "await delete_collections(collection_names=[\"multimodal_data1\"])\n",
     "\n",
-    "# Fetch collections \n",
-    "print(\"\\nFetching remaining collections:\")\n",
+    "# Delete the namespace from the previous section\n",
+    "print(\"\\nDeleting namespace 'multimodal_data1'...\")\n",
+    "await delete_namespaces(namespaces=[\"multimodal_data1\"])\n",
+    "\n",
+    "# Fetch namespaces \n",
+    "print(\"\\nFetching remaining namespaces:\")\n",
     "print(\"-\" * 30)\n",
-    "await fetch_collections()"
+    "await fetch_namespaces()"
    ]
   },
  {
@@ -964,25 +965,11 @@
    "\n",
    "**Purpose:**\n",
    "This endpoint uploads new documents to the vector store. \n",
-    "1. You can specify the collection name where documents should be stored.\n",
-    "\n",
-    "2. The collection must exist in the vector database before uploading documents.\n",
+    "1. You can specify the namespace where documents should be stored.\n",
     "\n",
-    "3. Documents must not already exist in the collection. To update existing documents, use `session.patch(...)` instead of `session.post(...)`\n",
+    "2. Documents may already exist in the index. If they do, the existing vectors are overwritten.\n",
     "\n",
-    "4. Multiple files can be uploaded in a single request for efficiency\n",
-    "\n",
-    "**Configuration Options:**\n",
-    "\n",
-    "You can customize the document processing with these parameters:\n",
-    "\n",
-    "- `extraction_options`: Control what content is extracted (text, tables, charts)\n",
-    "\n",
-    "- `split_options`: Define how documents are chunked (size, overlap)\n",
-    "\n",
-    "- Custom metadata: Add additional information to your documents\n",
-    "\n",
-    "**We'll fetch the documents to verify ingestion, and then delete the document.**"
+    "3. Multiple files can be uploaded in a single request for efficiency.\n"
    ]
   },
  {
@@ -994,9 +981,9 @@
    "# Directory containing multimodal documents to upload\n",
    "DATA_DIR = \"data/multimodal\"\n",
    "\n",
-    "async def upload_documents(collection_name: str = \"\") -> None:\n",
+    "async def upload_documents(namespace: str = \"\") -> None:\n",
    "    \"\"\"\n",
-    "    Uploads documents from DATA_DIR to the specified collection in the vector store.\n",
+    "    Uploads documents from DATA_DIR to the specified namespace in the vector store.\n",
    "\n",
    "    This function:\n",
    "    1. Reads all files from DATA_DIR\n",
    "    3. Uploads documents via POST request to the documents endpoint\n",
    "    \n",
    "    Args:\n",
-    "        collection_name (str): Name of the collection to upload documents to.\n",
-    "            Collection must exist before uploading.\n",
+    "        namespace (str): Name of the namespace to upload documents to.\n",
+    "            Namespace must exist before uploading.\n",
    "    \n",
    "    Extraction options:\n",
    "    - Extracts text, tables and charts by default\n",
@@ -1023,11 +1010,9 @@
    "    # Configure upload parameters\n",
    "    # Configure document processing parameters\n",
    "    data = {\n",
-    "        # Milvus vector database endpoint\n",
-    "        \"vdb_endpoint\": \"http://milvus:19530\",\n",
-    "        \n",
-    "        # Target collection name for document storage\n",
-    "        \"collection_name\": collection_name,\n",
+    "        # Target namespace for document storage\n",
+    "        \"index_name\": \"nvidia-pinecone-rag\",\n",
+    "        \"namespace\": namespace,\n",
    "        \n",
    "        # Document extraction configuration\n",
    "        \"extraction_options\": {\n",
@@ -1045,23 +1030,15 @@
    "            \"chunk_overlap\": 150  # Overlap between chunks in tokens\n",
    "        }\n",
    "    }\n",
    "\n",
    "    # Prepare multipart form data with files and config\n",
    "    form_data = aiohttp.FormData()\n",
    "    for file_path in files:\n",
    "        form_data.add_field(\"documents\", open(file_path, \"rb\"), filename=os.path.basename(file_path), content_type=\"application/pdf\")\n",
    "    form_data.add_field(\"data\", json.dumps(data), content_type=\"application/json\")\n",
    "\n",
    "    # Upload documents\n",
    "    async with aiohttp.ClientSession() as session:\n",
    "        try:\n",
    "            async with session.post(f\"{INGESTOR_BASE_URL}/v1/documents\", data=form_data) as response: # Replace with session.patch for reingesting\n",
    "                await print_raw_response(response)\n",
    "        except aiohttp.ClientError as e:\n",
    "            print(f\"Error: {e}\")\n",
    "\n",
    "# Upload documents to the multimodal_data collection\n",
-    "await upload_documents(collection_name=\"multimodal_data\")"
+    "await upload_documents(namespace=\"multimodal_data\")"
    ]
   },
  {
@@ -1105,8 +1082,8 @@
    "\n",
    "try:\n",
    "    data = {\n",
-    "        \"vdb_endpoint\": \"http://milvus:19530\",\n",
-    "        \"collection_name\": \"multimodal_data\",\n",
+    "        \"index_name\": \"nvidia-pinecone-rag\",\n",
+    "        \"namespace\": \"multimodal_data\",\n",
    "        \"extraction_options\": {\n",
    "            \"extract_text\": True,\n",
    "            \"extract_tables\": False,\n",
@@ -1155,30 +1132,24 @@
    "outputs": [],
    "source": [
    "# Step 3: Fetch documents to verify ingestion\n",
-    "async def fetch_documents(collection_name: str = \"\"):\n",
-    "    url = f\"{INGESTOR_BASE_URL}/v1/documents\"\n",
-    "    params = {\"collection_name\": collection_name, \"vdb_endpoint\": \"http://milvus:19530\"}\n",
-    "    async with aiohttp.ClientSession() as session:\n",
-    "        try:\n",
-    "            async with session.get(url, params=params) as response:\n",
-    "                await print_raw_response(response)\n",
-    "        except aiohttp.ClientError as e:\n",
-    "            print(f\"Error: {e}\")\n",
+    "async def fetch_documents(documents: List[str], namespace: str = \"\"):\n",
+    "    pc = Pinecone(api_key=os.getenv(\"PINECONE_API_KEY\"))\n",
+    "    index = pc.Index(\"nvidia-pinecone-rag\")\n",
+    "    stored_documents = index.fetch(ids=documents, namespace=namespace)\n",
+    "    print(stored_documents)\n",
    "\n",
-    "await fetch_documents(collection_name=\"multimodal_data\")\n",
+    "await fetch_documents(documents=[tempfile.NamedTemporaryFile(suffix=\".txt\").name], namespace=\"multimodal_data\")\n",
    "\n",
    "# Step 4: Delete the test document 
\n", - "async def delete_documents(collection_name: str = \"\", file_names: List[str] = []):\n", - " url = f\"{INGESTOR_BASE_URL}/v1/documents\"\n", - " params = {\"collection_name\": collection_name, \"vdb_endpoint\": \"http://milvus:19530\"}\n", - " async with aiohttp.ClientSession() as session:\n", - " try:\n", - " async with session.delete(url, params=params, json=file_names) as response:\n", - " await print_raw_response(response)\n", - " except aiohttp.ClientError as e:\n", - " print(f\"Error: {e}\")\n", + "async def delete_documents(documents: List[str], namespace: str = \"\"):\n", + " pc = Pinecone(api_key=os.getenv(\"PINECONE_API_KEY\"))\n", + " index = pc.Index(\"nvidia-pinecone-rag\")\n", + " index.delete(ids=documents, namespace=namespace)\n", + "\n", + "await delete_documents(documents=[tempfile.NamedTemporaryFile(suffix=\".txt\").name], namespace=\"multimodal_data\")\n", "\n", - "await delete_documents(collection_name=\"multimodal_data\", file_names=[\"sample_document.txt\"])" + "# Step 5: Fetch the documents to verify deletion\n", + "await fetch_documents(documents=[tempfile.NamedTemporaryFile(suffix=\".txt\").name], namespace=\"multimodal_data\")" ] }, { @@ -1351,12 +1322,12 @@ "INGESTOR_BASE_URL = f\"http://{IPADDRESS}:{rag_server_port}\" # Replace with your server URL\n", "\n", "# Upload FIFA World Cup Winners PDF\n", - "async def upload_fifa_document(collection_name: str = \"\") -> None:\n", + "async def upload_fifa_document(namespace: str = \"\") -> None:\n", " fifa_pdf_path = \"fifa_world_cup_winners.pdf\"\n", " \n", " data = {\n", - " \"vdb_endpoint\": \"http://milvus:19530\",\n", - " \"collection_name\": collection_name,\n", + " \"index_name\": \"nvidia-pinecone-rag\",\n", + " \"namespace\": namespace,\n", " \"extraction_options\": {\n", " \"extract_text\": True,\n", " \"extract_tables\": True, \n", @@ -1385,7 +1356,7 @@ " except aiohttp.ClientError as e:\n", " print(f\"Error: {e}\")\n", "\n", - "await upload_fifa_document(collection_name=\"multimodal_data\")" + "await upload_fifa_document(namespace=\"test_knowledge_base\")" ] }, { @@ -1428,8 +1399,8 @@ " \"max_tokens\": 1024,\n", " \"reranker_top_k\": 10,\n", " \"vdb_top_k\": 100,\n", - " \"vdb_endpoint\": \"http://milvus:19530\",\n", - " \"collection_name\": \"multimodal_data\",\n", + " \"index_name\": \"nvidia-pinecone-rag\",\n", + " \"namespace\": \"test_knowledge_base\",\n", " \"enable_query_rewriting\": False,\n", " \"enable_reranker\": True,\n", " \"enable_guardrails\": False,\n", @@ -1497,8 +1468,8 @@ " \"max_tokens\": 1024,\n", " \"reranker_top_k\": 10,\n", " \"vdb_top_k\": 100,\n", - " \"vdb_endpoint\": \"http://milvus:19530\",\n", - " \"collection_name\": \"multimodal_data\",\n", + " \"index_name\": \"nvidia-pinecone-rag\",\n", + " \"namespace\": \"test_knowledge_base\",\n", " \"enable_query_rewriting\": False,\n", " \"enable_reranker\": True,\n", " \"enable_guardrails\": False,\n", @@ -1640,8 +1611,8 @@ " \"query\": \"\",\n", " \"reranker_top_k\": 3,\n", " \"vdb_top_k\": 100,\n", - " \"vdb_endpoint\": \"http://milvus:19530\",\n", - " \"collection_name\": \"multimodal_data\",\n", + " \"index_name\": \"nvidia-pinecone-rag\",\n", + " \"namespace\": \"test_knowledge_base\",\n", " \"messages\": [\n", " {\n", " \"role\": \"user\",\n", @@ -1699,8 +1670,8 @@ " \"query\": \"\",\n", " \"reranker_top_k\": 3,\n", " \"vdb_top_k\": 100,\n", - " \"vdb_endpoint\": \"http://milvus:19530\",\n", - " \"collection_name\": \"multimodal_data\",\n", + " \"index_name\": \"nvidia-pinecone-rag\",\n", + " \"namespace\": 
\"test_knowledge_base\",\n", " \"messages\": [\n", " {\n", " \"role\": \"user\",\n", @@ -1776,7 +1747,7 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3", + "display_name": ".venv", "language": "python", "name": "python3" }, @@ -1790,7 +1761,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.12" + "version": "3.13.5" } }, "nbformat": 4, diff --git a/requirements.txt b/requirements.txt index aed9b1e..eb7e4b1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,10 @@ bleach==6.1.0 dataclass-wizard==0.27.0 fastapi==0.115.12 -httpx==0.27.2 +httpx>=0.27.2 minio==7.2.15 -nv-ingest-client==25.3.0 +#nv-ingest-client==25.3.0 +git+https://github.com/pinecone-io/nv-ingest.git@add-pinecone-vdb#egg=nv-ingest-client&subdirectory=client opentelemetry-api opentelemetry-exporter-otlp-proto-grpc opentelemetry-exporter-otlp-proto-http diff --git a/src/ingestor_server/main.py b/src/ingestor_server/main.py index 242dbe8..643fa7f 100644 --- a/src/ingestor_server/main.py +++ b/src/ingestor_server/main.py @@ -165,7 +165,7 @@ def delete_collections( @staticmethod - def get_collections(vdb_endpoint: str) -> Dict[str, Any]: + def get_collections(vdb_endpoint: str, **kwargs) -> Dict[str, Any]: """ Get index statistics from Pinecone. @@ -177,7 +177,8 @@ def get_collections(vdb_endpoint: str) -> Dict[str, Any]: """ try: pc = get_pinecone_client() - index_name = os.getenv("PINECONE_INDEX_NAME", "rag-index") + index_name = kwargs.get("index_name") or os.getenv("PINECONE_INDEX_NAME", "rag-index") + namespace = kwargs.get("namespace") or os.getenv("PINECONE_NAMESPACE", "default") if index_name not in list_indexes(pc): return { @@ -370,7 +371,8 @@ def _prepare_documents(self, results: List[List[Dict[str, Union[str, dict]]]]) - def _add_documents_to_vectorstore( self, documents: List[Document], - collection_name: str + index_name: str, + namespace: str ) -> None: """ Add documents to Pinecone index @@ -383,7 +385,8 @@ def _add_documents_to_vectorstore( pc = get_pinecone_client() add_documents( pc=pc, - index_name=collection_name, + index_name=index_name, + namespace=namespace, documents=documents, embedder=DOCUMENT_EMBEDDER, batch_size=self._vdb_upload_bulk_size diff --git a/src/rag_server/requirements.txt b/src/rag_server/requirements.txt index 28581f0..f5c0ef2 100644 --- a/src/rag_server/requirements.txt +++ b/src/rag_server/requirements.txt @@ -1,9 +1,10 @@ bleach==6.1.0 dataclass-wizard==0.27.0 fastapi==0.115.12 -httpx==0.27.2 +httpx>=0.27.2 minio==7.2.15 -nv-ingest-client==25.3.0 +#nv-ingest-client==25.3.0 +git+https://github.com/pinecone-io/nv-ingest.git@add-pinecone-vdb#egg=nv-ingest-client&subdirectory=client opentelemetry-api opentelemetry-exporter-otlp-proto-grpc opentelemetry-exporter-otlp-proto-http diff --git a/src/rag_server/server.py b/src/rag_server/server.py index 10c5700..dbb785f 100644 --- a/src/rag_server/server.py +++ b/src/rag_server/server.py @@ -552,13 +552,13 @@ class HealthResponse(BaseModel): @app.on_event("startup") -async def startup_event(): +async def startup_event(**kwargs): """Initialize Pinecone on startup""" pc = get_pinecone_client() app.state.pinecone = pc # Create index if it doesn't exist - index_name = os.getenv("PINECONE_INDEX_NAME", "rag-index") + index_name = kwargs.get("index_name") or os.getenv("PINECONE_INDEX_NAME", "rag-index") dimension = int(os.getenv("PINECONE_DIMENSION", "1536")) metric = os.getenv("PINECONE_METRIC", "cosine") cloud = os.getenv("PINECONE_CLOUD", "aws") diff --git 
a/src/utils/pinecone_utils.py b/src/utils/pinecone_utils.py index eeab787..0ca2e81 100644 --- a/src/utils/pinecone_utils.py +++ b/src/utils/pinecone_utils.py @@ -57,12 +57,12 @@ def get_pinecone_client() -> Pinecone: logger.error(f"Unexpected error connecting to Pinecone: {str(e)}") raise PineconeConnectionError(f"Unexpected error: {str(e)}") -def get_index(pc: Optional[Pinecone] = None): +def get_index(pc: Optional[Pinecone] = None, **kwargs): """Get Pinecone index from client""" try: if pc is None: pc = get_pinecone_client() - index_name = os.getenv("PINECONE_INDEX_NAME", "rag-index") + index_name = kwargs.get("index_name") or os.getenv("PINECONE_INDEX_NAME", "rag-index") if not index_name in pc.list_indexes(): create_index(pc, index_name, @@ -89,7 +89,7 @@ def list_indexes(pc: Pinecone) -> List[str]: except Exception as e: logger.error(f"Unexpected error listing Pinecone indexes: {str(e)}") -def create_index(pc: Pinecone, index_name: str, dimension: int, metric: str = "cosine", serverless: bool = True, cloud: str = "aws", region: str = "us-east-1"): +def create_index(pc: Pinecone, index_name: str, dimension: int, metric: str = "cosine", serverless: bool = True, cloud: str = "aws", region: str = "us-east-1", **kwargs): """Create Pinecone index if it doesn't exist""" pc.create_index( index_name, @@ -114,9 +114,10 @@ def delete_index(pc: Pinecone, index_name: str) -> None: logger.error(f"Unexpected error deleting Pinecone index: {str(e)}") raise PineconeConnectionError(f"Unexpected error: {str(e)}") -def add_documents(pc: Pinecone, index_name: str, documents: List[Document], embedder: Any, batch_size: int = 500) -> None: +def add_documents(pc: Pinecone, index_name: str, documents: List[Document], embedder: Any, batch_size: int = 500, **kwargs) -> None: """Add documents to Pinecone index with embeddings""" index = pc.Index(index_name) + namespace = kwargs.get("namespace") or os.getenv("PINECONE_NAMESPACE", "default") for i in range(0, len(documents), batch_size): batch = documents[i:i+batch_size] @@ -142,7 +143,7 @@ def add_documents(pc: Pinecone, index_name: str, documents: List[Document], embe ] # Upsert to Pinecone - index.upsert(vectors=vectors) + index.upsert(vectors=vectors, namespace=namespace) def get_index_stats(pc: Pinecone, index_name: str) -> Dict[str, Any]: """Get index statistics from Pinecone"""
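
For reviewers who want to try the namespace plumbing introduced in `src/utils/pinecone_utils.py`, the following is a minimal, hypothetical usage sketch; it is not part of the patch. It assumes `PINECONE_API_KEY` is exported, that the module is importable as `src.utils.pinecone_utils` from the repository root, and that the index dimension matches the `PINECONE_DIMENSION` default of 1536 used in `server.py`; the vector ID, metadata, and the `multimodal_data` namespace are illustrative values only.

```python
# Hypothetical sketch (not part of this patch): exercising the namespace-aware helpers.
import os

from src.utils.pinecone_utils import get_pinecone_client, get_index  # assumed import path

os.environ.setdefault("PINECONE_INDEX_NAME", "rag-index")  # default index name used by the patch

pc = get_pinecone_client()

# get_index() now accepts an index_name kwarg and falls back to PINECONE_INDEX_NAME.
index = get_index(pc, index_name=os.environ["PINECONE_INDEX_NAME"])

# Vectors are routed to a namespace; dimension must match PINECONE_DIMENSION (1536 by default).
index.upsert(
    vectors=[{
        "id": "example-doc.pdf_0",                      # hypothetical vector ID
        "values": [0.0] * 1536,                          # placeholder embedding
        "metadata": {"source": "example-doc.pdf"},       # hypothetical metadata
    }],
    namespace="multimodal_data",
)

# Clean up the example namespace when done.
index.delete(namespace="multimodal_data", delete_all=True)
```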