""" RAG System - Retrieval Augmented Generation This module provides the core RAG functionality for DocRAG, including: - Document processing and chunking - Vector storage and similarity search - Context retrieval for enhanced prompts """ from __future__ import annotations import asyncio import logging import os from pathlib import Path from typing import Any, Optional from .document_processor import DocumentProcessor from .vector_store import VectorStore from .retriever import Retriever log = logging.getLogger(__name__) class RAGSystem: """ Main RAG system that coordinates document processing, storage, and retrieval. This class provides a unified interface for: - Adding documents to the knowledge base - Querying for relevant context - Managing the document lifecycle """ def __init__( self, embedding_model: str = "text-embedding-3-small", vector_store_path: str = "./data/vectors", documents_path: str = "./data/documents", chunk_size: int = 1000, chunk_overlap: int = 200, ): self.embedding_model = embedding_model self.vector_store_path = Path(vector_store_path) self.documents_path = Path(documents_path) self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap self._initialized = False self._document_processor: Optional[DocumentProcessor] = None self._vector_store: Optional[VectorStore] = None self._retriever: Optional[Retriever] = None async def initialize(self) -> None: """Initialize the RAG system components.""" if self._initialized: return log.info("Initializing RAG system...") # Create directories self.vector_store_path.mkdir(parents=True, exist_ok=True) self.documents_path.mkdir(parents=True, exist_ok=True) # Initialize document processor self._document_processor = DocumentProcessor( chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap, ) # Initialize vector store self._vector_store = VectorStore( persist_directory=str(self.vector_store_path), embedding_model=self.embedding_model, ) # Initialize retriever self._retriever = Retriever( vector_store=self._vector_store, ) self._initialized = True log.info("RAG system initialized successfully") async def close(self) -> None: """Close the RAG system and release resources.""" if self._vector_store: await self._vector_store.close() self._initialized = False log.info("RAG system closed") def _ensure_initialized(self) -> None: """Ensure the RAG system is initialized.""" if not self._initialized: raise RuntimeError("RAG system not initialized. Call initialize() first.") async def add_document( self, content: bytes, filename: str, metadata: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: """ Add a document to the knowledge base. Args: content: Raw document content filename: Original filename metadata: Optional metadata Returns: Dictionary with processing results """ self._ensure_initialized() # Process document doc_info = await self._document_processor.process( content=content, filename=filename, metadata=metadata, ) # Store chunks in vector store if doc_info.get("chunks"): await self._vector_store.add_chunks( chunks=doc_info["chunks"], metadatas=doc_info.get("metadatas", []), ids=doc_info.get("ids", []), ) log.info(f"Added document '{filename}' with {len(doc_info.get('chunks', []))} chunks") return {"chunks": len(doc_info.get("chunks", [])), "document_id": doc_info.get("document_id")} async def add_document_from_url(self, url: str) -> dict[str, Any]: """ Add a document from a URL to the knowledge base. Args: url: URL to fetch and process Returns: Dictionary with processing results """ self._ensure_initialized() # Fetch content from URL import aiohttp async with aiohttp.ClientSession() as session: async with session.get(url, timeout=30) as response: response.raise_for_status() content = await response.read() # Extract filename from URL from urllib.parse import urlparse parsed = urlparse(url) filename = os.path.basename(parsed.path) or "webpage.html" return await self.add_document(content=content, filename=filename, metadata={"source_url": url}) async def query( self, query: str, top_k: int = 5, filter_metadata: Optional[dict] = None, ) -> dict[str, Any]: """ Query the knowledge base for relevant context. Args: query: Query string top_k: Number of results to return filter_metadata: Optional metadata filters Returns: Dictionary with context and sources """ self._ensure_initialized() # Retrieve relevant chunks results = await self._retriever.retrieve( query=query, top_k=top_k, filter_metadata=filter_metadata, ) # Build context string context_parts = [] sources = [] for i, result in enumerate(results): context_parts.append(f"[{i+1}] {result['content']}") if result.get("metadata", {}).get("source"): sources.append(result["metadata"]["source"]) context = "\n\n".join(context_parts) return { "context": context, "sources": list(set(sources)), "num_results": len(results), "results": results, } async def list_documents(self) -> list[dict[str, Any]]: """List all documents in the knowledge base.""" self._ensure_initialized() return await self._vector_store.list_documents() async def delete_document(self, document_id: str) -> None: """Delete a document from the knowledge base.""" self._ensure_initialized() await self._vector_store.delete_document(document_id) log.info(f"Deleted document {document_id}") # Global RAG system instance _rag_system: Optional[RAGSystem] = None async def get_rag_system( embedding_model: str = "text-embedding-3-small", vector_store_path: str = "./data/vectors", documents_path: str = "./data/documents", chunk_size: int = 1000, chunk_overlap: int = 200, ) -> RAGSystem: """ Get or create the global RAG system instance. Args: embedding_model: Name of the embedding model vector_store_path: Path to vector store documents_path: Path to document storage chunk_size: Size of document chunks chunk_overlap: Overlap between chunks Returns: Initialized RAGSystem instance """ global _rag_system if _rag_system is None: _rag_system = RAGSystem( embedding_model=embedding_model, vector_store_path=vector_store_path, documents_path=documents_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap, ) await _rag_system.initialize() return _rag_system