Add comprehensive set of free data tools for RAG

Tools added:
- Wikipedia: search, get article, get full article
- News: Hacker News, Reddit, aggregated news search
- Finance: stocks (yfinance), crypto (CoinGecko), exchange rates
- Medical: PubMed, FDA, disease data, health topics
- Weather: current, forecast, air quality (Open-Meteo)
- Science: arXiv, Semantic Scholar, DOAJ
- Web: DuckDuckGo search, instant answers, page content

All tools use completely free APIs with no authentication required.
This commit is contained in:
Z User 2026-03-29 06:27:32 +00:00
parent e0f8408a7c
commit 4394e7d6f9
9 changed files with 3329 additions and 15 deletions

View File

@ -10,7 +10,7 @@ aiohttp~=3.11.0
httpx~=0.28.0
requests~=2.32.4
# Web scraping (for website downloader)
# Web scraping and parsing
beautifulsoup4~=4.13.4
lxml~=5.3.0
urllib3~=2.5.0
@ -20,7 +20,10 @@ PyMuPDF~=1.25.0
python-docx~=1.1.0
# LLM API client (for OpenRouter)
openai~=1.0.0
openai>=1.30.0
# Financial data
yfinance>=0.2.0
# Vector store alternatives (uncomment as needed)
# chromadb~=0.5.0

View File

@ -2,6 +2,7 @@
Tools Module - Tool management for the RAG system
Provides a unified interface for tool registration and execution.
All tools use completely free APIs with no authentication required.
"""
from __future__ import annotations
@ -10,12 +11,6 @@ import json
import logging
from typing import Any, Callable, Optional
# Import the website downloader tool
from website_downloader_tool import (
website_downloader,
get_tool_schema as get_website_downloader_schema,
)
log = logging.getLogger(__name__)
@ -37,13 +32,294 @@ class ToolManager:
self._register_builtin_tools()
def _register_builtin_tools(self) -> None:
"""Register built-in tools."""
# Register website downloader
self.register_tool(
name="website_downloader",
function=website_downloader,
schema=get_website_downloader_schema(),
)
"""Register all built-in tools."""
# === Website Downloader Tool ===
try:
from website_downloader_tool import (
website_downloader,
get_tool_schema as get_website_downloader_schema,
)
self.register_tool(
name="website_downloader",
function=website_downloader,
schema=get_website_downloader_schema(),
)
except ImportError as e:
log.warning(f"Could not import website_downloader_tool: {e}")
# === Wikipedia Tools ===
try:
from tools.wikipedia_tool import (
wikipedia_search,
wikipedia_get_article,
wikipedia_get_full_article,
WIKIPEDIA_SEARCH_SCHEMA,
WIKIPEDIA_GET_ARTICLE_SCHEMA,
WIKIPEDIA_GET_FULL_ARTICLE_SCHEMA,
)
self.register_tool(
name="wikipedia_search",
function=wikipedia_search,
schema=WIKIPEDIA_SEARCH_SCHEMA,
)
self.register_tool(
name="wikipedia_get_article",
function=wikipedia_get_article,
schema=WIKIPEDIA_GET_ARTICLE_SCHEMA,
)
self.register_tool(
name="wikipedia_get_full_article",
function=wikipedia_get_full_article,
schema=WIKIPEDIA_GET_FULL_ARTICLE_SCHEMA,
)
except ImportError as e:
log.warning(f"Could not import wikipedia_tool: {e}")
# === News Tools ===
try:
from tools.news_tool import (
news_search_hackernews,
news_get_top_stories,
news_get_reddit,
news_search_reddit,
news_aggregate,
NEWS_SEARCH_HACKERNEWS_SCHEMA,
NEWS_GET_TOP_STORIES_SCHEMA,
NEWS_GET_REDDIT_SCHEMA,
NEWS_SEARCH_REDDIT_SCHEMA,
NEWS_AGGREGATE_SCHEMA,
)
self.register_tool(
name="news_search_hackernews",
function=news_search_hackernews,
schema=NEWS_SEARCH_HACKERNEWS_SCHEMA,
)
self.register_tool(
name="news_get_top_stories",
function=news_get_top_stories,
schema=NEWS_GET_TOP_STORIES_SCHEMA,
)
self.register_tool(
name="news_get_reddit",
function=news_get_reddit,
schema=NEWS_GET_REDDIT_SCHEMA,
)
self.register_tool(
name="news_search_reddit",
function=news_search_reddit,
schema=NEWS_SEARCH_REDDIT_SCHEMA,
)
self.register_tool(
name="news_aggregate",
function=news_aggregate,
schema=NEWS_AGGREGATE_SCHEMA,
)
except ImportError as e:
log.warning(f"Could not import news_tool: {e}")
# === Finance Tools ===
try:
from tools.finance_tool import (
finance_get_stock_info,
finance_get_stock_history,
finance_get_crypto_price,
finance_get_top_cryptos,
finance_get_exchange_rate,
finance_search_crypto,
FINANCE_GET_STOCK_INFO_SCHEMA,
FINANCE_GET_STOCK_HISTORY_SCHEMA,
FINANCE_GET_CRYPTO_PRICE_SCHEMA,
FINANCE_GET_TOP_CRYPTOS_SCHEMA,
FINANCE_GET_EXCHANGE_RATE_SCHEMA,
FINANCE_SEARCH_CRYPTO_SCHEMA,
)
self.register_tool(
name="finance_get_stock_info",
function=finance_get_stock_info,
schema=FINANCE_GET_STOCK_INFO_SCHEMA,
)
self.register_tool(
name="finance_get_stock_history",
function=finance_get_stock_history,
schema=FINANCE_GET_STOCK_HISTORY_SCHEMA,
)
self.register_tool(
name="finance_get_crypto_price",
function=finance_get_crypto_price,
schema=FINANCE_GET_CRYPTO_PRICE_SCHEMA,
)
self.register_tool(
name="finance_get_top_cryptos",
function=finance_get_top_cryptos,
schema=FINANCE_GET_TOP_CRYPTOS_SCHEMA,
)
self.register_tool(
name="finance_get_exchange_rate",
function=finance_get_exchange_rate,
schema=FINANCE_GET_EXCHANGE_RATE_SCHEMA,
)
self.register_tool(
name="finance_search_crypto",
function=finance_search_crypto,
schema=FINANCE_SEARCH_CRYPTO_SCHEMA,
)
except ImportError as e:
log.warning(f"Could not import finance_tool: {e}")
# === Medical Tools ===
try:
from tools.medical_tool import (
medical_search_pubmed,
medical_get_pubmed_abstract,
medical_get_disease_data,
medical_get_covid_country,
medical_search_fda,
medical_get_health_topics,
MEDICAL_SEARCH_PUBMED_SCHEMA,
MEDICAL_GET_PUBMED_ABSTRACT_SCHEMA,
MEDICAL_GET_DISEASE_DATA_SCHEMA,
MEDICAL_GET_COVID_COUNTRY_SCHEMA,
MEDICAL_SEARCH_FDA_SCHEMA,
MEDICAL_GET_HEALTH_TOPICS_SCHEMA,
)
self.register_tool(
name="medical_search_pubmed",
function=medical_search_pubmed,
schema=MEDICAL_SEARCH_PUBMED_SCHEMA,
)
self.register_tool(
name="medical_get_pubmed_abstract",
function=medical_get_pubmed_abstract,
schema=MEDICAL_GET_PUBMED_ABSTRACT_SCHEMA,
)
self.register_tool(
name="medical_get_disease_data",
function=medical_get_disease_data,
schema=MEDICAL_GET_DISEASE_DATA_SCHEMA,
)
self.register_tool(
name="medical_get_covid_country",
function=medical_get_covid_country,
schema=MEDICAL_GET_COVID_COUNTRY_SCHEMA,
)
self.register_tool(
name="medical_search_fda",
function=medical_search_fda,
schema=MEDICAL_SEARCH_FDA_SCHEMA,
)
self.register_tool(
name="medical_get_health_topics",
function=medical_get_health_topics,
schema=MEDICAL_GET_HEALTH_TOPICS_SCHEMA,
)
except ImportError as e:
log.warning(f"Could not import medical_tool: {e}")
# === Weather Tools ===
try:
from tools.weather_tool import (
weather_get_current,
weather_get_forecast,
weather_get_air_quality,
WEATHER_GET_CURRENT_SCHEMA,
WEATHER_GET_FORECAST_SCHEMA,
WEATHER_GET_AIR_QUALITY_SCHEMA,
)
self.register_tool(
name="weather_get_current",
function=weather_get_current,
schema=WEATHER_GET_CURRENT_SCHEMA,
)
self.register_tool(
name="weather_get_forecast",
function=weather_get_forecast,
schema=WEATHER_GET_FORECAST_SCHEMA,
)
self.register_tool(
name="weather_get_air_quality",
function=weather_get_air_quality,
schema=WEATHER_GET_AIR_QUALITY_SCHEMA,
)
except ImportError as e:
log.warning(f"Could not import weather_tool: {e}")
# === Science Tools ===
try:
from tools.science_tool import (
science_search_arxiv,
science_search_semantic_scholar,
science_get_paper_details,
science_search_doaj,
science_aggregate_search,
SCIENCE_SEARCH_ARXIV_SCHEMA,
SCIENCE_SEARCH_SEMANTIC_SCHOLAR_SCHEMA,
SCIENCE_GET_PAPER_DETAILS_SCHEMA,
SCIENCE_SEARCH_DOAJ_SCHEMA,
SCIENCE_AGGREGATE_SEARCH_SCHEMA,
)
self.register_tool(
name="science_search_arxiv",
function=science_search_arxiv,
schema=SCIENCE_SEARCH_ARXIV_SCHEMA,
)
self.register_tool(
name="science_search_semantic_scholar",
function=science_search_semantic_scholar,
schema=SCIENCE_SEARCH_SEMANTIC_SCHOLAR_SCHEMA,
)
self.register_tool(
name="science_get_paper_details",
function=science_get_paper_details,
schema=SCIENCE_GET_PAPER_DETAILS_SCHEMA,
)
self.register_tool(
name="science_search_doaj",
function=science_search_doaj,
schema=SCIENCE_SEARCH_DOAJ_SCHEMA,
)
self.register_tool(
name="science_aggregate_search",
function=science_aggregate_search,
schema=SCIENCE_AGGREGATE_SEARCH_SCHEMA,
)
except ImportError as e:
log.warning(f"Could not import science_tool: {e}")
# === Web Search Tools ===
try:
from tools.web_tool import (
web_search,
web_instant_answer,
web_get_page_content,
web_search_and_fetch,
WEB_SEARCH_SCHEMA,
WEB_INSTANT_ANSWER_SCHEMA,
WEB_GET_PAGE_CONTENT_SCHEMA,
WEB_SEARCH_AND_FETCH_SCHEMA,
)
self.register_tool(
name="web_search",
function=web_search,
schema=WEB_SEARCH_SCHEMA,
)
self.register_tool(
name="web_instant_answer",
function=web_instant_answer,
schema=WEB_INSTANT_ANSWER_SCHEMA,
)
self.register_tool(
name="web_get_page_content",
function=web_get_page_content,
schema=WEB_GET_PAGE_CONTENT_SCHEMA,
)
self.register_tool(
name="web_search_and_fetch",
function=web_search_and_fetch,
schema=WEB_SEARCH_AND_FETCH_SCHEMA,
)
except ImportError as e:
log.warning(f"Could not import web_tool: {e}")
log.info(f"Registered {len(self._tools)} built-in tools")

523
tools/finance_tool.py Normal file
View File

@ -0,0 +1,523 @@
"""
Financial Data Tool - Get stock quotes, crypto prices, and financial data
Free sources used:
- Yahoo Finance (yfinance library - completely free)
- CoinGecko API (free tier: 10-50 calls/minute)
- FRED API (Federal Reserve Economic Data - free with API key; only the FRED_API endpoint constant is defined, no function here queries it yet)
- ExchangeRate-API (free tier)
Most functions work without API keys.
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from typing import Optional
import requests
log = logging.getLogger(__name__)
# Free API endpoints
# CoinGecko v3 REST root; the crypto functions in this module append
# /simple/price, /coins/markets and /search to it.
COINGECKO_API = "https://api.coingecko.com/api/v3"
# ExchangeRate-API v4 "latest rates" root; the base currency code follows
# as the final path segment.
EXCHANGE_RATE_API = "https://api.exchangerate-api.com/v4/latest"
# FRED (St. Louis Fed) API root. No function visible in this module calls it.
FRED_API = "https://api.stlouisfed.org/fred"
def finance_get_stock_info(
    symbol: str,
) -> dict:
    """
    Get stock information from Yahoo Finance.

    Args:
        symbol: Stock ticker symbol (e.g., AAPL, GOOGL, TSLA)

    Returns:
        Dictionary that always has ``success`` and ``source``. On success it
        also has the upper-cased ``symbol``, a ``timestamp`` and every
        quote/fundamentals field that was reported; fields whose value is
        ``None`` are dropped. On failure it has an ``error`` message.
    """
    try:
        # Imported lazily so this module still loads without yfinance.
        import yfinance as yf

        ticker_symbol = symbol.upper()
        info = yf.Ticker(ticker_symbol).info

        # Extract key financial data
        result = {
            "success": True,
            "source": "yahoo_finance",
            "symbol": ticker_symbol,
            # `or` instead of a .get() default: a key that is present but
            # None/empty must still fall through to the next candidate.
            "company_name": info.get("longName") or info.get("shortName") or "",
            "current_price": info.get("currentPrice") or info.get("regularMarketPrice"),
            "previous_close": info.get("previousClose"),
            "open": info.get("open"),
            "day_high": info.get("dayHigh"),
            "day_low": info.get("dayLow"),
            "52_week_high": info.get("fiftyTwoWeekHigh"),
            "52_week_low": info.get("fiftyTwoWeekLow"),
            "market_cap": info.get("marketCap"),
            "pe_ratio": info.get("trailingPE"),
            "forward_pe": info.get("forwardPE"),
            "dividend_yield": info.get("dividendYield"),
            "volume": info.get("volume"),
            "avg_volume": info.get("averageVolume"),
            "beta": info.get("beta"),
            "eps": info.get("trailingEps"),
            "revenue": info.get("totalRevenue"),
            "profit_margins": info.get("profitMargins"),
            # Guard against an explicit None before slicing; the summary is
            # capped at 1000 characters.
            "description": (info.get("longBusinessSummary") or "")[:1000],
            "sector": info.get("sector"),
            "industry": info.get("industry"),
            "website": info.get("website"),
            "timestamp": datetime.now().isoformat(),
        }

        # Remove None values
        return {k: v for k, v in result.items() if v is not None}

    except ImportError:
        return {
            "success": False,
            "error": "yfinance not installed. Run: pip install yfinance",
            "source": "yahoo_finance",
        }
    except Exception as e:
        log.error(f"Stock info fetch failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "yahoo_finance",
            "symbol": symbol,
        }
def finance_get_stock_history(
    symbol: str,
    period: str = "1mo",
    interval: str = "1d",
) -> dict:
    """
    Get historical stock prices from Yahoo Finance.

    Args:
        symbol: Stock ticker symbol
        period: Time period (1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max)
        interval: Data interval (1m, 2m, 5m, 15m, 30m, 60m, 90m, 1h, 1d, 5d, 1wk, 1mo, 3mo)

    Returns:
        Dictionary with ``success`` and ``source``. On success it also has
        ``symbol``, ``period``, ``interval``, ``count`` and ``prices``: one
        dict per row with ``date``, ``open``, ``high``, ``low``, ``close``
        (rounded to 2 decimals) and ``volume`` (int). A value that is
        missing/NaN in a row is reported as ``None`` instead of failing the
        whole request. On failure it has an ``error`` message.
    """
    try:
        # Imported lazily so this module still loads without yfinance.
        import yfinance as yf

        ticker = yf.Ticker(symbol.upper())
        hist = ticker.history(period=period, interval=interval)

        if hist.empty:
            return {
                "success": False,
                "error": f"No historical data found for {symbol}",
                "source": "yahoo_finance",
            }

        # Convert to list of dicts
        prices = [
            {
                "date": index.isoformat(),
                "open": _history_value(row["Open"]),
                "high": _history_value(row["High"]),
                "low": _history_value(row["Low"]),
                "close": _history_value(row["Close"]),
                "volume": _history_value(row["Volume"], as_int=True),
            }
            for index, row in hist.iterrows()
        ]

        return {
            "success": True,
            "source": "yahoo_finance",
            "symbol": symbol.upper(),
            "period": period,
            "interval": interval,
            "prices": prices,
            "count": len(prices),
        }

    except ImportError:
        return {
            "success": False,
            "error": "yfinance not installed. Run: pip install yfinance",
            "source": "yahoo_finance",
        }
    except Exception as e:
        log.error(f"Stock history fetch failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "yahoo_finance",
        }


def _history_value(value, as_int: bool = False):
    """Return one history cell as a plain int/float, or None when it is missing or NaN."""
    # NaN is the only value that compares unequal to itself.
    if value is None or value != value:
        return None
    return int(value) if as_int else round(float(value), 2)
def finance_get_crypto_price(
    coin_id: str = "bitcoin",
    vs_currency: str = "usd",
) -> dict:
    """
    Look up the current price of one cryptocurrency on CoinGecko.

    Args:
        coin_id: CoinGecko coin ID (e.g., bitcoin, ethereum, dogecoin); lower-cased before use
        vs_currency: Quote currency for the price (e.g., usd, eur, btc); lower-cased before use

    Returns:
        Dictionary with ``success`` and ``source``; on success also the
        price, market cap, 24h volume, 24h change and last-update time, on
        failure an ``error`` message.
    """
    try:
        coin_key = coin_id.lower()
        currency_key = vs_currency.lower()

        reply = requests.get(
            f"{COINGECKO_API}/simple/price",
            params={
                "ids": coin_key,
                "vs_currencies": currency_key,
                "include_market_cap": "true",
                "include_24hr_vol": "true",
                "include_24hr_change": "true",
                "include_last_updated_at": "true",
            },
            timeout=10,
        )
        reply.raise_for_status()
        payload = reply.json()

        # The response is keyed by coin id; an absent key means an unknown coin.
        if coin_key not in payload:
            return {
                "success": False,
                "error": f"Coin not found: {coin_id}. Try using the full coin name (e.g., 'bitcoin' not 'btc')",
                "source": "coingecko",
            }

        quote_data = payload[coin_key]
        updated_at = quote_data.get("last_updated_at")
        if updated_at:
            last_updated = datetime.fromtimestamp(updated_at).isoformat()
        else:
            last_updated = None

        return {
            "success": True,
            "source": "coingecko",
            "coin_id": coin_key,
            "currency": currency_key,
            "price": quote_data.get(currency_key),
            "market_cap": quote_data.get(f"{currency_key}_market_cap"),
            "24h_volume": quote_data.get(f"{currency_key}_24h_vol"),
            "24h_change": quote_data.get(f"{currency_key}_24h_change"),
            "last_updated": last_updated,
        }

    except Exception as exc:
        log.error(f"Crypto price fetch failed: {exc}")
        return {
            "success": False,
            "error": str(exc),
            "source": "coingecko",
        }
def finance_get_top_cryptos(
    limit: int = 10,
    vs_currency: str = "usd",
) -> dict:
    """
    List the largest cryptocurrencies by market cap, as reported by CoinGecko.

    Args:
        limit: Number of coins to request (default: 10)
        vs_currency: Currency for prices (default: usd); lower-cased before use

    Returns:
        Dictionary with ``success`` and ``source``; on success also
        ``currency``, ``results`` (one dict per coin) and ``count``, on
        failure an ``error`` message.
    """
    try:
        currency_key = vs_currency.lower()
        reply = requests.get(
            f"{COINGECKO_API}/coins/markets",
            params={
                "vs_currency": currency_key,
                "order": "market_cap_desc",
                "per_page": limit,
                "page": 1,
                "sparkline": "false",
            },
            timeout=10,
        )
        reply.raise_for_status()

        coins = [
            {
                "id": entry.get("id"),
                "symbol": entry.get("symbol", "").upper(),
                "name": entry.get("name"),
                "price": entry.get("current_price"),
                "market_cap": entry.get("market_cap"),
                "market_cap_rank": entry.get("market_cap_rank"),
                "24h_change": entry.get("price_change_percentage_24h"),
                "volume": entry.get("total_volume"),
                "circulating_supply": entry.get("circulating_supply"),
                "image": entry.get("image"),
            }
            for entry in reply.json()
        ]

        return {
            "success": True,
            "source": "coingecko",
            "currency": currency_key,
            "results": coins,
            "count": len(coins),
        }

    except Exception as exc:
        log.error(f"Top cryptos fetch failed: {exc}")
        return {
            "success": False,
            "error": str(exc),
            "source": "coingecko",
        }
def finance_get_exchange_rate(
    base_currency: str = "USD",
    target_currency: Optional[str] = None,
) -> dict:
    """
    Get exchange rates from ExchangeRate-API (free).

    Args:
        base_currency: Base currency code (default: USD); whitespace is
            stripped and the code is upper-cased before use
        target_currency: Target currency code (optional, returns all if not specified)

    Returns:
        Dictionary with ``success`` and ``source``. With a target: ``base``,
        ``target``, ``rate`` and ``last_updated``. Without one: ``base``,
        ``rates`` (code -> rate), ``count`` and ``last_updated``. On failure
        (request error or unknown target) an ``error`` message.
    """
    from urllib.parse import quote

    try:
        base = base_currency.strip().upper()
        # The code becomes a URL path segment; percent-encode it so a value
        # containing "/", "?" or "#" cannot redirect the request elsewhere.
        url = f"{EXCHANGE_RATE_API}/{quote(base, safe='')}"
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        data = response.json()
        rates = data.get("rates", {})

        if target_currency:
            target_currency = target_currency.strip().upper()
            if target_currency not in rates:
                return {
                    "success": False,
                    "error": f"Currency not found: {target_currency}",
                    "source": "exchangerate-api",
                }
            return {
                "success": True,
                "source": "exchangerate-api",
                "base": base,
                "target": target_currency,
                "rate": rates[target_currency],
                "last_updated": data.get("date"),
            }

        return {
            "success": True,
            "source": "exchangerate-api",
            "base": base,
            "rates": rates,
            "count": len(rates),
            "last_updated": data.get("date"),
        }

    except Exception as e:
        log.error(f"Exchange rate fetch failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "exchangerate-api",
        }
def finance_search_crypto(
    query: str,
) -> dict:
    """
    Find cryptocurrencies on CoinGecko by name or symbol.

    Args:
        query: Search query (coin name or symbol)

    Returns:
        Dictionary with ``success`` and ``source``; on success also
        ``query``, ``results`` (at most the first 10 coin matches) and
        ``count``, on failure an ``error`` message.
    """
    try:
        reply = requests.get(
            f"{COINGECKO_API}/search",
            params={"query": query},
            timeout=10,
        )
        reply.raise_for_status()

        # Only the first ten coin hits are kept.
        matches = [
            {
                "id": hit.get("id"),
                "symbol": hit.get("symbol", "").upper(),
                "name": hit.get("name"),
                "market_cap_rank": hit.get("market_cap_rank"),
                "thumb": hit.get("thumb"),
            }
            for hit in reply.json().get("coins", [])[:10]
        ]

        return {
            "success": True,
            "source": "coingecko",
            "query": query,
            "results": matches,
            "count": len(matches),
        }

    except Exception as exc:
        log.error(f"Crypto search failed: {exc}")
        return {
            "success": False,
            "error": str(exc),
            "source": "coingecko",
        }
# Tool schemas for OpenAI function calling

# Schema for finance_get_stock_info: one required string parameter, `symbol`.
FINANCE_GET_STOCK_INFO_SCHEMA = {
    "type": "function",
    "function": {
        "name": "finance_get_stock_info",
        "description": "Get current stock information and key financial metrics from Yahoo Finance. Use for stock quotes and company data.",
        "parameters": {
            "type": "object",
            "properties": {
                "symbol": {
                    "type": "string",
                    "description": "Stock ticker symbol (e.g., AAPL, GOOGL, TSLA, MSFT)",
                },
            },
            "required": ["symbol"],
        },
    },
}
# Schema for finance_get_stock_history: `symbol` is required; `period` and
# `interval` are optional with the same defaults as the function signature.
# NOTE(review): the descriptions list fewer period/interval values than the
# function's docstring does (e.g. 10y, ytd, 2m, 30m) - confirm that is intended.
FINANCE_GET_STOCK_HISTORY_SCHEMA = {
    "type": "function",
    "function": {
        "name": "finance_get_stock_history",
        "description": "Get historical stock prices from Yahoo Finance. Use for price trends and charts.",
        "parameters": {
            "type": "object",
            "properties": {
                "symbol": {
                    "type": "string",
                    "description": "Stock ticker symbol",
                },
                "period": {
                    "type": "string",
                    "description": "Time period (1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, max)",
                    "default": "1mo",
                },
                "interval": {
                    "type": "string",
                    "description": "Data interval (1m, 5m, 15m, 1h, 1d, 1wk, 1mo)",
                    "default": "1d",
                },
            },
            "required": ["symbol"],
        },
    },
}
# Schema for finance_get_crypto_price. `coin_id` is marked required here even
# though the Python function itself defaults it to "bitcoin".
FINANCE_GET_CRYPTO_PRICE_SCHEMA = {
    "type": "function",
    "function": {
        "name": "finance_get_crypto_price",
        "description": "Get cryptocurrency price and market data from CoinGecko. Use the full coin name (e.g., 'bitcoin' not 'btc').",
        "parameters": {
            "type": "object",
            "properties": {
                "coin_id": {
                    "type": "string",
                    "description": "CoinGecko coin ID (e.g., bitcoin, ethereum, dogecoin, solana)",
                },
                "vs_currency": {
                    "type": "string",
                    "description": "Currency for price (default: usd)",
                    "default": "usd",
                },
            },
            "required": ["coin_id"],
        },
    },
}
# Schema for finance_get_top_cryptos: both parameters are optional and carry
# the same defaults as the function signature.
FINANCE_GET_TOP_CRYPTOS_SCHEMA = {
    "type": "function",
    "function": {
        "name": "finance_get_top_cryptos",
        "description": "Get top cryptocurrencies by market capitalization from CoinGecko.",
        "parameters": {
            "type": "object",
            "properties": {
                "limit": {
                    "type": "integer",
                    "description": "Number of coins to return (default: 10)",
                    "default": 10,
                },
                "vs_currency": {
                    "type": "string",
                    "description": "Currency for prices (default: usd)",
                    "default": "usd",
                },
            },
            "required": [],
        },
    },
}
# Schema for finance_get_exchange_rate: both parameters are optional.
# `target_currency` has no default; omitting it returns every rate.
FINANCE_GET_EXCHANGE_RATE_SCHEMA = {
    "type": "function",
    "function": {
        "name": "finance_get_exchange_rate",
        "description": "Get currency exchange rates. Returns all rates for base currency or specific rate if target provided.",
        "parameters": {
            "type": "object",
            "properties": {
                "base_currency": {
                    "type": "string",
                    "description": "Base currency code (default: USD)",
                    "default": "USD",
                },
                "target_currency": {
                    "type": "string",
                    "description": "Target currency code (optional, returns all if not specified)",
                },
            },
            "required": [],
        },
    },
}
# Schema for finance_search_crypto: one required string parameter, `query`.
FINANCE_SEARCH_CRYPTO_SCHEMA = {
    "type": "function",
    "function": {
        "name": "finance_search_crypto",
        "description": "Search for cryptocurrencies on CoinGecko by name or symbol. Use this to find the correct coin_id for finance_get_crypto_price.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query (coin name or symbol)",
                },
            },
            "required": ["query"],
        },
    },
}

508
tools/medical_tool.py Normal file
View File

@ -0,0 +1,508 @@
"""
Medical/Health Tool - Search medical literature and health data
Free sources used:
- PubMed/NCBI E-utilities API (completely free, no key required for basic use)
- Disease.sh API (completely free, open disease data)
- Health.gov API (free government health data)
- OpenFDA API (free FDA data)
All APIs are free and most don't require authentication.
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Optional
import requests
log = logging.getLogger(__name__)
# Free medical API endpoints
# NCBI E-utilities root; esearch.fcgi, esummary.fcgi and efetch.fcgi are
# appended by the PubMed functions in this module.
PUBMED_EUTILS_API = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
# disease.sh v3 root, used by the disease and COVID-19 country lookups.
DISEASE_API = "https://disease.sh/v3"
# openFDA root; medical_search_fda appends an endpoint path plus ".json".
OPENFDA_API = "https://api.fda.gov"
# health.gov site root; medical_get_health_topics appends the MyHealthfinder
# topic-search path.
HEALTH_GOV_API = "https://health.gov"
def medical_search_pubmed(
    query: str,
    max_results: int = 10,
) -> dict:
    """
    Search PubMed for medical/health research articles.

    Two requests are made: ``esearch`` to obtain matching article IDs, then
    ``esummary`` to fetch a summary record for each ID.

    Args:
        query: Search query (medical terms, diseases, treatments, etc.)
        max_results: Maximum number of results (default: 10)

    Returns:
        Dictionary with ``success`` and ``source``; on success also
        ``query``, ``results`` and ``count`` (plus ``total_found`` when there
        were hits, or ``message`` when there were none), on failure an
        ``error`` message.
    """
    try:
        # Step 1: look up the IDs of matching articles.
        id_reply = requests.get(
            f"{PUBMED_EUTILS_API}/esearch.fcgi",
            params={
                "db": "pubmed",
                "term": query,
                "retmax": max_results,
                "retmode": "json",
                "sort": "relevance",
            },
            timeout=15,
        )
        id_reply.raise_for_status()
        search_payload = id_reply.json()
        pmids = search_payload.get("esearchresult", {}).get("idlist", [])

        if not pmids:
            return {
                "success": True,
                "source": "pubmed",
                "query": query,
                "results": [],
                "count": 0,
                "message": "No articles found for this query",
            }

        # Step 2: fetch the summary records for those IDs in one call.
        summary_reply = requests.get(
            f"{PUBMED_EUTILS_API}/esummary.fcgi",
            params={
                "db": "pubmed",
                "id": ",".join(pmids),
                "retmode": "json",
            },
            timeout=15,
        )
        summary_reply.raise_for_status()
        summaries = summary_reply.json().get("result", {})

        # Keep the esearch order; skip IDs with no record or an error record.
        records = ((pmid, summaries.get(pmid, {})) for pmid in pmids)
        articles = [
            {
                "pmid": pmid,
                "title": record.get("title", ""),
                "authors": [author.get("name", "") for author in record.get("authors", [])],
                "journal": record.get("fulljournalname", record.get("source", "")),
                "pub_date": record.get("pubdate", ""),
                "doi": record.get("elocationid", ""),
                "url": f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/",
                "abstract_available": "abstract" in record,
            }
            for pmid, record in records
            if record and "error" not in record
        ]

        return {
            "success": True,
            "source": "pubmed",
            "query": query,
            "results": articles,
            "count": len(articles),
            "total_found": int(search_payload.get("esearchresult", {}).get("count", 0)),
        }

    except Exception as exc:
        log.error(f"PubMed search failed: {exc}")
        return {
            "success": False,
            "error": str(exc),
            "source": "pubmed",
        }
def medical_get_pubmed_abstract(
    pmid: str,
) -> dict:
    """
    Fetch the plain-text abstract record of one PubMed article via ``efetch``.

    Args:
        pmid: PubMed ID

    Returns:
        Dictionary with ``success``, ``source`` and ``pmid``; on success also
        ``abstract`` (the stripped response text) and the article ``url``, on
        failure an ``error`` message.
    """
    try:
        reply = requests.get(
            f"{PUBMED_EUTILS_API}/efetch.fcgi",
            params={
                "db": "pubmed",
                "id": pmid,
                "rettype": "abstract",
                "retmode": "text",
            },
            timeout=15,
        )
        reply.raise_for_status()

        return {
            "success": True,
            "source": "pubmed",
            "pmid": pmid,
            "abstract": reply.text.strip(),
            "url": f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/",
        }

    except Exception as exc:
        log.error(f"PubMed abstract fetch failed: {exc}")
        return {
            "success": False,
            "error": str(exc),
            "source": "pubmed",
            "pmid": pmid,
        }
def medical_get_disease_data(
    disease: str = "covid",
) -> dict:
    """
    Get current disease statistics from Disease.sh API.

    Args:
        disease: Disease type (covid, influenza, or all). Matching is
            case-insensitive and ignores surrounding whitespace. Any other
            value is treated as a country and looked up in the COVID-19
            per-country data.

    Returns:
        Dictionary with ``success`` and ``source``; on success also the
        normalised ``disease``, the raw API ``data`` and a ``timestamp``, on
        failure an ``error`` message.
    """
    from urllib.parse import quote

    try:
        disease = disease.strip().lower()

        # NOTE(review): confirm that the v3 API really serves /influenza/cdc
        # and /all; these paths are carried over unchanged.
        if disease in ("covid", "covid-19", "coronavirus"):
            url = f"{DISEASE_API}/covid-19/all"
        elif disease in ("influenza", "flu"):
            url = f"{DISEASE_API}/influenza/cdc"
        elif disease == "all":
            url = f"{DISEASE_API}/all"
        else:
            # Try COVID-19 countries data. The value is caller-supplied text
            # used as a path segment, so percent-encode it to keep "/", "?"
            # or "#" from changing which endpoint is requested.
            url = f"{DISEASE_API}/covid-19/countries/{quote(disease, safe='')}"

        response = requests.get(url, timeout=10)
        response.raise_for_status()
        data = response.json()

        return {
            "success": True,
            "source": "disease.sh",
            "disease": disease,
            "data": data,
            "timestamp": datetime.now().isoformat(),
        }

    except Exception as e:
        log.error(f"Disease data fetch failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "disease.sh",
        }
def medical_get_covid_country(
    country: str = "usa",
) -> dict:
    """
    Get COVID-19 statistics for a specific country.

    Args:
        country: Country name or ISO code (e.g., usa, uk, germany, china);
            surrounding whitespace is ignored

    Returns:
        Dictionary with ``success`` and ``source``; on success also the case,
        death, recovery, testing and population figures returned by the API
        and an ISO-formatted ``updated`` time (``None`` when absent), on
        failure an ``error`` message.
    """
    from urllib.parse import quote

    try:
        country_name = country.strip()
        # The value is caller-supplied text used as a path segment;
        # percent-encode it so "/", "?" or "#" cannot alter the endpoint.
        url = f"{DISEASE_API}/covid-19/countries/{quote(country_name, safe='')}"
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        data = response.json()

        # `updated` is divided by 1000 before conversion, i.e. it is treated
        # as epoch milliseconds.
        updated_ms = data.get("updated")

        return {
            "success": True,
            "source": "disease.sh",
            "country": data.get("country", country_name),
            "cases": data.get("cases"),
            "today_cases": data.get("todayCases"),
            "deaths": data.get("deaths"),
            "today_deaths": data.get("todayDeaths"),
            "recovered": data.get("recovered"),
            "active": data.get("active"),
            "critical": data.get("critical"),
            "cases_per_million": data.get("casesPerOneMillion"),
            "deaths_per_million": data.get("deathsPerOneMillion"),
            "tests": data.get("tests"),
            "tests_per_million": data.get("testsPerOneMillion"),
            "population": data.get("population"),
            "continent": data.get("continent"),
            "updated": datetime.fromtimestamp(updated_ms / 1000).isoformat() if updated_ms else None,
        }

    except Exception as e:
        log.error(f"COVID country data fetch failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "disease.sh",
        }
def medical_search_fda(
    query: str,
    database: str = "drug",
    limit: int = 10,
) -> dict:
    """
    Search FDA drug, device, or food databases.

    Args:
        query: Search query
        database: Database to search (drug, device, food, other). Matching
            is case-insensitive; an unrecognised value falls back to ``drug``.
        limit: Maximum results (default: 10)

    Returns:
        Dictionary with ``success`` and ``source``; on success also
        ``database`` (the database actually searched), ``query``,
        ``results`` and ``count``, on failure an ``error`` message. Drug
        results are condensed to brand/generic name, manufacturer, purpose,
        indications and warnings; results from the other databases are
        returned as received. A query with no matches yields an empty
        ``results`` list rather than an error.
    """
    try:
        # Map database names to FDA endpoints
        db_map = {
            "drug": "drug/label",
            "device": "device/510k",
            "food": "food/enforcement",
            "other": "other/substance",
        }

        # Resolve the fallback once so that the endpoint queried, the result
        # formatting and the reported database always agree.
        db_key = database.strip().lower()
        if db_key not in db_map:
            db_key = "drug"

        url = f"{OPENFDA_API}/{db_map[db_key]}.json"
        params = {
            "search": query,
            "limit": limit,
        }

        response = requests.get(url, params=params, timeout=15)
        if response.status_code == 404:
            # openFDA reports "no matches" as HTTP 404. The endpoint comes
            # from the fixed map above, so 404 is treated as an empty result.
            items = []
        else:
            response.raise_for_status()
            items = response.json().get("results", [])

        results = []
        for item in items:
            if db_key == "drug":
                openfda = item.get("openfda") or {}
                results.append({
                    "brand_name": _fda_first_entry(openfda.get("brand_name")),
                    "generic_name": _fda_first_entry(openfda.get("generic_name")),
                    "manufacturer": _fda_first_entry(openfda.get("manufacturer_name")),
                    "purpose": _fda_first_entry(item.get("purpose")),
                    "indications": _fda_first_entry(item.get("indications_and_usage"), max_chars=500),
                    "warnings": _fda_first_entry(item.get("warnings"), max_chars=500),
                })
            else:
                results.append(item)

        return {
            "success": True,
            "source": "openfda",
            "database": db_key,
            "query": query,
            "results": results,
            "count": len(results),
        }

    except Exception as e:
        log.error(f"FDA search failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "openfda",
        }


def _fda_first_entry(values, max_chars: Optional[int] = None):
    """Return the first entry of a list-valued label field ("" when missing or empty), optionally truncated."""
    if isinstance(values, (list, tuple)):
        values = values[0] if values else ""
    if values is None:
        return ""
    if max_chars is not None and isinstance(values, str):
        return values[:max_chars]
    return values
def medical_get_health_topics(
    topic: Optional[str] = None,
    limit: int = 10,
) -> dict:
    """
    Query the Health.gov MyHealthfinder topic search.

    Args:
        topic: Health topic to search (optional). When given it is sent as
            the ``topic`` parameter; otherwise ``pageSize`` is sent instead.
        limit: Maximum results (default: 10); also caps the returned list

    Returns:
        Dictionary with ``success`` and ``source``; on success also
        ``topic``, ``results`` and ``count``, on failure an ``error`` message.
    """
    try:
        query_params = {"lang": "en"}
        if topic:
            query_params["topic"] = topic
        else:
            query_params["pageSize"] = limit

        reply = requests.get(
            f"{HEALTH_GOV_API}/myhealthfinder/api/v3/topicsearch.json",
            params=query_params,
            timeout=10,
        )
        reply.raise_for_status()
        resources = reply.json().get("Result", {}).get("Resources", {}).get("Resource", [])

        entries = []
        for resource in resources[:limit]:
            title = resource.get("Title", "")
            link = resource.get("AccessibleVersion", resource.get("MyHealthfinder", ""))
            image_url = resource.get("ImageAltUrl", "")
            image_alt = resource.get("ImageAltText", "")
            categories = resource.get("Categories", "")
            # Content preview: the first section's text, capped at 500 chars.
            if resource.get("Sections"):
                preview = resource.get("Sections", {}).get("section", [{}])[0].get("Content", "")[:500]
            else:
                preview = ""
            entries.append({
                "title": title,
                "url": link,
                "image_url": image_url,
                "image_alt": image_alt,
                "categories": categories,
                "content": preview,
            })

        return {
            "success": True,
            "source": "health.gov",
            "topic": topic,
            "results": entries,
            "count": len(entries),
        }

    except Exception as exc:
        log.error(f"Health topics fetch failed: {exc}")
        return {
            "success": False,
            "error": str(exc),
            "source": "health.gov",
        }
# Tool schemas for OpenAI function calling

# Schema for medical_search_pubmed: `query` is required; `max_results` is
# optional with the same default as the function signature.
MEDICAL_SEARCH_PUBMED_SCHEMA = {
    "type": "function",
    "function": {
        "name": "medical_search_pubmed",
        "description": "Search PubMed for medical and health research articles. Use for scientific medical literature.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Medical search query (disease, treatment, drug, symptom)",
                },
                "max_results": {
                    "type": "integer",
                    "description": "Maximum number of results (default: 10)",
                    "default": 10,
                },
            },
            "required": ["query"],
        },
    },
}
# Schema for medical_get_pubmed_abstract: one required string parameter, `pmid`.
MEDICAL_GET_PUBMED_ABSTRACT_SCHEMA = {
    "type": "function",
    "function": {
        "name": "medical_get_pubmed_abstract",
        "description": "Get the full abstract of a PubMed article. Use after medical_search_pubmed to get detailed content.",
        "parameters": {
            "type": "object",
            "properties": {
                "pmid": {
                    "type": "string",
                    "description": "PubMed ID from search results",
                },
            },
            "required": ["pmid"],
        },
    },
}
# Schema for medical_get_disease_data: `disease` is optional and defaults to
# "covid", matching the function signature.
MEDICAL_GET_DISEASE_DATA_SCHEMA = {
    "type": "function",
    "function": {
        "name": "medical_get_disease_data",
        "description": "Get current disease statistics (COVID-19, influenza). Use for outbreak data and statistics.",
        "parameters": {
            "type": "object",
            "properties": {
                "disease": {
                    "type": "string",
                    "description": "Disease type (covid, influenza, all)",
                    "default": "covid",
                },
            },
            "required": [],
        },
    },
}
# Schema for medical_get_covid_country. `country` is optional; the declared
# default mirrors the function signature so the model can see what an
# omitted argument resolves to.
MEDICAL_GET_COVID_COUNTRY_SCHEMA = {
    "type": "function",
    "function": {
        "name": "medical_get_covid_country",
        "description": "Get COVID-19 statistics for a specific country. Use for country-specific pandemic data.",
        "parameters": {
            "type": "object",
            "properties": {
                "country": {
                    "type": "string",
                    "description": "Country name or ISO code (e.g., usa, uk, germany)",
                    "default": "usa",
                },
            },
            "required": [],
        },
    },
}
# Schema for medical_search_fda: `query` is required; `database` and `limit`
# are optional with the same defaults as the function signature.
# NOTE(review): medical_search_fda also maps an "other" database that this
# enum does not expose - confirm the omission is intentional.
MEDICAL_SEARCH_FDA_SCHEMA = {
    "type": "function",
    "function": {
        "name": "medical_search_fda",
        "description": "Search FDA databases for drug information, device approvals, and food safety. Use for medication info.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query (drug name, ingredient, etc.)",
                },
                "database": {
                    "type": "string",
                    "description": "Database to search (drug, device, food)",
                    "default": "drug",
                    "enum": ["drug", "device", "food"],
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum results (default: 10)",
                    "default": 10,
                },
            },
            "required": ["query"],
        },
    },
}
# Schema for medical_get_health_topics: both parameters are optional.
# `topic` has no default; `limit` defaults to 10 as in the function signature.
MEDICAL_GET_HEALTH_TOPICS_SCHEMA = {
    "type": "function",
    "function": {
        "name": "medical_get_health_topics",
        "description": "Get health information and topics from Health.gov. Use for general health advice and wellness topics.",
        "parameters": {
            "type": "object",
            "properties": {
                "topic": {
                    "type": "string",
                    "description": "Health topic to search (optional)",
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum results (default: 10)",
                    "default": 10,
                },
            },
            "required": [],
        },
    },
}

434
tools/news_tool.py Normal file
View File

@ -0,0 +1,434 @@
"""
News Tool - Fetch news from free sources
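Sources queried by the functions in this module:
- Hacker News (completely free)
- Reddit (free JSON feeds)
Endpoint constants are also defined for the following services, but no
function in this module calls them yet:
- GNews API (free tier: 100 requests/day)
- Currents API (free tier: 200 requests/day)
No API key required for Hacker News and Reddit.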
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from typing import Optional
import requests
log = logging.getLogger(__name__)
# Free news APIs (no key required for some)
GNEWS_API = "https://gnews.io/api/v4"
CURRENTS_API = "https://api.currentsapi.services/v1"
HACKER_NEWS_API = "https://hacker-news.firebaseio.com/v0"
REDDIT_API = "https://www.reddit.com"
def news_search_hackernews(
    query: str,
    limit: int = 10,
) -> dict:
    """
    Search Hacker News stories via the Algolia-hosted search endpoint.

    Args:
        query: Search query
        limit: Maximum number of results, sent as ``hitsPerPage`` (default: 10)

    Returns:
        On success, a dictionary with ``success``, ``source``, ``query``,
        ``results`` and ``count``. If anything raises, a dictionary with
        ``success`` False, the ``error`` text and ``source``.
    """
    try:
        # The Algolia endpoint is called without any API key
        response = requests.get(
            "https://hn.algolia.com/api/v1/search",
            params={"query": query, "hitsPerPage": limit, "tags": "story"},
            timeout=10,
        )
        response.raise_for_status()
        payload = response.json()

        stories = [
            {
                "title": hit.get("title", ""),
                "url": hit.get("url", ""),
                "points": hit.get("points", 0),
                "author": hit.get("author", ""),
                "created_at": hit.get("created_at", ""),
                "comments": hit.get("num_comments", 0),
                "hn_link": f"https://news.ycombinator.com/item?id={hit.get('objectID', '')}",
            }
            for hit in payload.get("hits", [])
        ]
        return {
            "success": True,
            "source": "hacker_news",
            "query": query,
            "results": stories,
            "count": len(stories),
        }
    except Exception as e:
        log.error(f"Hacker News search failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "hacker_news",
        }
def news_get_top_stories(
    limit: int = 15,
) -> dict:
    """
    Get top stories from Hacker News.

    Fetches the list of top story IDs, then requests each story item in
    turn. A story that cannot be fetched or parsed is logged and skipped
    so that one bad item does not fail the whole call.

    Args:
        limit: Maximum number of stories (default: 15). Values below zero
            are treated as zero.

    Returns:
        On success, a dictionary with ``success``, ``source``, ``results``
        and ``count``. Each result's ``time`` is a timezone-aware UTC
        ISO-8601 string. If the ID list itself cannot be fetched, a
        dictionary with ``success`` False, ``error`` and ``source``.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    try:
        # One session for the ID list plus every item request, so the
        # connection to the API host can be reused.
        with requests.Session() as session:
            response = session.get(f"{HACKER_NEWS_API}/topstories.json", timeout=10)
            response.raise_for_status()
            # max() guards against a negative limit slicing from the end.
            story_ids = (response.json() or [])[:max(limit, 0)]

            results = []
            for story_id in story_ids:
                try:
                    story_response = session.get(
                        f"{HACKER_NEWS_API}/item/{story_id}.json",
                        timeout=10,
                    )
                    # Surface HTTP errors instead of parsing an error body.
                    story_response.raise_for_status()
                    story = story_response.json()
                    if not story:
                        continue
                    posted_at = datetime.fromtimestamp(
                        story.get("time", 0), tz=timezone.utc
                    )
                    results.append({
                        "title": story.get("title", ""),
                        "url": story.get("url", ""),
                        "points": story.get("score", 0),
                        "author": story.get("by", ""),
                        "time": posted_at.isoformat(),
                        "comments": story.get("descendants", 0),
                        "hn_link": f"https://news.ycombinator.com/item?id={story_id}",
                    })
                except Exception as item_error:
                    # Best effort per item, but leave a trace of what was dropped.
                    log.warning(f"Skipping Hacker News item {story_id}: {item_error}")
                    continue

        return {
            "success": True,
            "source": "hacker_news",
            "results": results,
            "count": len(results),
        }
    except Exception as e:
        log.error(f"Hacker News top stories failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "hacker_news",
        }
def news_get_reddit(
    subreddit: str = "worldnews",
    limit: int = 15,
    timeframe: str = "day",
) -> dict:
    """
    Get top posts from a Reddit subreddit.

    Args:
        subreddit: Subreddit name (default: worldnews). The name is
            percent-encoded before it is placed in the URL path.
        limit: Maximum number of posts (default: 15)
        timeframe: Time period (hour, day, week, month, year, all)

    Returns:
        On success, a dictionary with ``success``, ``source``,
        ``subreddit``, ``timeframe``, ``results`` and ``count``. Each
        post's ``created`` is a timezone-aware UTC ISO-8601 string and
        ``selftext`` is cut to 500 characters. On failure, a dictionary
        with ``success`` False, ``error`` and ``source``.
    """
    # Local imports: neither name is imported in the module header.
    from datetime import timezone
    from urllib.parse import quote

    try:
        # The subreddit comes from the caller, so escape it rather than let
        # "/", "?" or "#" change which endpoint is hit. "+" stays literal so
        # combined names such as "news+worldnews" are passed through as-is.
        safe_subreddit = quote(subreddit.strip(), safe="+")
        url = f"{REDDIT_API}/r/{safe_subreddit}/top.json"
        headers = {"User-Agent": "DocRAG/1.0"}
        params = {
            "limit": limit,
            "t": timeframe,
        }
        response = requests.get(url, headers=headers, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()

        results = []
        for child in data.get("data", {}).get("children", []):
            post = child.get("data", {})
            # created_utc is a UTC epoch value; keep the zone explicit
            # instead of converting to naive local time.
            created = datetime.fromtimestamp(
                post.get("created_utc") or 0, tz=timezone.utc
            )
            results.append({
                "title": post.get("title", ""),
                "url": post.get("url", ""),
                "author": post.get("author", ""),
                "score": post.get("score", 0),
                "comments": post.get("num_comments", 0),
                "subreddit": post.get("subreddit", ""),
                "created": created.isoformat(),
                "permalink": f"https://reddit.com{post.get('permalink', '')}",
                "selftext": (post.get("selftext") or "")[:500],
            })

        return {
            "success": True,
            "source": "reddit",
            "subreddit": subreddit,
            "timeframe": timeframe,
            "results": results,
            "count": len(results),
        }
    except Exception as e:
        log.error(f"Reddit fetch failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "reddit",
        }
def news_search_reddit(
    query: str,
    subreddit: str = "all",
    limit: int = 15,
) -> dict:
    """
    Search Reddit for posts matching a query.

    Args:
        query: Search query
        subreddit: Subreddit to search (default: all). Any value other
            than "all" sets ``restrict_sr`` so the search stays inside
            that subreddit. The name is percent-encoded before it is
            placed in the URL path.
        limit: Maximum number of results (default: 15)

    Returns:
        On success, a dictionary with ``success``, ``source``, ``query``,
        ``subreddit``, ``results`` and ``count``. Each post's ``created``
        is a timezone-aware UTC ISO-8601 string and ``selftext`` is cut to
        500 characters. On failure, a dictionary with ``success`` False,
        ``error`` and ``source``.
    """
    # Local imports: neither name is imported in the module header.
    from datetime import timezone
    from urllib.parse import quote

    try:
        # Escape the caller-supplied subreddit so it cannot alter the path;
        # "+" stays literal for combined names such as "news+worldnews".
        safe_subreddit = quote(subreddit.strip(), safe="+")
        url = f"{REDDIT_API}/r/{safe_subreddit}/search.json"
        headers = {"User-Agent": "DocRAG/1.0"}
        params = {
            "q": query,
            "limit": limit,
            "sort": "relevance",
            "restrict_sr": "true" if subreddit != "all" else "false",
        }
        response = requests.get(url, headers=headers, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()

        results = []
        for child in data.get("data", {}).get("children", []):
            post = child.get("data", {})
            # created_utc is a UTC epoch value; keep the zone explicit
            # instead of converting to naive local time.
            created = datetime.fromtimestamp(
                post.get("created_utc") or 0, tz=timezone.utc
            )
            results.append({
                "title": post.get("title", ""),
                "url": post.get("url", ""),
                "author": post.get("author", ""),
                "score": post.get("score", 0),
                "comments": post.get("num_comments", 0),
                "subreddit": post.get("subreddit", ""),
                "created": created.isoformat(),
                "permalink": f"https://reddit.com{post.get('permalink', '')}",
                "selftext": (post.get("selftext") or "")[:500],
            })

        return {
            "success": True,
            "source": "reddit",
            "query": query,
            "subreddit": subreddit,
            "results": results,
            "count": len(results),
        }
    except Exception as e:
        log.error(f"Reddit search failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "reddit",
        }
def news_aggregate(
    query: str,
    limit: int = 10,
) -> dict:
    """
    Aggregate news search results from Hacker News and Reddit.

    Each source is queried in turn through ``news_search_hackernews`` and
    ``news_search_reddit`` (searching "all"); every result is tagged with
    a ``source`` key. A failing source is recorded in ``errors`` and does
    not stop the other source from contributing.

    Args:
        query: Search query
        limit: Maximum results per source (default: 10)

    Returns:
        Dictionary with ``success``, ``query``, ``results``, ``count``,
        ``sources_checked`` and ``errors`` (None when nothing failed).
        ``success`` is True when at least one source answered. When every
        source failed it is False and an ``error`` summary is added, which
        matches the failure shape of the single-source functions.
    """
    # (result tag, label used in error messages, deferred search call)
    searches = (
        ("hacker_news", "Hacker News", lambda: news_search_hackernews(query, limit)),
        ("reddit", "Reddit", lambda: news_search_reddit(query, "all", limit)),
    )

    results = []
    errors = []
    any_succeeded = False
    for source_key, label, run_search in searches:
        outcome = run_search()
        if outcome.get("success"):
            any_succeeded = True
            results.extend(
                {**item, "source": source_key} for item in outcome.get("results", [])
            )
        else:
            errors.append(f"{label}: {outcome.get('error')}")

    aggregated = {
        "success": any_succeeded,
        "query": query,
        "results": results,
        "count": len(results),
        "sources_checked": [source_key for source_key, _, _ in searches],
        "errors": errors if errors else None,
    }
    if not any_succeeded:
        # Nothing answered: do not report a total outage as a success.
        aggregated["error"] = "; ".join(errors)
    return aggregated
# Tool schemas for OpenAI function calling
# Function-calling tool schema for ``news_search_hackernews``.
# ``query`` is required; ``limit`` defaults to 10.
NEWS_SEARCH_HACKERNEWS_SCHEMA = {
    "type": "function",
    "function": {
        "name": "news_search_hackernews",
        "description": "Search Hacker News for tech news and discussions. Best for technology, startups, programming topics.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query",
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of results (default: 10)",
                    "default": 10,
                },
            },
            "required": ["query"],
        },
    },
}
# Function-calling tool schema for ``news_get_top_stories``.
# The only argument, ``limit``, is optional and defaults to 15.
NEWS_GET_TOP_STORIES_SCHEMA = {
    "type": "function",
    "function": {
        "name": "news_get_top_stories",
        "description": "Get current top stories from Hacker News. Use for general tech news browsing.",
        "parameters": {
            "type": "object",
            "properties": {
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of stories (default: 15)",
                    "default": 15,
                },
            },
            "required": [],
        },
    },
}
# Function-calling tool schema for ``news_get_reddit``.
# Every argument is optional; ``timeframe`` is restricted to six values.
NEWS_GET_REDDIT_SCHEMA = {
    "type": "function",
    "function": {
        "name": "news_get_reddit",
        "description": "Get top posts from a Reddit subreddit. Great for news, discussions, and community content.",
        "parameters": {
            "type": "object",
            "properties": {
                "subreddit": {
                    "type": "string",
                    "description": "Subreddit name (e.g., worldnews, technology, science)",
                    "default": "worldnews",
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of posts (default: 15)",
                    "default": 15,
                },
                "timeframe": {
                    "type": "string",
                    "description": "Time period (hour, day, week, month, year, all)",
                    "default": "day",
                    "enum": ["hour", "day", "week", "month", "year", "all"],
                },
            },
            "required": [],
        },
    },
}
# Function-calling tool schema for ``news_search_reddit``.
# ``query`` is required. The description mentions the optional
# ``subreddit`` argument so the model knows a search can be narrowed.
NEWS_SEARCH_REDDIT_SCHEMA = {
    "type": "function",
    "function": {
        "name": "news_search_reddit",
        "description": "Search Reddit for posts matching a query, across all subreddits by default or restricted to one subreddit.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query",
                },
                "subreddit": {
                    "type": "string",
                    "description": "Subreddit to search (default: all)",
                    "default": "all",
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of results (default: 15)",
                    "default": 15,
                },
            },
            "required": ["query"],
        },
    },
}
# Function-calling tool schema for ``news_aggregate``.
# ``query`` is required; ``limit`` applies per source and defaults to 10.
NEWS_AGGREGATE_SCHEMA = {
    "type": "function",
    "function": {
        "name": "news_aggregate",
        "description": "Search for news from multiple sources (Hacker News, Reddit) in one call. Best for comprehensive news coverage.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query",
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum results per source (default: 10)",
                    "default": 10,
                },
            },
            "required": ["query"],
        },
    },
}

464
tools/science_tool.py Normal file
View File

@ -0,0 +1,464 @@
"""
Scientific/Academic Tool - Search scientific papers and research
Free sources used:
- arXiv API (completely free, no key required)
- Semantic Scholar API (free tier)
- DOAJ (Directory of Open Access Journals - free)
- CORE API (free access to research papers; not yet called by any function in this module)
All APIs are free for basic use.
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Optional
import xml.etree.ElementTree as ET
import requests
log = logging.getLogger(__name__)
# Free academic APIs
ARXIV_API = "http://export.arxiv.org/api/query"
SEMANTIC_SCHOLAR_API = "https://api.semanticscholar.org/graph/v1"
DOAJ_API = "https://api.doaj.org"
def science_search_arxiv(
    query: str,
    max_results: int = 10,
    category: Optional[str] = None,
) -> dict:
    """
    Search arXiv for scientific preprints.

    The Atom XML response is parsed entry by entry. Elements that are
    missing or have no text yield empty strings, so one sparse entry does
    not abort the whole search. Runs of whitespace (including line breaks)
    in titles and abstracts are collapsed to single spaces.

    Args:
        query: Search query
        max_results: Maximum number of results (default: 10)
        category: arXiv category filter (e.g., cs.AI, physics, math.CO);
            when given it is AND-ed in front of the query as ``cat:...``

    Returns:
        On success, a dictionary with ``success``, ``source``, ``query``,
        ``category``, ``results`` and ``count``. Abstracts are cut to 1000
        characters. On failure, a dictionary with ``success`` False,
        ``error`` and ``source``.
    """
    try:
        search_query = f"cat:{category} AND {query}" if category else query
        params = {
            "search_query": search_query,
            "start": 0,
            "max_results": max_results,
            "sortBy": "relevance",
            "sortOrder": "descending",
        }
        response = requests.get(ARXIV_API, params=params, timeout=30)
        response.raise_for_status()

        root = ET.fromstring(response.content)
        ns = {"atom": "http://www.w3.org/2005/Atom"}

        def text_of(parent, tag):
            # An absent element and an element with no text both give "".
            node = parent.find(f"atom:{tag}", ns)
            return (node.text or "") if node is not None else ""

        def squeeze(text):
            # Collapse line breaks and indentation into single spaces.
            return " ".join(text.split())

        results = []
        for entry in root.findall("atom:entry", ns):
            authors = [
                name
                for name in (text_of(author, "name") for author in entry.findall("atom:author", ns))
                if name
            ]
            categories = [
                term
                for term in (cat.get("term") for cat in entry.findall("atom:category", ns))
                if term
            ]
            link = text_of(entry, "id")
            results.append({
                "title": squeeze(text_of(entry, "title")),
                "abstract": squeeze(text_of(entry, "summary"))[:1000],
                "authors": authors,
                "published": text_of(entry, "published"),
                "updated": text_of(entry, "updated"),
                "link": link,
                # The entry id is the abstract-page URL; swap in the PDF path.
                "pdf_link": link.replace("/abs/", "/pdf/"),
                "categories": categories,
            })

        return {
            "success": True,
            "source": "arxiv",
            "query": query,
            "category": category,
            "results": results,
            "count": len(results),
        }
    except Exception as e:
        log.error(f"arXiv search failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "arxiv",
        }
def science_search_semantic_scholar(
    query: str,
    limit: int = 10,
    year: Optional[str] = None,
) -> dict:
    """
    Search Semantic Scholar for academic papers.

    Args:
        query: Search query
        limit: Maximum number of results (default: 10)
        year: Year filter (e.g., "2020-", "2018-2022"); only sent when truthy

    Returns:
        On success, a dictionary with ``success``, ``source``, ``query``,
        ``year_filter``, ``results``, ``count`` and ``total`` (falls back
        to the result count when the response has no ``total``). Abstracts
        are cut to 1000 characters. On failure, a dictionary with
        ``success`` False, ``error`` and ``source``.
    """
    try:
        request_params = {
            "query": query,
            "limit": limit,
            "fields": "title,abstract,authors,year,venue,citationCount,openAccessPdf,url",
        }
        if year:
            request_params["year"] = year

        response = requests.get(
            f"{SEMANTIC_SCHOLAR_API}/paper/search",
            params=request_params,
            timeout=15,
        )
        response.raise_for_status()
        payload = response.json()

        papers = []
        for item in payload.get("data", []):
            author_names = [person.get("name", "") for person in item.get("authors", [])]
            # openAccessPdf may be absent or empty; only then is there no PDF link
            open_access = item.get("openAccessPdf")
            pdf_link = item["openAccessPdf"].get("url") if open_access else None
            papers.append({
                "paper_id": item.get("paperId"),
                "title": item.get("title", ""),
                "abstract": item.get("abstract", "")[:1000] if item.get("abstract") else "",
                "authors": author_names,
                "year": item.get("year"),
                "venue": item.get("venue", ""),
                "citations": item.get("citationCount", 0),
                "url": item.get("url"),
                "pdf_url": pdf_link,
            })

        return {
            "success": True,
            "source": "semantic_scholar",
            "query": query,
            "year_filter": year,
            "results": papers,
            "count": len(papers),
            "total": payload.get("total", len(papers)),
        }
    except Exception as e:
        log.error(f"Semantic Scholar search failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "semantic_scholar",
        }
def science_get_paper_details(
    paper_id: str,
) -> dict:
    """
    Fetch the detail record of one paper from Semantic Scholar.

    Args:
        paper_id: Semantic Scholar paper ID or DOI; inserted into the
            request path unchanged

    Returns:
        On success, a dictionary with ``success``, ``source`` and the
        paper's id, title, abstract, authors, year, venue, journal name,
        citation and reference counts, URL, open-access PDF URL and TLDR
        text (the last three may be None). On failure, a dictionary with
        ``success`` False, ``error`` and ``source``.
    """
    requested_fields = "title,abstract,authors,year,venue,citationCount,referenceCount,openAccessPdf,url,journal,publicationVenue,tldr"
    try:
        response = requests.get(
            f"{SEMANTIC_SCHOLAR_API}/paper/{paper_id}",
            params={"fields": requested_fields},
            timeout=15,
        )
        response.raise_for_status()
        record = response.json()

        author_names = [person.get("name", "") for person in record.get("authors", [])]
        # Nested objects may be absent or empty; read them only when present
        pdf_link = record["openAccessPdf"].get("url") if record.get("openAccessPdf") else None
        summary = record["tldr"].get("text") if record.get("tldr") else None

        return {
            "success": True,
            "source": "semantic_scholar",
            "paper_id": record.get("paperId"),
            "title": record.get("title", ""),
            "abstract": record.get("abstract", ""),
            "authors": author_names,
            "year": record.get("year"),
            "venue": record.get("venue", ""),
            "journal": record.get("journal", {}).get("name") if record.get("journal") else None,
            "citations": record.get("citationCount", 0),
            "references": record.get("referenceCount", 0),
            "url": record.get("url"),
            "pdf_url": pdf_link,
            "tldr": summary,
        }
    except Exception as e:
        log.error(f"Paper details fetch failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "semantic_scholar",
        }
def science_search_doaj(
    query: str,
    limit: int = 10,
) -> dict:
    """
    Search DOAJ (Directory of Open Access Journals).

    Args:
        query: Search query. It becomes a path segment of the request URL
            and is therefore fully percent-encoded first.
        limit: Maximum number of results (default: 10)

    Returns:
        On success, a dictionary with ``success``, ``source``, ``query``,
        ``results``, ``count`` and ``total`` (falls back to the result
        count when the response has no ``total``). ``doi`` is the id of
        the identifier whose ``type`` is "doi", or None if there is none;
        ``link`` is the URL of the first link entry. Abstracts are cut to
        1000 characters. On failure, a dictionary with ``success`` False,
        ``error`` and ``source``.
    """
    # Local import: the module header does not import urllib.parse.
    from urllib.parse import quote

    try:
        # Without escaping, "/", "?" or "#" in the query would change the
        # path or be read as a query string or fragment.
        url = f"{DOAJ_API}/search/articles/{quote(query, safe='')}"
        params = {
            "pageSize": limit,
            "page": 1,
        }
        headers = {"Accept": "application/json"}
        response = requests.get(url, params=params, headers=headers, timeout=15)
        response.raise_for_status()
        data = response.json()

        results = []
        for article in data.get("results", []):
            bibjson = article.get("bibjson", {})
            identifiers = bibjson.get("identifier") or []
            links = bibjson.get("link") or []
            # The identifier list can hold several entries; pick the one
            # typed as a DOI rather than whichever happens to come first.
            doi = next(
                (
                    ident.get("id")
                    for ident in identifiers
                    if str(ident.get("type") or "").lower() == "doi"
                ),
                None,
            )
            results.append({
                "title": bibjson.get("title", ""),
                "abstract": (bibjson.get("abstract") or "")[:1000],
                "authors": [a.get("name", "") for a in bibjson.get("author") or []],
                "year": bibjson.get("year"),
                "journal": (bibjson.get("journal") or {}).get("title", ""),
                "doi": doi,
                "link": links[0].get("url") if links else None,
                "keywords": bibjson.get("keywords", []),
            })

        return {
            "success": True,
            "source": "doaj",
            "query": query,
            "results": results,
            "count": len(results),
            "total": data.get("total", len(results)),
        }
    except Exception as e:
        log.error(f"DOAJ search failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "doaj",
        }
def science_aggregate_search(
    query: str,
    limit: int = 5,
) -> dict:
    """
    Search arXiv and Semantic Scholar in one call.

    Each source is queried in turn through ``science_search_arxiv`` and
    ``science_search_semantic_scholar``; every result is tagged with a
    ``source`` key. A failing source is recorded in ``errors`` and does
    not stop the other source from contributing.

    Args:
        query: Search query
        limit: Maximum results per source (default: 5)

    Returns:
        Dictionary with ``success``, ``query``, ``results``, ``count``,
        ``sources_checked`` and ``errors`` (None when nothing failed).
        ``success`` is True when at least one source answered. When every
        source failed it is False and an ``error`` summary is added, which
        matches the failure shape of the single-source functions.
    """
    # (result tag, label used in error messages, deferred search call)
    searches = (
        ("arxiv", "arXiv", lambda: science_search_arxiv(query, limit)),
        (
            "semantic_scholar",
            "Semantic Scholar",
            lambda: science_search_semantic_scholar(query, limit),
        ),
    )

    results = []
    errors = []
    any_succeeded = False
    for source_key, label, run_search in searches:
        outcome = run_search()
        if outcome.get("success"):
            any_succeeded = True
            results.extend(
                {**item, "source": source_key} for item in outcome.get("results", [])
            )
        else:
            errors.append(f"{label}: {outcome.get('error')}")

    aggregated = {
        "success": any_succeeded,
        "query": query,
        "results": results,
        "count": len(results),
        "sources_checked": [source_key for source_key, _, _ in searches],
        "errors": errors if errors else None,
    }
    if not any_succeeded:
        # Nothing answered: do not report a total outage as a success.
        aggregated["error"] = "; ".join(errors)
    return aggregated
# Tool schemas for OpenAI function calling
# Function-calling tool schema for ``science_search_arxiv``.
# ``query`` is required; ``category`` has no default.
SCIENCE_SEARCH_ARXIV_SCHEMA = {
    "type": "function",
    "function": {
        "name": "science_search_arxiv",
        "description": "Search arXiv for scientific preprints. Best for physics, math, computer science, and AI research.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query",
                },
                "max_results": {
                    "type": "integer",
                    "description": "Maximum number of results (default: 10)",
                    "default": 10,
                },
                "category": {
                    "type": "string",
                    "description": "arXiv category filter (e.g., cs.AI, cs.LG, physics, math.CO)",
                },
            },
            "required": ["query"],
        },
    },
}
# Function-calling tool schema for ``science_search_semantic_scholar``.
# ``query`` is required; ``year`` is a free-form string filter.
SCIENCE_SEARCH_SEMANTIC_SCHOLAR_SCHEMA = {
    "type": "function",
    "function": {
        "name": "science_search_semantic_scholar",
        "description": "Search Semantic Scholar for academic papers across all fields. Includes citation counts and open access PDFs.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query",
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of results (default: 10)",
                    "default": 10,
                },
                "year": {
                    "type": "string",
                    "description": "Year filter (e.g., '2020-', '2018-2022')",
                },
            },
            "required": ["query"],
        },
    },
}
# Function-calling tool schema for ``science_get_paper_details``.
# The single ``paper_id`` argument is required.
SCIENCE_GET_PAPER_DETAILS_SCHEMA = {
    "type": "function",
    "function": {
        "name": "science_get_paper_details",
        "description": "Get detailed information about a specific paper including TLDR summary. Use paper ID from search results.",
        "parameters": {
            "type": "object",
            "properties": {
                "paper_id": {
                    "type": "string",
                    "description": "Semantic Scholar paper ID or DOI",
                },
            },
            "required": ["paper_id"],
        },
    },
}
# Function-calling tool schema for ``science_search_doaj``.
# ``query`` is required; ``limit`` defaults to 10.
SCIENCE_SEARCH_DOAJ_SCHEMA = {
    "type": "function",
    "function": {
        "name": "science_search_doaj",
        "description": "Search DOAJ for open access journal articles. Best for peer-reviewed open access research.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query",
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of results (default: 10)",
                    "default": 10,
                },
            },
            "required": ["query"],
        },
    },
}
# Function-calling tool schema for ``science_aggregate_search``.
# ``query`` is required; ``limit`` applies per source and defaults to 5.
SCIENCE_AGGREGATE_SEARCH_SCHEMA = {
    "type": "function",
    "function": {
        "name": "science_aggregate_search",
        "description": "Search multiple academic sources (arXiv, Semantic Scholar) at once for comprehensive coverage.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query",
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum results per source (default: 5)",
                    "default": 5,
                },
            },
            "required": ["query"],
        },
    },
}

420
tools/weather_tool.py Normal file
View File

@ -0,0 +1,420 @@
"""
Weather Tool - Get weather data and forecasts
Free sources used:
- Open-Meteo API (completely free, no API key required)
- OpenWeatherMap (free tier available; not called by any function in this module)
Primary use: Open-Meteo (no key required)
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Optional
import requests
log = logging.getLogger(__name__)
# Free weather APIs
OPEN_METEO_API = "https://api.open-meteo.com/v1"
GEOCODING_API = "https://geocoding-api.open-meteo.com/v1"
def weather_get_coordinates(
    location: str,
) -> dict:
    """
    Get coordinates for a location name.

    The full string is looked up first. If that finds nothing and the
    string contains a comma (e.g. "London, UK"), the part before the comma
    is looked up on its own and the text after it is used to prefer a
    candidate whose country, country code or first-level admin area
    matches it exactly (case-insensitive). Without such a match, the first
    candidate is used.

    Args:
        location: City name or location (e.g., "New York", "London, UK")

    Returns:
        On success, a dictionary with ``success``, ``source``, ``name``,
        ``country``, ``latitude``, ``longitude``, ``elevation``,
        ``timezone`` and ``population``. When nothing is found or the
        request fails, a dictionary with ``success`` False, ``error`` and
        ``source``.
    """
    try:
        url = f"{GEOCODING_API}/search"

        def search(name, count):
            # One geocoding lookup; always returns a list (possibly empty).
            response = requests.get(
                url,
                params={
                    "name": name,
                    "count": count,
                    "language": "en",
                    "format": "json",
                },
                timeout=10,
            )
            response.raise_for_status()
            return response.json().get("results") or []

        results = search(location, 1)

        if not results and "," in location:
            # NOTE(review): presumably the endpoint matches on place name
            # only, so "City, Country" strings come back empty - confirm
            # against the geocoding API docs. The retry is harmless if not.
            city, _, qualifier = location.partition(",")
            city = city.strip()
            qualifier = qualifier.strip().casefold()
            if city:
                candidates = search(city, 10)
                preferred = [
                    candidate
                    for candidate in candidates
                    if qualifier
                    and qualifier in {
                        str(candidate.get(key) or "").casefold()
                        for key in ("country", "country_code", "admin1")
                    }
                ]
                results = preferred or candidates

        if not results:
            return {
                "success": False,
                "error": f"Location not found: {location}",
                "source": "open-meteo",
            }

        loc = results[0]
        return {
            "success": True,
            "source": "open-meteo",
            "name": loc.get("name", ""),
            "country": loc.get("country", ""),
            "latitude": loc.get("latitude"),
            "longitude": loc.get("longitude"),
            "elevation": loc.get("elevation"),
            "timezone": loc.get("timezone"),
            "population": loc.get("population"),
        }
    except Exception as e:
        log.error(f"Geocoding failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "open-meteo",
        }
def weather_get_current(
    location: str,
    units: str = "celsius",
) -> dict:
    """
    Get current weather for a location.

    The location is first resolved with ``weather_get_coordinates``; if
    that fails, its failure dictionary is returned unchanged.

    Args:
        location: City name or location
        units: Temperature units (celsius or fahrenheit), sent as
            ``temperature_unit``

    Returns:
        On success, a dictionary with the resolved place, coordinates,
        timezone, the current readings, ``units`` and a ``timestamp``
        taken from the local clock at call time. ``weather_code`` is None
        and ``weather_description`` is "Unknown" when the response carries
        no code. On failure, a dictionary with ``success`` False,
        ``error`` and ``source``.
    """
    try:
        # First get coordinates
        geo = weather_get_coordinates(location)
        if not geo.get("success"):
            return geo

        lat = geo["latitude"]
        lon = geo["longitude"]

        url = f"{OPEN_METEO_API}/forecast"
        params = {
            "latitude": lat,
            "longitude": lon,
            "current": "temperature_2m,relative_humidity_2m,apparent_temperature,precipitation,rain,showers,snowfall,weather_code,cloud_cover,pressure_msl,surface_pressure,wind_speed_10m,wind_direction_10m,wind_gusts_10m",
            "temperature_unit": units,
            "timezone": "auto",
        }
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        current = data.get("current", {})

        # Weather code descriptions
        weather_codes = {
            0: "Clear sky",
            1: "Mainly clear", 2: "Partly cloudy", 3: "Overcast",
            45: "Fog", 48: "Depositing rime fog",
            51: "Light drizzle", 53: "Moderate drizzle", 55: "Dense drizzle",
            56: "Light freezing drizzle", 57: "Dense freezing drizzle",
            61: "Slight rain", 63: "Moderate rain", 65: "Heavy rain",
            66: "Light freezing rain", 67: "Heavy freezing rain",
            71: "Slight snow", 73: "Moderate snow", 75: "Heavy snow",
            77: "Snow grains",
            80: "Slight rain showers", 81: "Moderate rain showers", 82: "Violent rain showers",
            85: "Slight snow showers", 86: "Heavy snow showers",
            95: "Thunderstorm", 96: "Thunderstorm with slight hail", 99: "Thunderstorm with heavy hail",
        }
        # No default of 0 here: a missing code must not be reported as
        # "Clear sky". None falls through to "Unknown" in the lookup.
        weather_code = current.get("weather_code")
        weather_description = weather_codes.get(weather_code, "Unknown")

        return {
            "success": True,
            "source": "open-meteo",
            "location": geo.get("name", location),
            "country": geo.get("country", ""),
            "latitude": lat,
            "longitude": lon,
            "timezone": data.get("timezone", ""),
            "temperature": current.get("temperature_2m"),
            "feels_like": current.get("apparent_temperature"),
            "humidity": current.get("relative_humidity_2m"),
            "weather_code": weather_code,
            "weather_description": weather_description,
            "cloud_cover": current.get("cloud_cover"),
            "pressure_msl": current.get("pressure_msl"),
            "wind_speed": current.get("wind_speed_10m"),
            "wind_direction": current.get("wind_direction_10m"),
            "wind_gusts": current.get("wind_gusts_10m"),
            "precipitation": current.get("precipitation"),
            "rain": current.get("rain"),
            "snowfall": current.get("snowfall"),
            "units": units,
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        log.error(f"Weather fetch failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "open-meteo",
        }
def weather_get_forecast(
    location: str,
    days: int = 7,
    units: str = "celsius",
) -> dict:
    """
    Get weather forecast for a location.

    The location is first resolved with ``weather_get_coordinates``; if
    that fails, its failure dictionary is returned unchanged.

    Args:
        location: City name or location
        days: Number of forecast days (1-16); values outside that range
            are clamped into it
        units: Temperature units (celsius or fahrenheit), sent as
            ``temperature_unit``

    Returns:
        On success, a dictionary with the resolved place, coordinates,
        timezone, ``units``, a ``forecast`` list with one entry per
        returned date, and ``count``. A value missing from the response is
        reported as None; a missing weather code gives the description
        "Unknown". On failure, a dictionary with ``success`` False,
        ``error`` and ``source``.
    """
    try:
        # First get coordinates
        geo = weather_get_coordinates(location)
        if not geo.get("success"):
            return geo

        lat = geo["latitude"]
        lon = geo["longitude"]

        url = f"{OPEN_METEO_API}/forecast"
        params = {
            "latitude": lat,
            "longitude": lon,
            "daily": "weather_code,temperature_2m_max,temperature_2m_min,apparent_temperature_max,apparent_temperature_min,sunrise,sunset,uv_index_max,precipitation_sum,rain_sum,showers_sum,snowfall_sum,precipitation_probability_max,wind_speed_10m_max,wind_gusts_10m_max",
            "temperature_unit": units,
            "timezone": "auto",
            # Clamp on both sides so 0 or a negative value is not sent.
            "forecast_days": max(1, min(days, 16)),
        }
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        daily = data.get("daily", {})

        # Weather code descriptions
        weather_codes = {
            0: "Clear sky",
            1: "Mainly clear", 2: "Partly cloudy", 3: "Overcast",
            45: "Fog", 48: "Depositing rime fog",
            51: "Light drizzle", 53: "Moderate drizzle", 55: "Dense drizzle",
            56: "Light freezing drizzle", 57: "Dense freezing drizzle",
            61: "Slight rain", 63: "Moderate rain", 65: "Heavy rain",
            66: "Light freezing rain", 67: "Heavy freezing rain",
            71: "Slight snow", 73: "Moderate snow", 75: "Heavy snow",
            77: "Snow grains",
            80: "Slight rain showers", 81: "Moderate rain showers", 82: "Violent rain showers",
            85: "Slight snow showers", 86: "Heavy snow showers",
            95: "Thunderstorm", 96: "Thunderstorm with slight hail", 99: "Thunderstorm with heavy hail",
        }

        def value_at(key, index):
            # Entry `index` of the daily series `key`, or None when the
            # series is absent or shorter than the date list.
            series = daily.get(key) or []
            return series[index] if index < len(series) else None

        # Output field name -> key of the daily series it is read from.
        field_sources = {
            "temp_max": "temperature_2m_max",
            "temp_min": "temperature_2m_min",
            "feels_like_max": "apparent_temperature_max",
            "feels_like_min": "apparent_temperature_min",
            "precipitation": "precipitation_sum",
            "rain": "rain_sum",
            "snowfall": "snowfall_sum",
            "precipitation_probability": "precipitation_probability_max",
            "uv_index": "uv_index_max",
            "wind_speed_max": "wind_speed_10m_max",
            "wind_gusts_max": "wind_gusts_10m_max",
            "sunrise": "sunrise",
            "sunset": "sunset",
        }

        forecasts = []
        for i, date in enumerate(daily.get("time", [])):
            # A missing code stays None so it is not reported as "Clear sky".
            weather_code = value_at("weather_code", i)
            day = {
                "date": date,
                "weather_code": weather_code,
                "weather_description": weather_codes.get(weather_code, "Unknown"),
            }
            day.update({name: value_at(key, i) for name, key in field_sources.items()})
            forecasts.append(day)

        return {
            "success": True,
            "source": "open-meteo",
            "location": geo.get("name", location),
            "country": geo.get("country", ""),
            "latitude": lat,
            "longitude": lon,
            "timezone": data.get("timezone", ""),
            "units": units,
            "forecast": forecasts,
            "count": len(forecasts),
        }
    except Exception as e:
        log.error(f"Weather forecast fetch failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "open-meteo",
        }
def weather_get_air_quality(
    location: str,
) -> dict:
    """
    Get air quality index for a location.

    The location is first resolved with ``weather_get_coordinates``; if
    that fails, its failure dictionary is returned unchanged.

    Args:
        location: City name or location

    Returns:
        On success, a dictionary with the resolved place, ``us_aqi``,
        ``aqi_category``, the individual pollutant readings and a
        ``timestamp`` taken from the local clock at call time. When the
        response has no AQI value, ``us_aqi`` is None and ``aqi_category``
        is "Unknown". On failure, a dictionary with ``success`` False,
        ``error`` and ``source``.
    """
    try:
        # First get coordinates
        geo = weather_get_coordinates(location)
        if not geo.get("success"):
            return geo

        lat = geo["latitude"]
        lon = geo["longitude"]

        url = "https://air-quality-api.open-meteo.com/v1/air-quality"
        params = {
            "latitude": lat,
            "longitude": lon,
            "current": "us_aqi,pm10,pm2_5,carbon_monoxide,nitrogen_dioxide,sulphur_dioxide,ozone,ammonia",
            "timezone": "auto",
        }
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        current = data.get("current", {})

        # US AQI categories as (inclusive upper bound, label); anything
        # above the last bound is "Hazardous".
        aqi_bands = (
            (50, "Good"),
            (100, "Moderate"),
            (150, "Unhealthy for Sensitive Groups"),
            (200, "Unhealthy"),
            (300, "Very Unhealthy"),
        )
        # No default of 0: a missing reading must not be reported as
        # "Good", and a None value must not reach the comparisons below.
        aqi = current.get("us_aqi")
        if aqi is None:
            category = "Unknown"
        else:
            category = next(
                (label for upper, label in aqi_bands if aqi <= upper),
                "Hazardous",
            )

        return {
            "success": True,
            "source": "open-meteo",
            "location": geo.get("name", location),
            "country": geo.get("country", ""),
            "us_aqi": aqi,
            "aqi_category": category,
            "pm2_5": current.get("pm2_5"),
            "pm10": current.get("pm10"),
            "carbon_monoxide": current.get("carbon_monoxide"),
            "nitrogen_dioxide": current.get("nitrogen_dioxide"),
            "sulphur_dioxide": current.get("sulphur_dioxide"),
            "ozone": current.get("ozone"),
            "ammonia": current.get("ammonia"),
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        log.error(f"Air quality fetch failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "open-meteo",
        }
# Tool schemas for OpenAI function calling
# Function-calling tool schema for ``weather_get_current``.
# ``location`` is required; ``units`` is limited to celsius or fahrenheit.
WEATHER_GET_CURRENT_SCHEMA = {
    "type": "function",
    "function": {
        "name": "weather_get_current",
        "description": "Get current weather conditions for any location worldwide. No API key required.",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "City name or location (e.g., 'New York', 'London, UK', 'Tokyo')",
                },
                "units": {
                    "type": "string",
                    "description": "Temperature units",
                    "default": "celsius",
                    "enum": ["celsius", "fahrenheit"],
                },
            },
            "required": ["location"],
        },
    },
}
# Function-calling tool schema for ``weather_get_forecast``.
# ``location`` is required; ``days`` defaults to 7.
WEATHER_GET_FORECAST_SCHEMA = {
    "type": "function",
    "function": {
        "name": "weather_get_forecast",
        "description": "Get weather forecast for up to 16 days. Includes temperature, precipitation, UV index, and more.",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "City name or location",
                },
                "days": {
                    "type": "integer",
                    "description": "Number of forecast days (1-16)",
                    "default": 7,
                },
                "units": {
                    "type": "string",
                    "description": "Temperature units",
                    "default": "celsius",
                    "enum": ["celsius", "fahrenheit"],
                },
            },
            "required": ["location"],
        },
    },
}
# Function-calling tool schema for ``weather_get_air_quality``.
# The single ``location`` argument is required.
WEATHER_GET_AIR_QUALITY_SCHEMA = {
    "type": "function",
    "function": {
        "name": "weather_get_air_quality",
        "description": "Get air quality index and pollutant levels for a location. Includes PM2.5, PM10, ozone, and more.",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "City name or location",
                },
            },
            "required": ["location"],
        },
    },
}

427
tools/web_tool.py Normal file
View File

@ -0,0 +1,427 @@
"""
Web Search Tool - General web search capabilities
Free sources used:
- DuckDuckGo Instant Answer API (completely free)
- DuckDuckGo HTML search (free, no API key)
- Wikipedia API (as fallback)
All completely free, no API keys required.
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Optional
from urllib.parse import quote_plus, unquote_plus
import requests
log = logging.getLogger(__name__)
# Free search endpoints
DUCKDUCKGO_API = "https://api.duckduckgo.com"
DUCKDUCKGO_HTML = "https://html.duckduckgo.com/html"
def web_search(
    query: str,
    max_results: int = 10,
) -> dict:
    """
    Run a DuckDuckGo HTML search and return the parsed hits.

    Args:
        query: Search query
        max_results: Maximum number of results (default: 10)

    Returns:
        On success, a dict with "success", "source", "query", "results"
        and "count". If anything raises, a dict with "success": False,
        "error" and "source".
    """
    # Browser-like User-Agent sent with the request
    request_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }
    try:
        # DuckDuckGo HTML search (free, no API key)
        page = requests.get(
            DUCKDUCKGO_HTML,
            params={"q": query},
            headers=request_headers,
            timeout=15,
        )
        page.raise_for_status()

        hits = _parse_ddg_html(page.text, max_results)
        outcome = {
            "success": True,
            "source": "duckduckgo",
            "query": query,
            "results": hits,
            "count": len(hits),
        }
    except Exception as e:
        log.error(f"Web search failed: {e}")
        outcome = {
            "success": False,
            "error": str(e),
            "source": "duckduckgo",
        }
    return outcome
def _parse_ddg_html(html: str, max_results: int) -> list:
    """
    Parse a DuckDuckGo HTML results page into a list of result dicts.

    Args:
        html: Raw HTML of the results page
        max_results: Maximum number of results to return; values <= 0 yield []

    Returns:
        List of {"title", "url", "snippet"} dicts, at most max_results long
    """
    # A negative bound used as a slice end would drop entries from the tail
    # instead of limiting the count, so non-positive limits return nothing.
    if max_results <= 0 or not html:
        return []

    from bs4 import BeautifulSoup

    soup = BeautifulSoup(html, "html.parser")
    results = []

    for result in soup.select(".result"):
        # Count only entries that were actually kept, so containers without
        # a title link do not use up the caller's budget.
        if len(results) >= max_results:
            break
        try:
            link_elem = result.select_one(".result__a")
            if link_elem is None:
                continue
            snippet_elem = result.select_one(".result__snippet")

            url = link_elem.get("href", "")
            if "uddg=" in url:
                # Redirect link: the real target is percent-encoded in "uddg"
                url = unquote_plus(url.split("uddg=")[-1].split("&")[0])
            elif url.startswith("//"):
                # Protocol-relative link: add a scheme so it can be fetched
                url = "https:" + url

            results.append({
                "title": link_elem.get_text(strip=True),
                "url": url,
                "snippet": snippet_elem.get_text(strip=True) if snippet_elem else "",
            })
        except Exception as e:
            # Best effort: one malformed entry must not lose the others,
            # but leave a trace instead of swallowing it silently.
            log.debug(f"Skipping unparseable DuckDuckGo result: {e}")
            continue

    return results
def web_instant_answer(
    query: str,
) -> dict:
    """
    Query the DuckDuckGo instant-answer endpoint and collect what it returned.

    Args:
        query: Query for instant answer

    Returns:
        Dict that always has "success" and "source". On success it also has
        "query" plus whichever of the abstract, definition, answer,
        related_topics and infobox fields the response contained.
        If anything raises, it has "success": False and "error".
    """
    try:
        reply = requests.get(
            DUCKDUCKGO_API,
            params={
                "q": query,
                "format": "json",
                "no_html": 1,
                "skip_disambig": 0,
            },
            timeout=10,
        )
        reply.raise_for_status()
        payload = reply.json()

        answer = {"success": True, "source": "duckduckgo", "query": query}

        # Abstract (main answer) and its attribution
        if payload.get("Abstract"):
            answer.update(
                abstract=payload.get("Abstract"),
                abstract_source=payload.get("AbstractSource"),
                abstract_url=payload.get("AbstractURL"),
                image=payload.get("Image"),
            )

        # Definition
        if payload.get("Definition"):
            answer.update(
                definition=payload.get("Definition"),
                definition_source=payload.get("DefinitionSource"),
            )

        # Answer
        if payload.get("Answer"):
            answer["answer"] = payload.get("Answer")

        # Related topics: look at the first five entries, keep those with text
        related_topics = [
            {"text": entry.get("Text"), "url": entry.get("FirstURL")}
            for entry in payload.get("RelatedTopics", [])[:5]
            if isinstance(entry, dict) and entry.get("Text")
        ]
        if related_topics:
            answer["related_topics"] = related_topics

        # Infobox
        if payload.get("Infobox"):
            answer["infobox"] = payload.get("Infobox")

        return answer
    except Exception as e:
        log.error(f"Instant answer failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "duckduckgo",
        }
def web_get_page_content(
    url: str,
    max_length: int = 5000,
) -> dict:
    """
    Fetch a web page and return its visible text.

    Args:
        url: Absolute http(s) URL to fetch
        max_length: Maximum number of characters of page text to keep
            (default: 5000); negative values are treated as 0

    Returns:
        On success, a dict with "success", "source", "url", "title",
        "content" and "content_length" (the length of "content" as returned,
        including the "..." marker added when the text was truncated).
        On failure, a dict with "success": False, "error", "source", "url".
    """
    from urllib.parse import urlparse

    # Validate the caller-supplied URL before any network access so that
    # empty values and non-web schemes get a clear error.
    try:
        parts = urlparse(url) if isinstance(url, str) else None
        url_ok = (
            parts is not None
            and parts.scheme in ("http", "https")
            and bool(parts.netloc)
        )
    except ValueError:
        # urlparse rejects some malformed hosts (e.g. broken IPv6 literals)
        url_ok = False
    if not url_ok:
        return {
            "success": False,
            "error": f"Unsupported or invalid URL: {url!r}",
            "source": "web",
            "url": url,
        }

    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        }
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()

        from bs4 import BeautifulSoup

        # Hand the raw bytes to the parser. response.text falls back to
        # ISO-8859-1 when the server sends text/* without a charset, which
        # garbles UTF-8 pages. With bytes the parser can use the document's
        # own <meta charset>. A charset declared in the header is still
        # passed along as the preferred encoding.
        content_type = response.headers.get("Content-Type", "")
        declared = response.encoding if "charset=" in content_type.lower() else None
        soup = BeautifulSoup(response.content, "html.parser", from_encoding=declared)

        # Remove script and style elements plus page chrome
        for element in soup(["script", "style", "nav", "header", "footer"]):
            element.decompose()

        # Get title
        title = soup.title.get_text(strip=True) if soup.title else ""

        # Get main content, dropping blank lines and surrounding whitespace
        raw_text = soup.get_text(separator="\n", strip=True)
        text = "\n".join(
            line.strip() for line in raw_text.splitlines() if line.strip()
        )

        # Truncate if needed; a negative limit would otherwise slice from
        # the end of the text instead of limiting it.
        limit = max(0, max_length)
        if len(text) > limit:
            text = text[:limit] + "..."

        return {
            "success": True,
            "source": "web",
            "url": url,
            "title": title,
            "content": text,
            "content_length": len(text),
        }
    except Exception as e:
        log.error(f"Page content fetch failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "web",
            "url": url,
        }
def web_search_and_fetch(
    query: str,
    max_results: int = 3,
    max_content_length: int = 3000,
) -> dict:
    """
    Search the web, then download the text of each hit that has a URL.

    Args:
        query: Search query
        max_results: Number of results to fetch (default: 3)
        max_content_length: Max content per page (default: 3000)

    Returns:
        The failed web_search result unchanged if the search did not succeed.
        Otherwise a dict with "success", "source", "query", "results" and
        "count", where every result with a URL gains a "fetched_content"
        string (empty when that page could not be fetched).
    """
    try:
        found = web_search(query, max_results)
        if not found.get("success"):
            return found

        hits = list(found.get("results", []))
        for hit in hits:
            target = hit.get("url")
            if not target:
                # Kept in the output, just without fetched content
                continue
            page = web_get_page_content(target, max_content_length)
            if page.get("success"):
                hit["fetched_content"] = page.get("content", "")
            else:
                hit["fetched_content"] = ""

        return {
            "success": True,
            "source": "duckduckgo",
            "query": query,
            "results": hits,
            "count": len(hits),
        }
    except Exception as e:
        log.error(f"Search and fetch failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "duckduckgo",
        }
def web_get_headers(
    url: str,
) -> dict:
    """
    Issue a HEAD request (following redirects) and report what came back.

    Args:
        url: URL to check

    Returns:
        On success, a dict with "success", "source", "url", "status_code",
        "headers" and "final_url". If the request raises, a dict with
        "success": False, "error", "source" and "url".
    """
    try:
        reply = requests.head(
            url,
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            },
            timeout=10,
            allow_redirects=True,
        )
        report = {"success": True, "source": "web", "url": url}
        report["status_code"] = reply.status_code
        report["headers"] = dict(reply.headers)
        # URL of the final response after redirects were followed
        report["final_url"] = reply.url
        return report
    except Exception as e:
        log.error(f"Header fetch failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "web",
            "url": url,
        }
# Tool schemas for OpenAI function calling
# Schema for web_search: required "query", optional "max_results".
WEB_SEARCH_SCHEMA = {
    "type": "function",
    "function": {
        "name": "web_search",
        "description": "Search the web using DuckDuckGo. Returns search results with titles, URLs, and snippets. Free, no API key required.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query"},
                "max_results": {
                    "type": "integer",
                    "description": "Maximum number of results (default: 10)",
                    "default": 10,
                },
            },
            "required": ["query"],
        },
    },
}
# Schema for web_instant_answer: a single required "query" string.
WEB_INSTANT_ANSWER_SCHEMA = {
    "type": "function",
    "function": {
        "name": "web_instant_answer",
        "description": "Get instant answer from DuckDuckGo for facts, definitions, and summaries. Good for quick facts.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Query for instant answer"},
            },
            "required": ["query"],
        },
    },
}
# Schema for web_get_page_content: required "url", optional "max_length".
WEB_GET_PAGE_CONTENT_SCHEMA = {
    "type": "function",
    "function": {
        "name": "web_get_page_content",
        "description": "Fetch and extract text content from a web page URL. Use after web_search to get full content.",
        "parameters": {
            "type": "object",
            "properties": {
                "url": {"type": "string", "description": "URL to fetch"},
                "max_length": {
                    "type": "integer",
                    "description": "Maximum content length in characters (default: 5000)",
                    "default": 5000,
                },
            },
            "required": ["url"],
        },
    },
}
# Schema for web_search_and_fetch: required "query" plus two optional limits.
WEB_SEARCH_AND_FETCH_SCHEMA = {
    "type": "function",
    "function": {
        "name": "web_search_and_fetch",
        "description": "Search web and automatically fetch content from top results. Best for comprehensive research.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query"},
                "max_results": {
                    "type": "integer",
                    "description": "Number of results to fetch (default: 3)",
                    "default": 3,
                },
                "max_content_length": {
                    "type": "integer",
                    "description": "Max content per page (default: 3000)",
                    "default": 3000,
                },
            },
            "required": ["query"],
        },
    },
}

259
tools/wikipedia_tool.py Normal file
View File

@ -0,0 +1,259 @@
"""
Wikipedia Tool - Search and retrieve Wikipedia articles
Free API with no authentication required.
Rate limit: Be respectful, no strict limits.
"""
from __future__ import annotations
import logging
from typing import Optional
import requests
log = logging.getLogger(__name__)
# English Wikipedia api.php endpoint; every wikipedia_* function in this
# module sends its action=query request here.
WIKIPEDIA_API = "https://en.wikipedia.org/w/api.php"
def wikipedia_search(
    query: str,
    limit: int = 5,
) -> dict:
    """
    Search English Wikipedia for articles matching the query.

    Args:
        query: Search query; must not be empty or whitespace-only
        limit: Maximum number of results (default: 5); clamped to 1..500

    Returns:
        On success, a dict with "success", "source", "query", "results" and
        "count". Each result has "title", "pageid", "snippet" (plain text),
        "wordcount" and "url".
        On failure, a dict with "success": False, "error" and "source".
    """
    import html
    import re

    # Reject an empty query locally. The API answers it with an error in the
    # response body, which would otherwise look like "no results".
    if not isinstance(query, str) or not query.strip():
        return {
            "success": False,
            "error": "Search query must not be empty",
            "source": "wikipedia",
        }

    try:
        params = {
            "action": "query",
            "list": "search",
            "srsearch": query,
            "srlimit": max(1, min(limit, 500)),
            "format": "json",
            "utf8": 1,
        }
        # Wikimedia asks API clients to identify themselves; generic library
        # User-Agents may be throttled or blocked.
        # TODO: add a project URL or contact address to this string.
        headers = {"User-Agent": "RAG-Tools/1.0 (wikipedia_tool) python-requests"}

        response = requests.get(
            WIKIPEDIA_API, params=params, headers=headers, timeout=10
        )
        response.raise_for_status()
        data = response.json()

        # The API reports request errors in the body with HTTP 200
        api_error = data.get("error")
        if api_error:
            info = api_error.get("info") if isinstance(api_error, dict) else api_error
            raise RuntimeError(f"Wikipedia API error: {info}")

        results = []
        for item in data.get("query", {}).get("search", []):
            # Snippets are HTML fragments: strip the highlight markup first,
            # then decode entities such as &quot; and &amp;.
            snippet = html.unescape(re.sub(r"<[^>]+>", "", item.get("snippet", "")))
            results.append({
                "title": item.get("title", ""),
                "pageid": item.get("pageid", 0),
                "snippet": snippet,
                "wordcount": item.get("wordcount", 0),
                "url": f"https://en.wikipedia.org/?curid={item.get('pageid', 0)}",
            })

        return {
            "success": True,
            "source": "wikipedia",
            "query": query,
            "results": results,
            "count": len(results),
        }
    except Exception as e:
        log.error(f"Wikipedia search failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "wikipedia",
        }
def wikipedia_get_article(
    title: str,
    sentences: int = 10,
) -> dict:
    """
    Get the plain-text introduction of a Wikipedia article.

    Args:
        title: Article title; it is sent as the API "titles" parameter and
            redirects are followed. Must not be empty.
        sentences: Number of sentences to return (default: 10); clamped
            to 1..50

    Returns:
        On success, a dict with "success", "source", "articles" and "count".
        Each article has "title", "pageid", "extract" and "url".
        On failure (including article not found), a dict with
        "success": False, "error" and "source".
    """
    # Reject an empty title locally instead of spending a request on it
    if not isinstance(title, str) or not title.strip():
        return {
            "success": False,
            "error": "Article title must not be empty",
            "source": "wikipedia",
        }

    try:
        params = {
            "action": "query",
            "prop": "extracts",
            # NOTE(review): the extracts API may cap exsentences below 50;
            # confirm against its documentation.
            "exsentences": max(1, min(sentences, 50)),
            "exintro": True,
            "explaintext": True,
            "titles": title,
            "format": "json",
            "utf8": 1,
            "redirects": 1,
        }
        # Wikimedia asks API clients to identify themselves; generic library
        # User-Agents may be throttled or blocked.
        # TODO: add a project URL or contact address to this string.
        headers = {"User-Agent": "RAG-Tools/1.0 (wikipedia_tool) python-requests"}

        response = requests.get(
            WIKIPEDIA_API, params=params, headers=headers, timeout=10
        )
        response.raise_for_status()
        data = response.json()

        # The API reports request errors in the body with HTTP 200
        api_error = data.get("error")
        if api_error:
            info = api_error.get("info") if isinstance(api_error, dict) else api_error
            raise RuntimeError(f"Wikipedia API error: {info}")

        pages = data.get("query", {}).get("pages", {})
        articles = []
        for page_id, page_data in pages.items():
            # Missing or invalid titles are keyed by negative ids (-1, -2, ...)
            # and carry a "missing" / "invalid" marker; skip all of them.
            if (
                str(page_id).startswith("-")
                or "missing" in page_data
                or "invalid" in page_data
            ):
                continue
            articles.append({
                "title": page_data.get("title", ""),
                "pageid": page_id,
                "extract": page_data.get("extract", ""),
                "url": f"https://en.wikipedia.org/?curid={page_id}",
            })

        if not articles:
            return {
                "success": False,
                "error": f"Article not found: {title}",
                "source": "wikipedia",
            }

        return {
            "success": True,
            "source": "wikipedia",
            "articles": articles,
            "count": len(articles),
        }
    except Exception as e:
        log.error(f"Wikipedia article fetch failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "wikipedia",
        }
def wikipedia_get_full_article(
    title: str,
) -> dict:
    """
    Get the full plain-text content of a Wikipedia article.

    Args:
        title: Article title; it is sent as the API "titles" parameter and
            redirects are followed. Must not be empty.

    Returns:
        On success, a dict with "success", "source", "title", "pageid",
        "content" and "url" for the first existing page in the response.
        On failure (including article not found), a dict with
        "success": False, "error" and "source".
    """
    # Reject an empty title locally instead of spending a request on it
    if not isinstance(title, str) or not title.strip():
        return {
            "success": False,
            "error": "Article title must not be empty",
            "source": "wikipedia",
        }

    try:
        params = {
            "action": "query",
            "prop": "extracts",
            "explaintext": True,
            "titles": title,
            "format": "json",
            "utf8": 1,
            "redirects": 1,
        }
        # Wikimedia asks API clients to identify themselves; generic library
        # User-Agents may be throttled or blocked.
        # TODO: add a project URL or contact address to this string.
        headers = {"User-Agent": "RAG-Tools/1.0 (wikipedia_tool) python-requests"}

        response = requests.get(
            WIKIPEDIA_API, params=params, headers=headers, timeout=15
        )
        response.raise_for_status()
        data = response.json()

        # The API reports request errors in the body with HTTP 200
        api_error = data.get("error")
        if api_error:
            info = api_error.get("info") if isinstance(api_error, dict) else api_error
            raise RuntimeError(f"Wikipedia API error: {info}")

        pages = data.get("query", {}).get("pages", {})
        for page_id, page_data in pages.items():
            # Missing or invalid titles are keyed by negative ids (-1, -2, ...)
            # and carry a "missing" / "invalid" marker; skip all of them.
            if (
                str(page_id).startswith("-")
                or "missing" in page_data
                or "invalid" in page_data
            ):
                continue
            return {
                "success": True,
                "source": "wikipedia",
                "title": page_data.get("title", ""),
                "pageid": page_id,
                "content": page_data.get("extract", ""),
                "url": f"https://en.wikipedia.org/?curid={page_id}",
            }

        return {
            "success": False,
            "error": f"Article not found: {title}",
            "source": "wikipedia",
        }
    except Exception as e:
        log.error(f"Wikipedia full article fetch failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "source": "wikipedia",
        }
# Tool schemas for OpenAI function calling
# Schema for wikipedia_search: required "query", optional "limit".
WIKIPEDIA_SEARCH_SCHEMA = {
    "type": "function",
    "function": {
        "name": "wikipedia_search",
        "description": "Search Wikipedia for articles matching a query. Returns a list of article titles and snippets.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "The search query"},
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of results to return (default: 5)",
                    "default": 5,
                },
            },
            "required": ["query"],
        },
    },
}
# Schema for wikipedia_get_article: required "title", optional "sentences".
WIKIPEDIA_GET_ARTICLE_SCHEMA = {
    "type": "function",
    "function": {
        "name": "wikipedia_get_article",
        "description": "Get the introduction/summary of a Wikipedia article. Use this after wikipedia_search to get more details.",
        "parameters": {
            "type": "object",
            "properties": {
                "title": {
                    "type": "string",
                    "description": "The exact article title from search results",
                },
                "sentences": {
                    "type": "integer",
                    "description": "Number of sentences to return (default: 10)",
                    "default": 10,
                },
            },
            "required": ["title"],
        },
    },
}
# Schema for wikipedia_get_full_article: a single required "title" string.
WIKIPEDIA_GET_FULL_ARTICLE_SCHEMA = {
    "type": "function",
    "function": {
        "name": "wikipedia_get_full_article",
        "description": "Get the full content of a Wikipedia article. Use for comprehensive research when the summary is not enough.",
        "parameters": {
            "type": "object",
            "properties": {
                "title": {"type": "string", "description": "The exact article title"},
            },
            "required": ["title"],
        },
    },
}