Features:
- RAG system now uses website_downloader_tool as primary content ingestion method
- download_and_ingest_website() method for complete website processing
- Stores page pointers (source_url, page_url, local_path) in vector store
- Site registry tracks all downloaded websites with metadata
- New API endpoints for website management:
  - POST /v1/documents/website - Download and ingest a website
  - GET /v1/documents/sites - List all downloaded sites
  - GET /v1/documents/sites/{url} - Get site info
  - DELETE /v1/documents/sites/{url} - Delete a site and its content
Changes:
- rag/__init__.py: Added download_and_ingest_website(), site registry
- rag/document_processor.py: Added extract_text_from_html() public method
- rag/vector_store.py: Added delete_by_source_url(), get_stats()
- main.py: New website endpoints, integrated tool with RAG system
338 lines
10 KiB
Python
338 lines
10 KiB
Python
"""
Vector Store - Handles vector storage and similarity search

Provides a simple file-based vector store that can be extended to use
more sophisticated backends like ChromaDB, FAISS, or Pinecone.
"""
|
|
|
|
from __future__ import annotations

import hashlib
import heapq
import json
import logging
import os
from pathlib import Path
from typing import Any, Optional
|
|
|
|
log = logging.getLogger(__name__)


class VectorStore:
    """
    Vector store for document embeddings.

    This implementation provides:
    - Simple file-based persistence (one JSON snapshot, ``store.json``,
      inside ``persist_directory``)
    - In-memory similarity search (cosine similarity over every stored chunk)
    - Document management (listing and deletion keyed on chunk metadata)

    State is held in four parallel lists (``_chunks``, ``_embeddings``,
    ``_metadata``, ``_ids``); index ``i`` in each list describes the same
    chunk, so every mutation must keep them the same length.

    ``initialize()`` must be awaited before any public operation; other
    public methods raise ``RuntimeError`` otherwise.

    Can be extended to use ChromaDB, FAISS, or other vector databases.
    """

    # Name of the JSON snapshot file inside ``persist_directory``.
    _STORE_FILENAME = "store.json"

    # Length of the vectors produced by ``_generate_embeddings``.
    _EMBEDDING_DIM = 384

    def __init__(
        self,
        persist_directory: str = "./data/vectors",
        embedding_model: str = "text-embedding-3-small",
    ):
        """
        Create an empty, uninitialized store.

        Args:
            persist_directory: Directory that holds the JSON snapshot.
            embedding_model: Name of the embedding model. It is only stored
                on the instance; the built-in demo embedding does not use it.
        """
        self.persist_directory = Path(persist_directory)
        self.embedding_model = embedding_model

        self._chunks: list[dict[str, Any]] = []
        self._embeddings: list[list[float]] = []
        self._metadata: list[dict[str, Any]] = []
        self._ids: list[str] = []

        self._initialized = False

    async def initialize(self) -> None:
        """Create the persist directory and load existing data (idempotent)."""
        if self._initialized:
            return

        self.persist_directory.mkdir(parents=True, exist_ok=True)

        # Load existing data
        await self._load()

        self._initialized = True
        log.info("Vector store initialized with %d chunks", len(self._chunks))

    async def close(self) -> None:
        """Save and close the vector store."""
        # Saving a store that never loaded its data would overwrite an
        # existing snapshot with empty lists, so only persist once initialized.
        if self._initialized:
            await self._save()
        log.info("Vector store closed")

    async def _load(self) -> None:
        """
        Load data from disk.

        A missing snapshot is not an error. An unreadable, malformed or
        internally inconsistent snapshot is logged and the store stays empty.
        """
        data_file = self.persist_directory / self._STORE_FILENAME

        if not data_file.exists():
            return

        try:
            with open(data_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both JSON syntax errors and bad UTF-8.
            log.error("Failed to load vector store: %s", e)
            return

        if not isinstance(data, dict):
            log.error(
                "Failed to load vector store: %s",
                "top-level JSON value is not an object",
            )
            return

        fields = [
            data.get(key, [])
            for key in ("chunks", "embeddings", "metadata", "ids")
        ]
        # The four lists are indexed in lockstep everywhere else; refuse
        # data in which they cannot be aligned.
        if not all(isinstance(field, list) for field in fields) or (
            len({len(field) for field in fields}) != 1
        ):
            log.error(
                "Failed to load vector store: %s",
                "chunks, embeddings, metadata and ids are not equally long lists",
            )
            return

        self._chunks, self._embeddings, self._metadata, self._ids = fields

        log.info("Loaded %d chunks from disk", len(self._chunks))

    async def _save(self) -> None:
        """
        Save data to disk.

        The snapshot is written to a temporary sibling file and then moved
        over ``store.json``, so a failure part-way through never leaves a
        truncated snapshot behind. Failures are logged, not raised.
        """
        data_file = self.persist_directory / self._STORE_FILENAME
        tmp_file = data_file.with_name(data_file.name + ".tmp")

        data = {
            "chunks": self._chunks,
            "embeddings": self._embeddings,
            "metadata": self._metadata,
            "ids": self._ids,
        }

        try:
            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_file, data_file)
        except (OSError, TypeError, ValueError) as e:
            # TypeError/ValueError: metadata that json cannot serialize.
            log.error("Failed to save vector store: %s", e)
            try:
                tmp_file.unlink()
            except OSError:
                # Best-effort cleanup; the temp file may not exist at all.
                pass
            return

        log.info("Saved %d chunks to disk", len(self._chunks))

    def _ensure_initialized(self) -> None:
        """Raise ``RuntimeError`` unless ``initialize()`` has completed."""
        if not self._initialized:
            raise RuntimeError("Vector store not initialized")

    async def add_chunks(
        self,
        chunks: list[str],
        metadatas: Optional[list[dict[str, Any]]] = None,
        ids: Optional[list[str]] = None,
    ) -> None:
        """
        Add chunks to the vector store and persist the result.

        Args:
            chunks: List of text chunks. An empty list is a no-op.
            metadatas: Optional list of metadata dicts, one per chunk.
                Defaults to a separate empty dict for each chunk.
            ids: Optional list of chunk IDs, one per chunk. Defaults to the
                MD5 hex digest of each chunk's text.

        Raises:
            RuntimeError: If the store has not been initialized.
            ValueError: If ``metadatas`` or ``ids`` is given with a length
                different from ``chunks``.
        """
        self._ensure_initialized()

        if not chunks:
            return

        # Generate IDs if not provided
        if ids is None:
            ids = [hashlib.md5(chunk.encode()).hexdigest() for chunk in chunks]

        # Generate metadata if not provided. Each chunk needs its own dict:
        # ``[{}] * n`` would make all chunks share a single object.
        if metadatas is None:
            metadatas = [{} for _ in chunks]

        if len(metadatas) != len(chunks) or len(ids) != len(chunks):
            raise ValueError(
                "chunks, metadatas and ids must have the same length "
                f"(got {len(chunks)}, {len(metadatas)}, {len(ids)})"
            )

        # Generate embeddings
        embeddings = await self._generate_embeddings(chunks)

        # Store everything
        for chunk, embedding, metadata, chunk_id in zip(
            chunks, embeddings, metadatas, ids
        ):
            self._chunks.append({"id": chunk_id, "content": chunk})
            self._embeddings.append(embedding)
            self._metadata.append(metadata)
            self._ids.append(chunk_id)

        # Save to disk
        await self._save()

        log.info("Added %d chunks to vector store", len(chunks))

    async def _generate_embeddings(self, texts: list[str]) -> list[list[float]]:
        """
        Generate embeddings for texts.

        Uses a simple hash-based embedding for demonstration: the SHA-256
        digest bytes of each text are mapped to floats in [-1, 1) and
        repeated cyclically up to ``_EMBEDDING_DIM`` values. Identical texts
        therefore get identical vectors.
        In production, use a real embedding model via API.

        Args:
            texts: Texts to embed.

        Returns:
            One vector of ``_EMBEDDING_DIM`` floats per input text.
        """
        embeddings = []

        for text in texts:
            digest = hashlib.sha256(text.encode()).digest()
            # Convert each digest byte once, then tile the values.
            base = [(byte - 128) / 128.0 for byte in digest]
            embeddings.append(
                [base[i % len(base)] for i in range(self._EMBEDDING_DIM)]
            )

        return embeddings

    async def search(
        self,
        query: str,
        top_k: int = 5,
        filter_metadata: Optional[dict] = None,
    ) -> list[dict[str, Any]]:
        """
        Search for similar chunks.

        Args:
            query: Query string
            top_k: Maximum number of results to return. Values <= 0 yield
                an empty list.
            filter_metadata: Optional metadata filters; a chunk is kept only
                if its metadata equals the filter value for every key.

        Returns:
            Up to ``top_k`` dicts with keys ``id``, ``content``, ``metadata``
            and ``score`` (cosine similarity), best score first.

        Raises:
            RuntimeError: If the store has not been initialized.
        """
        self._ensure_initialized()

        # A negative top_k would otherwise slice from the end of the list.
        if top_k <= 0 or not self._chunks:
            return []

        # Generate query embedding
        query_embedding = (await self._generate_embeddings([query]))[0]

        # Calculate similarities
        results = []
        for chunk, embedding, metadata in zip(
            self._chunks, self._embeddings, self._metadata
        ):
            # Apply metadata filter
            if filter_metadata and not all(
                metadata.get(k) == v for k, v in filter_metadata.items()
            ):
                continue

            results.append({
                "id": chunk["id"],
                "content": chunk["content"],
                "metadata": metadata,
                "score": self._cosine_similarity(query_embedding, embedding),
            })

        # Equivalent to sorting by score descending and slicing to top_k,
        # without fully sorting every candidate.
        return heapq.nlargest(top_k, results, key=lambda item: item["score"])

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        """
        Calculate cosine similarity between two vectors.

        Returns 0.0 when the vectors differ in length or either has zero norm.
        """
        if len(a) != len(b):
            return 0.0

        dot_product = sum(x * y for x, y in zip(a, b))
        norm_a = sum(x * x for x in a) ** 0.5
        norm_b = sum(x * x for x in b) ** 0.5

        if norm_a == 0 or norm_b == 0:
            return 0.0

        return dot_product / (norm_a * norm_b)

    async def list_documents(self) -> list[dict[str, Any]]:
        """
        List all unique documents in the store.

        Chunks are grouped by their ``document_id`` metadata; chunks without
        a (truthy) ``document_id`` are ignored.

        Returns:
            One dict per document with ``id``, ``source`` (taken from the
            first chunk seen, ``"unknown"`` if absent) and ``chunk_count``.

        Raises:
            RuntimeError: If the store has not been initialized.
        """
        self._ensure_initialized()

        # Group by document_id
        documents: dict[Any, dict[str, Any]] = {}
        for metadata in self._metadata:
            doc_id = metadata.get("document_id")
            if not doc_id:
                continue
            entry = documents.get(doc_id)
            if entry is None:
                documents[doc_id] = {
                    "id": doc_id,
                    "source": metadata.get("source", "unknown"),
                    "chunk_count": 1,
                }
            else:
                entry["chunk_count"] += 1

        return list(documents.values())

    async def _delete_where(self, key: str, value: Any) -> int:
        """
        Remove every chunk whose metadata has ``key`` equal to ``value``.

        The four parallel lists are filtered in a single pass each and the
        store is saved only when something was actually removed.

        Returns:
            Number of removed chunks.
        """
        kept = [
            i
            for i, metadata in enumerate(self._metadata)
            if not (metadata.get(key) == value)
        ]
        removed = len(self._metadata) - len(kept)

        if removed:
            # Slice assignment keeps the existing list objects in place.
            self._chunks[:] = [self._chunks[i] for i in kept]
            self._embeddings[:] = [self._embeddings[i] for i in kept]
            self._ids[:] = [self._ids[i] for i in kept]
            self._metadata[:] = [self._metadata[i] for i in kept]

            # Save changes
            await self._save()

        return removed

    async def delete_document(self, document_id: str) -> None:
        """
        Delete all chunks for a document.

        Args:
            document_id: Value of the ``document_id`` metadata to remove.

        Raises:
            RuntimeError: If the store has not been initialized.
        """
        self._ensure_initialized()

        removed = await self._delete_where("document_id", document_id)

        log.info("Deleted document %s (%d chunks)", document_id, removed)

    async def delete_by_source_url(self, source_url: str) -> int:
        """
        Delete all chunks from a specific source URL.

        Args:
            source_url: The source URL to delete

        Returns:
            Number of deleted chunks

        Raises:
            RuntimeError: If the store has not been initialized.
        """
        self._ensure_initialized()

        removed = await self._delete_where("source_url", source_url)

        log.info("Deleted %d chunks from source: %s", removed, source_url)
        return removed

    async def get_stats(self) -> dict[str, Any]:
        """
        Get statistics about the vector store.

        Returns:
            Dict with ``total_chunks``, ``unique_sources`` and
            ``unique_urls`` (distinct truthy ``source`` / ``source_url``
            metadata values) and ``embedding_dimension`` (length of the
            first stored vector, 0 when the store is empty).

        Raises:
            RuntimeError: If the store has not been initialized.
        """
        self._ensure_initialized()

        # Count unique sources
        sources = {
            metadata["source"]
            for metadata in self._metadata
            if metadata.get("source")
        }
        source_urls = {
            metadata["source_url"]
            for metadata in self._metadata
            if metadata.get("source_url")
        }

        return {
            "total_chunks": len(self._chunks),
            "unique_sources": len(sources),
            "unique_urls": len(source_urls),
            "embedding_dimension": len(self._embeddings[0]) if self._embeddings else 0,
        }
|