docrag/rag/vector_store.py
Z User b811162f78 Implement tool calling loop for LLM
- Pass all registered tools to LLM during chat completion
- Handle tool_calls from LLM response
- Execute tools and feed results back to LLM
- Loop until LLM returns final response
- Updated system prompt to encourage tool use
- Updated streaming to handle tool calls
- Increased MAX_TOOL_ITERATIONS to 5
2026-03-29 16:07:56 +00:00

338 lines
10 KiB
Python
Executable File

"""
Vector Store - Handles vector storage and similarity search
Provides a simple file-based vector store that can be extended to use
more sophisticated backends like ChromaDB, FAISS, or Pinecone.
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
from pathlib import Path
from typing import Any, Optional
log = logging.getLogger(__name__)
class VectorStore:
    """
    Vector store for document embeddings.

    This implementation provides:
    - Simple file-based persistence (one JSON file in ``persist_directory``)
    - In-memory similarity search (linear scan with cosine similarity)
    - Document management (list / delete by metadata)

    Chunks, embeddings, metadata and ids are kept in four parallel lists:
    index ``i`` in each list describes the same chunk.

    Can be extended to use ChromaDB, FAISS, or other vector databases.
    """

    # Name of the JSON file, inside ``persist_directory``, holding all data.
    STORE_FILENAME = "store.json"
    # Length of the vectors produced by the demo embedder.
    EMBEDDING_DIMENSION = 384

    def __init__(
        self,
        persist_directory: str = "./data/vectors",
        embedding_model: str = "text-embedding-3-small",
    ):
        """
        Create an empty, uninitialized store.

        Args:
            persist_directory: Directory that holds the persisted JSON file.
            embedding_model: Model name; stored on the instance but not used
                by the hash-based demo embedder in this class.
        """
        self.persist_directory = Path(persist_directory)
        self.embedding_model = embedding_model
        self._chunks: list[dict[str, Any]] = []
        self._embeddings: list[list[float]] = []
        self._metadata: list[dict[str, Any]] = []
        self._ids: list[str] = []
        self._initialized = False

    async def initialize(self) -> None:
        """Create the persist directory and load existing data (idempotent)."""
        if self._initialized:
            return
        self.persist_directory.mkdir(parents=True, exist_ok=True)
        # Load existing data
        await self._load()
        self._initialized = True
        log.info(f"Vector store initialized with {len(self._chunks)} chunks")

    async def close(self) -> None:
        """Save and close the vector store."""
        # Only persist when data was actually loaded: saving an uninitialized
        # (empty) store would overwrite an existing file with empty lists.
        if self._initialized:
            await self._save()
        log.info("Vector store closed")

    async def _load(self) -> None:
        """
        Load data from disk.

        Best-effort: a missing file is ignored and any failure is logged,
        leaving the in-memory lists unchanged.
        """
        data_file = self.persist_directory / self.STORE_FILENAME
        if not data_file.exists():
            return
        try:
            with open(data_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            self._chunks = data.get("chunks", [])
            self._embeddings = data.get("embeddings", [])
            self._metadata = data.get("metadata", [])
            self._ids = data.get("ids", [])
            sizes = {
                len(self._chunks),
                len(self._embeddings),
                len(self._metadata),
                len(self._ids),
            }
            if len(sizes) > 1:
                # The lists are used in lockstep; surface a damaged file
                # early instead of failing later in search/delete.
                log.warning(
                    "Vector store file has parallel lists of different lengths"
                )
            log.info(f"Loaded {len(self._chunks)} chunks from disk")
        except Exception as e:
            log.error(f"Failed to load vector store: {e}")

    async def _save(self) -> None:
        """
        Save data to disk.

        The JSON is written to a temporary sibling file which then replaces
        the real file, so a failure mid-write cannot leave a truncated store
        behind. Best-effort: failures are logged, not raised.
        """
        data_file = self.persist_directory / self.STORE_FILENAME
        tmp_file = data_file.with_name(data_file.name + ".tmp")
        try:
            data = {
                "chunks": self._chunks,
                "embeddings": self._embeddings,
                "metadata": self._metadata,
                "ids": self._ids,
            }
            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_file, data_file)
            log.info(f"Saved {len(self._chunks)} chunks to disk")
        except Exception as e:
            log.error(f"Failed to save vector store: {e}")
            # Do not leave a half-written temp file next to the store.
            try:
                tmp_file.unlink()
            except OSError:
                pass

    def _ensure_initialized(self) -> None:
        """
        Ensure the vector store is initialized.

        Raises:
            RuntimeError: If ``initialize()`` has not completed.
        """
        if not self._initialized:
            raise RuntimeError("Vector store not initialized")

    async def add_chunks(
        self,
        chunks: list[str],
        metadatas: Optional[list[dict[str, Any]]] = None,
        ids: Optional[list[str]] = None,
    ) -> None:
        """
        Add chunks to the vector store and persist the result.

        Args:
            chunks: List of text chunks
            metadatas: Optional list of metadata dicts (one per chunk);
                defaults to a separate empty dict for every chunk
            ids: Optional list of chunk IDs (one per chunk); defaults to the
                MD5 hex digest of each chunk's text

        Raises:
            RuntimeError: If the store is not initialized.
            ValueError: If ``metadatas`` or ``ids`` is given with a length
                different from ``chunks``.
        """
        self._ensure_initialized()
        if not chunks:
            return
        # Generate IDs if not provided
        if ids is None:
            ids = [hashlib.md5(chunk.encode()).hexdigest() for chunk in chunks]
        # Generate metadata if not provided. One fresh dict per chunk:
        # ``[{}] * n`` would alias a single dict across all chunks.
        if metadatas is None:
            metadatas = [{} for _ in chunks]
        # zip() below would silently drop chunks on a length mismatch.
        if len(metadatas) != len(chunks) or len(ids) != len(chunks):
            raise ValueError(
                "chunks, metadatas and ids must have the same length "
                f"(got {len(chunks)}, {len(metadatas)}, {len(ids)})"
            )
        # Generate embeddings
        embeddings = await self._generate_embeddings(chunks)
        # Store everything
        for chunk, embedding, metadata, chunk_id in zip(
            chunks, embeddings, metadatas, ids
        ):
            self._chunks.append({"id": chunk_id, "content": chunk})
            self._embeddings.append(embedding)
            self._metadata.append(metadata)
            self._ids.append(chunk_id)
        # Save to disk
        await self._save()
        log.info(f"Added {len(chunks)} chunks to vector store")

    async def _generate_embeddings(self, texts: list[str]) -> list[list[float]]:
        """
        Generate embeddings for texts.

        Uses a simple hash-based embedding for demonstration: the SHA-256
        digest bytes of the text are scaled to [-1, 1) and repeated
        cyclically up to ``EMBEDDING_DIMENSION`` values.
        In production, use a real embedding model via API.
        """
        dimension = self.EMBEDDING_DIMENSION
        embeddings = []
        for text in texts:
            digest = hashlib.sha256(text.encode()).digest()
            scaled = [(byte - 128) / 128.0 for byte in digest]
            embeddings.append(
                [scaled[i % len(scaled)] for i in range(dimension)]
            )
        return embeddings

    async def search(
        self,
        query: str,
        top_k: int = 5,
        filter_metadata: Optional[dict] = None,
    ) -> list[dict[str, Any]]:
        """
        Search for similar chunks.

        Args:
            query: Query string
            top_k: Number of results to return; a non-positive value yields
                an empty list
            filter_metadata: Optional metadata filters; a chunk matches only
                if every key/value pair equals the chunk's metadata

        Returns:
            List of dicts with ``id``, ``content``, ``metadata`` and
            ``score`` keys, sorted by descending score

        Raises:
            RuntimeError: If the store is not initialized.
        """
        self._ensure_initialized()
        # A negative slice bound would return "all but the last k" results.
        if top_k <= 0 or not self._chunks:
            return []
        # Generate query embedding
        query_embedding = (await self._generate_embeddings([query]))[0]
        # Calculate similarities
        results = []
        for chunk, embedding, metadata in zip(
            self._chunks, self._embeddings, self._metadata
        ):
            # Apply metadata filter
            if filter_metadata and not all(
                metadata.get(k) == v for k, v in filter_metadata.items()
            ):
                continue
            results.append({
                "id": chunk["id"],
                "content": chunk["content"],
                "metadata": metadata,
                "score": self._cosine_similarity(query_embedding, embedding),
            })
        # Sort by score and return top_k
        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:top_k]

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        """
        Calculate cosine similarity between two vectors.

        Returns 0.0 when the lengths differ or either vector has zero norm.
        """
        if len(a) != len(b):
            return 0.0
        dot_product = sum(x * y for x, y in zip(a, b))
        norm_a = sum(x * x for x in a) ** 0.5
        norm_b = sum(x * x for x in b) ** 0.5
        if norm_a == 0 or norm_b == 0:
            return 0.0
        return dot_product / (norm_a * norm_b)

    async def list_documents(self) -> list[dict[str, Any]]:
        """
        List all unique documents in the store.

        Chunks are grouped by their ``document_id`` metadata value; chunks
        without a truthy ``document_id`` are ignored. ``source`` is taken
        from the first chunk seen for each document.

        Returns:
            One dict per document with ``id``, ``source`` and
            ``chunk_count`` keys, in first-seen order

        Raises:
            RuntimeError: If the store is not initialized.
        """
        self._ensure_initialized()
        # Group by document_id
        documents: dict[Any, dict[str, Any]] = {}
        for metadata in self._metadata:
            doc_id = metadata.get("document_id")
            if not doc_id:
                continue
            entry = documents.get(doc_id)
            if entry is None:
                documents[doc_id] = {
                    "id": doc_id,
                    "source": metadata.get("source", "unknown"),
                    "chunk_count": 1,
                }
            else:
                entry["chunk_count"] += 1
        return list(documents.values())

    async def _delete_where(self, key: str, value: Any) -> int:
        """Remove every chunk whose metadata[key] == value, save, return count."""
        doomed = {
            i
            for i, metadata in enumerate(self._metadata)
            if metadata.get(key) == value
        }
        if doomed:
            # Rebuild the four parallel lists in place in a single pass.
            for items in (
                self._chunks,
                self._embeddings,
                self._metadata,
                self._ids,
            ):
                items[:] = [
                    item for i, item in enumerate(items) if i not in doomed
                ]
        # Save changes
        await self._save()
        return len(doomed)

    async def delete_document(self, document_id: str) -> None:
        """
        Delete all chunks for a document.

        Args:
            document_id: Value of the ``document_id`` metadata key to match

        Raises:
            RuntimeError: If the store is not initialized.
        """
        self._ensure_initialized()
        removed = await self._delete_where("document_id", document_id)
        log.info(f"Deleted document {document_id} ({removed} chunks)")

    async def delete_by_source_url(self, source_url: str) -> int:
        """
        Delete all chunks from a specific source URL.

        Args:
            source_url: The source URL to delete

        Returns:
            Number of deleted chunks

        Raises:
            RuntimeError: If the store is not initialized.
        """
        self._ensure_initialized()
        removed = await self._delete_where("source_url", source_url)
        log.info(f"Deleted {removed} chunks from source: {source_url}")
        return removed

    async def get_stats(self) -> dict[str, Any]:
        """
        Get statistics about the vector store.

        Returns:
            Dict with ``total_chunks``, ``unique_sources``, ``unique_urls``
            and ``embedding_dimension`` (0 when the store is empty)

        Raises:
            RuntimeError: If the store is not initialized.
        """
        self._ensure_initialized()
        # Count unique sources (falsy values are not counted)
        sources = {
            metadata.get("source")
            for metadata in self._metadata
            if metadata.get("source")
        }
        source_urls = {
            metadata.get("source_url")
            for metadata in self._metadata
            if metadata.get("source_url")
        }
        return {
            "total_chunks": len(self._chunks),
            "unique_sources": len(sources),
            "unique_urls": len(source_urls),
            "embedding_dimension": len(self._embeddings[0]) if self._embeddings else 0,
        }