"""
Vector Store - Handles vector storage and similarity search

Provides a simple file-based vector store that can be extended to use
more sophisticated backends like ChromaDB, FAISS, or Pinecone.
"""

from __future__ import annotations

import hashlib
import json
import logging
import os
from pathlib import Path
from typing import Any, Optional

log = logging.getLogger(__name__)


class VectorStore:
    """
    Vector store for document embeddings.

    This implementation provides:
    - Simple file-based persistence
    - In-memory similarity search
    - Document management

    Chunks, embeddings, metadata and ids are kept in four parallel lists:
    index ``i`` of each list describes the same chunk.

    Can be extended to use ChromaDB, FAISS, or other vector databases.
    """

    # Name of the JSON file (inside ``persist_directory``) holding the store.
    _STORE_FILENAME = "store.json"
    # Length of the vectors produced by ``_generate_embeddings``.
    _EMBEDDING_DIM = 384

    def __init__(
        self,
        persist_directory: str = "./data/vectors",
        embedding_model: str = "text-embedding-3-small",
    ):
        """
        Create an empty, not-yet-initialized store.

        Args:
            persist_directory: Directory in which ``store.json`` is kept
            embedding_model: Embedding model name; only stored on the
                instance, the built-in hash embedding does not use it
        """
        self.persist_directory = Path(persist_directory)
        self.embedding_model = embedding_model
        self._chunks: list[dict[str, Any]] = []
        self._embeddings: list[list[float]] = []
        self._metadata: list[dict[str, Any]] = []
        self._ids: list[str] = []
        self._initialized = False

    async def initialize(self) -> None:
        """Initialize the vector store and load existing data (idempotent)."""
        if self._initialized:
            return

        self.persist_directory.mkdir(parents=True, exist_ok=True)

        # Load existing data
        await self._load()

        self._initialized = True
        log.info("Vector store initialized with %d chunks", len(self._chunks))

    async def close(self) -> None:
        """Save and close the vector store."""
        # A store that was never initialized has loaded nothing; saving it
        # would overwrite whatever is on disk with empty lists.
        if self._initialized:
            await self._save()
        log.info("Vector store closed")

    async def _load(self) -> None:
        """
        Load data from disk.

        Best effort: a missing file is not an error, and any failure while
        reading or parsing is logged and leaves the in-memory store as is.
        """
        data_file = self.persist_directory / self._STORE_FILENAME
        if not data_file.exists():
            return

        try:
            with open(data_file, "r", encoding="utf-8") as f:
                data = json.load(f)

            self._chunks = data.get("chunks", [])
            self._embeddings = data.get("embeddings", [])
            self._metadata = data.get("metadata", [])
            self._ids = data.get("ids", [])

            sizes = {
                len(self._chunks),
                len(self._embeddings),
                len(self._metadata),
                len(self._ids),
            }
            if len(sizes) > 1:
                # The lists are used in lockstep; surface a damaged file
                # instead of letting later operations misbehave silently.
                log.warning(
                    "Vector store file %s has inconsistent list lengths",
                    data_file,
                )

            log.info("Loaded %d chunks from disk", len(self._chunks))
        except Exception as e:
            log.error("Failed to load vector store: %s", e)

    async def _save(self) -> None:
        """
        Save data to disk.

        The data is written to a temporary sibling file that then replaces
        ``store.json``, so a failed write (for example metadata that is not
        JSON-serializable) cannot truncate the previous store. Failures are
        logged, not raised.
        """
        data_file = self.persist_directory / self._STORE_FILENAME
        tmp_file = data_file.with_name(data_file.name + ".tmp")

        try:
            data = {
                "chunks": self._chunks,
                "embeddings": self._embeddings,
                "metadata": self._metadata,
                "ids": self._ids,
            }

            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_file, data_file)

            log.info("Saved %d chunks to disk", len(self._chunks))
        except Exception as e:
            log.error("Failed to save vector store: %s", e)
            # Do not leave a half-written temporary file behind.
            try:
                tmp_file.unlink()
            except OSError:
                pass

    def _ensure_initialized(self) -> None:
        """
        Ensure the vector store is initialized.

        Raises:
            RuntimeError: If ``initialize()`` has not completed
        """
        if not self._initialized:
            raise RuntimeError("Vector store not initialized")

    async def add_chunks(
        self,
        chunks: list[str],
        metadatas: Optional[list[dict[str, Any]]] = None,
        ids: Optional[list[str]] = None,
    ) -> None:
        """
        Add chunks to the vector store.

        Args:
            chunks: List of text chunks
            metadatas: Optional list of metadata dicts
            ids: Optional list of chunk IDs

        Raises:
            RuntimeError: If the store is not initialized
            ValueError: If ``metadatas`` or ``ids`` is given with a length
                different from ``chunks``
        """
        self._ensure_initialized()

        if not chunks:
            return

        # Generate IDs if not provided
        if ids is None:
            ids = [hashlib.md5(chunk.encode()).hexdigest() for chunk in chunks]

        # Generate metadata if not provided; one fresh dict per chunk so that
        # editing one chunk's metadata never affects another's.
        if metadatas is None:
            metadatas = [{} for _ in chunks]

        if len(metadatas) != len(chunks) or len(ids) != len(chunks):
            raise ValueError(
                "chunks, metadatas and ids must have the same length "
                f"(got {len(chunks)}, {len(metadatas)}, {len(ids)})"
            )

        # Generate embeddings before touching any state, so a failure here
        # leaves the store unchanged.
        embeddings = await self._generate_embeddings(chunks)

        # Store everything
        self._chunks.extend(
            {"id": chunk_id, "content": chunk}
            for chunk_id, chunk in zip(ids, chunks)
        )
        self._embeddings.extend(embeddings)
        self._metadata.extend(metadatas)
        self._ids.extend(ids)

        # Save to disk
        await self._save()

        log.info("Added %d chunks to vector store", len(chunks))

    async def _generate_embeddings(self, texts: list[str]) -> list[list[float]]:
        """
        Generate embeddings for texts.

        Uses a simple hash-based embedding for demonstration.
        In production, use a real embedding model via API.

        Args:
            texts: Texts to embed

        Returns:
            One vector of ``_EMBEDDING_DIM`` floats per text, in input order
        """
        embeddings = []

        for text in texts:
            # Simple hash-based embedding (for demo purposes)
            # In production, use OpenAI embeddings or similar
            hash_bytes = hashlib.sha256(text.encode()).digest()

            # Map each digest byte to [-1, 1) once, then cycle through the
            # mapped values until the vector has the required dimension.
            scaled = [(byte - 128) / 128.0 for byte in hash_bytes]
            embeddings.append(
                [scaled[i % len(scaled)] for i in range(self._EMBEDDING_DIM)]
            )

        return embeddings

    async def search(
        self,
        query: str,
        top_k: int = 5,
        filter_metadata: Optional[dict[str, Any]] = None,
    ) -> list[dict[str, Any]]:
        """
        Search for similar chunks.

        Args:
            query: Query string
            top_k: Number of results to return; zero or a negative value
                yields no results
            filter_metadata: Optional metadata filters; a chunk matches when
                every key/value pair equals the chunk's metadata entry

        Returns:
            List of matching chunks with scores, best match first

        Raises:
            RuntimeError: If the store is not initialized
        """
        self._ensure_initialized()

        # A negative top_k would otherwise slice from the end of the list
        # and return almost every result.
        if not self._chunks or top_k <= 0:
            return []

        # Generate query embedding
        query_embedding = (await self._generate_embeddings([query]))[0]

        # Calculate similarities
        results = []
        for chunk, embedding, metadata in zip(
            self._chunks, self._embeddings, self._metadata
        ):
            # Apply metadata filter
            if filter_metadata and not all(
                metadata.get(k) == v for k, v in filter_metadata.items()
            ):
                continue

            results.append({
                "id": chunk["id"],
                "content": chunk["content"],
                "metadata": metadata,
                "score": self._cosine_similarity(query_embedding, embedding),
            })

        # Sort by score and return top_k
        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:top_k]

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        """
        Calculate cosine similarity between two vectors.

        Returns 0.0 when the vectors differ in length or either has zero norm.
        """
        if len(a) != len(b):
            return 0.0

        dot_product = sum(x * y for x, y in zip(a, b))
        norm_a = sum(x * x for x in a) ** 0.5
        norm_b = sum(x * x for x in b) ** 0.5

        if norm_a == 0 or norm_b == 0:
            return 0.0

        return dot_product / (norm_a * norm_b)

    async def list_documents(self) -> list[dict[str, Any]]:
        """
        List all unique documents in the store.

        Chunks whose metadata has no (or a falsy) ``document_id`` are ignored.

        Returns:
            One dict per document with ``id``, ``source`` (taken from the
            first chunk seen for that document, ``"unknown"`` if absent) and
            ``chunk_count``, in order of first appearance

        Raises:
            RuntimeError: If the store is not initialized
        """
        self._ensure_initialized()

        # Group by document_id
        documents: dict[str, dict[str, Any]] = {}
        for metadata in self._metadata:
            doc_id = metadata.get("document_id")
            if not doc_id:
                continue

            entry = documents.get(doc_id)
            if entry is None:
                documents[doc_id] = {
                    "id": doc_id,
                    "source": metadata.get("source", "unknown"),
                    "chunk_count": 1,
                }
            else:
                entry["chunk_count"] += 1

        return list(documents.values())

    def _remove_where(self, key: str, value: Any) -> int:
        """
        Remove every chunk whose metadata has ``key`` equal to ``value``.

        Filters the four parallel lists in place and returns the number of
        chunks removed. Does not save.
        """
        doomed = {
            i
            for i, metadata in enumerate(self._metadata)
            if metadata.get(key) == value
        }
        if doomed:
            for items in (
                self._chunks,
                self._embeddings,
                self._metadata,
                self._ids,
            ):
                # Slice assignment keeps the same list objects alive.
                items[:] = [
                    item for i, item in enumerate(items) if i not in doomed
                ]
        return len(doomed)

    async def delete_document(self, document_id: str) -> None:
        """
        Delete all chunks for a document.

        Args:
            document_id: Value of the ``document_id`` metadata key to delete

        Raises:
            RuntimeError: If the store is not initialized
        """
        self._ensure_initialized()

        removed = self._remove_where("document_id", document_id)

        # Save changes
        await self._save()

        log.info("Deleted document %s (%d chunks)", document_id, removed)

    async def delete_by_source_url(self, source_url: str) -> int:
        """
        Delete all chunks from a specific source URL.

        Args:
            source_url: The source URL to delete

        Returns:
            Number of deleted chunks

        Raises:
            RuntimeError: If the store is not initialized
        """
        self._ensure_initialized()

        removed = self._remove_where("source_url", source_url)

        # Save changes
        await self._save()

        log.info("Deleted %d chunks from source: %s", removed, source_url)

        return removed

    async def get_stats(self) -> dict[str, Any]:
        """
        Get statistics about the vector store.

        Returns:
            Dict with ``total_chunks``, ``unique_sources``, ``unique_urls``
            and ``embedding_dimension`` (0 when the store is empty)

        Raises:
            RuntimeError: If the store is not initialized
        """
        self._ensure_initialized()

        # Count unique sources; missing or falsy values are not counted.
        sources = {m["source"] for m in self._metadata if m.get("source")}
        source_urls = {
            m["source_url"] for m in self._metadata if m.get("source_url")
        }

        return {
            "total_chunks": len(self._chunks),
            "unique_sources": len(sources),
            "unique_urls": len(source_urls),
            "embedding_dimension": len(self._embeddings[0]) if self._embeddings else 0,
        }