1 change: 1 addition & 0 deletions .env.example
@@ -5,6 +5,7 @@ TAVILY_API_KEY=
LANGSMITH_API_KEY=
LANGSMITH_PROJECT=
LANGSMITH_TRACING=
SERPER_API_KEY=

# Only necessary for Open Agent Platform
SUPABASE_KEY=
2 changes: 2 additions & 0 deletions src/open_deep_research/configuration.py
@@ -14,6 +14,7 @@ class SearchAPI(Enum):
ANTHROPIC = "anthropic"
OPENAI = "openai"
TAVILY = "tavily"
SERPER = "serper"
NONE = "none"

class MCPConfig(BaseModel):
@@ -86,6 +87,7 @@ class Configuration(BaseModel):
{"label": "Tavily", "value": SearchAPI.TAVILY.value},
{"label": "OpenAI Native Web Search", "value": SearchAPI.OPENAI.value},
{"label": "Anthropic Native Web Search", "value": SearchAPI.ANTHROPIC.value},
{"label": "Serper", "value": SearchAPI.SERPER.value},
{"label": "None", "value": SearchAPI.NONE.value}
]
}
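
A minimal sketch (not part of this diff) of selecting the new backend at runtime, assuming Configuration exposes a search_api field as the options metadata above implies:

from open_deep_research.configuration import Configuration, SearchAPI

# Hypothetical caller code: pass the enum value through the runnable config.
config = {"configurable": {"search_api": SearchAPI.SERPER.value}}
configurable = Configuration.from_runnable_config(config)
assert configurable.search_api == SearchAPI.SERPER
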
118 changes: 117 additions & 1 deletion src/open_deep_research/utils.py
@@ -33,6 +33,109 @@
from open_deep_research.prompts import summarize_webpage_prompt
from open_deep_research.state import ResearchComplete, Summary

##########################
# Serper Search Tool Utils
##########################
SERPER_SEARCH_DESCRIPTION = (
"A search engine that uses Google's results via the Serper API. "
"Useful for when you need to answer questions about current events."
)

@tool(description=SERPER_SEARCH_DESCRIPTION)
async def serper_search(
queries: List[str],
max_results: Annotated[int, InjectedToolArg] = 5,
config: RunnableConfig = None,
) -> str:
"""Fetches results from the Serper search API."""
search_results = await serper_search_async(
queries,
max_results=max_results,
config=config,
)

formatted_output = f"Search results: \n\n"
unique_results: Dict[str, Dict[str, Any]] = {}
for response in search_results:
for result in response["results"]:
url = result["url"]
if url not in unique_results:
unique_results[url] = {**result, "query": response["query"]}

configurable = Configuration.from_runnable_config(config)
max_char_to_include = 50_000
model_api_key = get_api_key_for_model(configurable.summarization_model, config)
summarization_model = init_chat_model(
model=configurable.summarization_model,
max_tokens=configurable.summarization_model_max_tokens,
api_key=model_api_key,
tags=["langsmith:nostream"],
).with_structured_output(Summary).with_retry(
stop_after_attempt=configurable.max_structured_output_retries
)

async def noop():
return None

summarization_tasks = [
noop() if not result.get("content") else summarize_webpage(
summarization_model, result["content"][:max_char_to_include]
)
for result in unique_results.values()
]
summaries = await asyncio.gather(*summarization_tasks)
    summarized_results = {
        url: {
            "title": result["title"],
            "content": result["content"] if summary is None else summary,
        }
        for (url, result), summary in zip(unique_results.items(), summaries)
    }

for i, (url, result) in enumerate(summarized_results.items()):
formatted_output += f"\n\n--- SOURCE {i+1}: {result['title']} ---\n"
formatted_output += f"URL: {url}\n\n"
formatted_output += f"SUMMARY:\n{result['content']}\n\n"
formatted_output += "\n\n" + "-" * 80 + "\n"

if summarized_results:
return formatted_output
else:
return "No valid search results found. Please try different search queries or use a different search API."


async def serper_search_async(
    search_queries: List[str], max_results: int = 5, config: RunnableConfig = None
):
    """Run Serper queries concurrently and return per-query result dicts."""
api_key = get_serper_api_key(config)
headers = {
"X-API-KEY": api_key,
"Content-Type": "application/json",
}
url = "https://google.serper.dev/search"

async with aiohttp.ClientSession(headers=headers) as session:
async def fetch(query: str):
payload = {"q": query, "gl": "in", "hl": "en", "num": max_results}
async with session.post(url, json=payload) as response:
data = await response.json()
organic = data.get("organic", [])
formatted_results = [
{
"title": r.get("title"),
"url": r.get("link"),
"content": r.get("snippet"),
}
for r in organic
]
return {"query": query, "results": formatted_results}

tasks = [fetch(query) for query in search_queries]
return await asyncio.gather(*tasks)


##########################
# Tavily Search Tool Utils
##########################
@@ -558,7 +661,10 @@ async def get_search_tool(search_api: SearchAPI):
"name": "web_search"
}
return [search_tool]

    elif search_api == SearchAPI.SERPER:
        search_tool = serper_search
        search_tool.metadata = {
            **(search_tool.metadata or {}),
            "type": "search",
            "name": "web_search",
        }
        return [search_tool]
elif search_api == SearchAPI.NONE:
# No search functionality configured
return []
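
A quick sketch of exercising the new branch (hypothetical caller code, run outside an event loop):

import asyncio

from open_deep_research.configuration import SearchAPI
from open_deep_research.utils import get_search_tool

tools = asyncio.run(get_search_tool(SearchAPI.SERPER))
# Expect [serper_search], tagged so the agent treats it as "web_search".
print(tools[0].metadata)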
@@ -923,3 +1029,13 @@ def get_tavily_api_key(config: RunnableConfig):
return api_keys.get("TAVILY_API_KEY")
else:
return os.getenv("TAVILY_API_KEY")

def get_serper_api_key(config: RunnableConfig):
    """Resolve the Serper API key from the request config or the environment."""
    should_get_from_config = os.getenv("GET_API_KEYS_FROM_CONFIG", "false")
if should_get_from_config.lower() == "true":
api_keys = config.get("configurable", {}).get("apiKeys", {})
if not api_keys:
return None
return api_keys.get("SERPER_API_KEY")
else:
return os.getenv("SERPER_API_KEY")