- main.py: Rewrote _parse_tool_call with brace-counting for robust JSON extraction
- main.py: Improved _clean_tool_syntax with brace-aware removal of tool_call JSON
- main.py: Fixed dict key mismatches (chunks_ingested, pages_downloaded)
- main.py: Run tool execution in asyncio.to_thread to avoid blocking event loop
- main.py: Always clean tool syntax from responses (handles edge cases)
- rag/__init__.py: Wrap blocking website_downloader in run_in_executor
- rag/__init__.py: Replace deprecated datetime.utcnow() with datetime.now(timezone.utc)
- rag/__init__.py: Add add_document_from_url method
- rag/vector_store.py: Replace hash-based embeddings with TF-IDF inspired embeddings
- rag/vector_store.py: Add embedding dimension mismatch handling in search
- README.md: Update API key config documentation
418 lines
13 KiB
Python
Executable File
418 lines
13 KiB
Python
Executable File
"""
Vector Store - Handles vector storage and similarity search.

Provides a simple file-based vector store that can be extended to use
more sophisticated backends like ChromaDB, FAISS, or Pinecone.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import math
|
|
import os
|
|
import re
|
|
from collections import Counter
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
# Module-scoped logger; handlers and levels are left to the application.
log = logging.getLogger(__name__)


# Target dimensionality for the embedding vectors produced by the store.
_EMBEDDING_DIM = 256


# Tokenizer pattern: each match is one run of ASCII letters and/or digits.
_WORD_RE = re.compile(r"[a-zA-Z0-9]+")
|
|
|
|
|
|
class VectorStore:
    """
    Vector store for document embeddings.

    This implementation provides:
    - Simple file-based persistence (``store.json`` in ``persist_directory``)
    - In-memory similarity search (cosine similarity over TF-IDF style vectors)
    - Document management (listing and deletion by metadata)

    Chunks, embeddings, metadata and ids are held in four parallel lists:
    index ``i`` of each list describes the same chunk.

    Can be extended to use ChromaDB, FAISS, or other vector databases.
    """

    def __init__(
        self,
        persist_directory: str = "./data/vectors",
        embedding_model: str = "text-embedding-3-small",
    ):
        """
        Create an empty, not-yet-initialized store.

        Args:
            persist_directory: Directory that holds ``store.json``.
            embedding_model: Name of the embedding model. It is only stored
                on the instance; the built-in embedder in this class does
                not consult it.
        """
        self.persist_directory = Path(persist_directory)
        self.embedding_model = embedding_model

        # Parallel lists, one entry per stored chunk.
        self._chunks: list[dict[str, Any]] = []
        self._embeddings: list[list[float]] = []
        self._metadata: list[dict[str, Any]] = []
        self._ids: list[str] = []

        self._initialized = False

    async def initialize(self) -> None:
        """Create the persistence directory and load existing data (idempotent)."""
        if self._initialized:
            return

        self.persist_directory.mkdir(parents=True, exist_ok=True)

        # Load existing data
        await self._load()

        self._initialized = True
        log.info("Vector store initialized with %d chunks", len(self._chunks))

    async def close(self) -> None:
        """Save and close the vector store."""
        await self._save()
        log.info("Vector store closed")

    async def _load(self) -> None:
        """
        Load data from ``store.json`` if it exists.

        Loading is best-effort: a missing file is not an error, and an
        unreadable or malformed file is logged and leaves the in-memory
        state untouched.
        """
        data_file = self.persist_directory / "store.json"

        if not data_file.exists():
            return

        try:
            with open(data_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            # Valid JSON that is not an object (e.g. a list) cannot be a store.
            if not isinstance(data, dict):
                raise ValueError("store file does not contain a JSON object")
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            log.error("Failed to load vector store: %s", e)
            return

        self._chunks = data.get("chunks", [])
        self._embeddings = data.get("embeddings", [])
        self._metadata = data.get("metadata", [])
        self._ids = data.get("ids", [])

        log.info("Loaded %d chunks from disk", len(self._chunks))

    async def _save(self) -> None:
        """
        Persist the store to ``store.json``.

        The data is written to a temporary sibling file first and then moved
        into place with ``os.replace`` so that an interrupted write cannot
        leave a truncated ``store.json`` behind. Failures are logged, not
        raised (saving is best-effort).
        """
        data_file = self.persist_directory / "store.json"
        tmp_file = data_file.with_name(data_file.name + ".tmp")

        data = {
            "chunks": self._chunks,
            "embeddings": self._embeddings,
            "metadata": self._metadata,
            "ids": self._ids,
        }

        try:
            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_file, data_file)
        except (OSError, TypeError, ValueError) as e:
            # TypeError/ValueError: metadata that json cannot serialize.
            log.error("Failed to save vector store: %s", e)
            try:
                tmp_file.unlink()
            except OSError:
                pass  # nothing to clean up, or cleanup itself is impossible
            return

        log.info("Saved %d chunks to disk", len(self._chunks))

    def _ensure_initialized(self) -> None:
        """Raise ``RuntimeError`` unless ``initialize()`` has completed."""
        if not self._initialized:
            raise RuntimeError("Vector store not initialized")

    async def add_chunks(
        self,
        chunks: list[str],
        metadatas: Optional[list[dict[str, Any]]] = None,
        ids: Optional[list[str]] = None,
    ) -> None:
        """
        Add chunks to the vector store and persist the result.

        Args:
            chunks: List of text chunks
            metadatas: Optional list of metadata dicts, one per chunk.
                Defaults to a fresh empty dict for every chunk.
            ids: Optional list of chunk IDs, one per chunk. Defaults to the
                MD5 hex digest of each chunk's UTF-8 text.

        Raises:
            RuntimeError: If the store has not been initialized.
            ValueError: If ``metadatas`` or ``ids`` is given with a length
                different from ``chunks``.
        """
        self._ensure_initialized()

        if not chunks:
            return

        # Generate IDs if not provided
        if ids is None:
            ids = [hashlib.md5(chunk.encode()).hexdigest() for chunk in chunks]

        # One independent dict per chunk: ``[{}] * n`` would alias a single
        # dict, so editing one chunk's metadata would edit all of them.
        if metadatas is None:
            metadatas = [{} for _ in chunks]

        # zip() below would silently drop the surplus on a length mismatch.
        if len(metadatas) != len(chunks) or len(ids) != len(chunks):
            raise ValueError(
                "chunks, metadatas and ids must have the same length "
                f"(got {len(chunks)}, {len(metadatas)}, {len(ids)})"
            )

        # Generate embeddings
        embeddings = await self._generate_embeddings(chunks)

        # Store everything
        for chunk, embedding, metadata, chunk_id in zip(
            chunks, embeddings, metadatas, ids
        ):
            self._chunks.append({"id": chunk_id, "content": chunk})
            self._embeddings.append(embedding)
            self._metadata.append(metadata)
            self._ids.append(chunk_id)

        # Save to disk
        await self._save()

        log.info("Added %d chunks to vector store", len(chunks))

    def _tokenize(self, text: str) -> list[str]:
        """Return the lowercased alphanumeric words of *text* longer than one character."""
        return [w.lower() for w in _WORD_RE.findall(text) if len(w) > 1]

    def _document_frequencies(self, all_tokenized: list[list[str]]) -> Counter:
        """Count, for every token, the number of token lists that contain it."""
        doc_freq: Counter = Counter()
        for tokens in all_tokenized:
            doc_freq.update(set(tokens))
        return doc_freq

    def _build_vocab(self, all_tokenized: list[list[str]], max_vocab: int = 10000) -> dict[str, int]:
        """
        Build a vocabulary from tokenized texts.

        Args:
            all_tokenized: One token list per document.
            max_vocab: Maximum number of tokens to keep.

        Returns:
            Mapping of token -> rank, keeping the ``max_vocab`` tokens with
            the highest document frequency (rank 0 is the most frequent).
        """
        doc_freq = self._document_frequencies(all_tokenized)
        return {
            token: idx
            for idx, (token, _) in enumerate(doc_freq.most_common(max_vocab))
        }

    @staticmethod
    def _token_bucket(token: str, dim: int) -> int:
        """
        Map *token* to a vector index in ``range(dim)``.

        Uses an MD5 digest rather than the built-in ``hash()``: string hashes
        are randomized per process, while embeddings are persisted to disk and
        must line up with query vectors computed in later runs.
        """
        digest = hashlib.md5(token.encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") % dim

    @staticmethod
    def _fit_dimension(vector: list[float], dim: int) -> list[float]:
        """Return *vector* zero-padded or truncated to exactly *dim* entries."""
        if len(vector) < dim:
            return vector + [0.0] * (dim - len(vector))
        return vector[:dim]

    async def _generate_embeddings(self, texts: list[str]) -> list[list[float]]:
        """
        Generate TF-IDF inspired embeddings for texts.

        Each text becomes a bag-of-words vector with augmented-TF x IDF
        weights. Tokens are assigned to one of ``_EMBEDDING_DIM`` buckets by
        a stable hash, so a token always lands on the same index no matter
        how the corpus has grown since a vector was stored. IDF statistics
        are taken from the already stored chunks plus *texts*. Every returned
        vector has length ``_EMBEDDING_DIM`` and is L2-normalized (or all
        zeros when the text has no usable tokens).

        In production, replace with a real embedding model API call.

        Args:
            texts: Texts to embed.

        Returns:
            One vector per input text, in input order.
        """
        if not texts:
            return []

        # Tokenize all texts
        all_tokenized = [self._tokenize(t) for t in texts]

        # Corpus statistics cover the stored chunks as well as the new texts.
        existing_tokenized = [self._tokenize(c["content"]) for c in self._chunks]
        combined_tokenized = existing_tokenized + all_tokenized

        vocab = self._build_vocab(combined_tokenized)
        if not vocab:
            # Fallback: return zero vectors
            return [[0.0] * _EMBEDDING_DIM for _ in texts]

        # Document frequencies are counted once for the whole corpus.
        doc_freq = self._document_frequencies(combined_tokenized)
        n_docs = len(combined_tokenized)

        embeddings = []
        for tokens in all_tokenized:
            vec = [0.0] * _EMBEDDING_DIM

            if tokens:
                tf = Counter(tokens)
                max_tf = max(tf.values())

                for token, count in tf.items():
                    # Tokens outside the frequency-capped vocabulary are ignored.
                    if token not in vocab:
                        continue
                    normalized_tf = 0.5 + 0.5 * (count / max_tf)
                    idf = math.log((n_docs + 1) / (doc_freq[token] + 1)) + 1
                    vec[self._token_bucket(token, _EMBEDDING_DIM)] += normalized_tf * idf

                # L2 normalize
                norm = math.sqrt(sum(v * v for v in vec))
                if norm > 0:
                    vec = [v / norm for v in vec]

            embeddings.append(vec)

        return embeddings

    async def search(
        self,
        query: str,
        top_k: int = 5,
        filter_metadata: Optional[dict] = None,
    ) -> list[dict[str, Any]]:
        """
        Search for similar chunks.

        Args:
            query: Query string
            top_k: Number of results to return; values <= 0 yield no results
            filter_metadata: Optional metadata filters; a chunk matches when
                every key/value pair equals the chunk's metadata entry

        Returns:
            List of dicts with ``id``, ``content``, ``metadata`` and ``score``
            (cosine similarity), best match first.

        Raises:
            RuntimeError: If the store has not been initialized.
        """
        self._ensure_initialized()

        # A negative top_k would otherwise slice "all but the last k".
        if not self._chunks or top_k <= 0:
            return []

        # Generate query embedding (use full corpus for consistent statistics)
        query_embedding = (await self._generate_embeddings([query]))[0]

        # Query vector fitted to each stored dimension, computed on demand.
        # Stored vectors may have another length (e.g. written by an older
        # embedder); each distinct mismatch is reported once.
        fitted = {len(query_embedding): query_embedding}

        results = []
        for chunk, embedding, metadata in zip(
            self._chunks, self._embeddings, self._metadata
        ):
            # Apply metadata filter
            if filter_metadata and any(
                metadata.get(k) != v for k, v in filter_metadata.items()
            ):
                continue

            dim = len(embedding)
            if dim not in fitted:
                log.warning(
                    "Embedding dimension mismatch: query=%d, stored=%d. "
                    "Using zero-padded query.",
                    len(query_embedding),
                    dim,
                )
                fitted[dim] = self._fit_dimension(query_embedding, dim)

            results.append({
                "id": chunk["id"],
                "content": chunk["content"],
                "metadata": metadata,
                "score": self._cosine_similarity(fitted[dim], embedding),
            })

        # Sort by score and return top_k
        results.sort(key=lambda r: r["score"], reverse=True)
        return results[:top_k]

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        """
        Calculate cosine similarity between two vectors.

        Returns 0.0 when the lengths differ or either vector has zero norm.
        """
        if len(a) != len(b):
            return 0.0

        dot_product = sum(x * y for x, y in zip(a, b))
        norm_a = sum(x * x for x in a) ** 0.5
        norm_b = sum(x * x for x in b) ** 0.5

        if norm_a == 0 or norm_b == 0:
            return 0.0

        return dot_product / (norm_a * norm_b)

    async def list_documents(self) -> list[dict[str, Any]]:
        """
        List all unique documents in the store.

        Chunks are grouped by their ``document_id`` metadata; chunks without
        one are skipped. ``source`` is taken from the first chunk seen for
        each document.

        Returns:
            One dict per document with ``id``, ``source`` and ``chunk_count``.
        """
        self._ensure_initialized()

        documents: dict[Any, dict[str, Any]] = {}
        for metadata in self._metadata:
            doc_id = metadata.get("document_id")
            if not doc_id:
                continue
            entry = documents.get(doc_id)
            if entry is None:
                documents[doc_id] = {
                    "id": doc_id,
                    "source": metadata.get("source", "unknown"),
                    "chunk_count": 1,
                }
            else:
                entry["chunk_count"] += 1

        return list(documents.values())

    async def _delete_matching(self, key: str, value: Any) -> int:
        """Remove every chunk whose metadata has ``key == value``, save, and return the count."""
        doomed = {
            i for i, metadata in enumerate(self._metadata)
            if metadata.get(key) == value
        }

        if doomed:
            # Rebuild each parallel list in one pass, keeping list identity.
            for column in (self._chunks, self._embeddings, self._metadata, self._ids):
                column[:] = [item for i, item in enumerate(column) if i not in doomed]

        # Save changes
        await self._save()

        return len(doomed)

    async def delete_document(self, document_id: str) -> None:
        """
        Delete all chunks for a document.

        Args:
            document_id: Value of the ``document_id`` metadata to remove.
        """
        self._ensure_initialized()

        removed = await self._delete_matching("document_id", document_id)

        log.info("Deleted document %s (%d chunks)", document_id, removed)

    async def delete_by_source_url(self, source_url: str) -> int:
        """
        Delete all chunks from a specific source URL.

        Args:
            source_url: The source URL to delete

        Returns:
            Number of deleted chunks
        """
        self._ensure_initialized()

        removed = await self._delete_matching("source_url", source_url)

        log.info("Deleted %d chunks from source: %s", removed, source_url)
        return removed

    async def get_stats(self) -> dict[str, Any]:
        """
        Get statistics about the vector store.

        Returns:
            Dict with ``total_chunks``, ``unique_sources``, ``unique_urls``
            and ``embedding_dimension`` (length of the first stored
            embedding, or 0 when the store is empty).
        """
        self._ensure_initialized()

        # Count unique, non-empty sources and source URLs.
        sources = {m.get("source") for m in self._metadata if m.get("source")}
        source_urls = {
            m.get("source_url") for m in self._metadata if m.get("source_url")
        }

        return {
            "total_chunks": len(self._chunks),
            "unique_sources": len(sources),
            "unique_urls": len(source_urls),
            "embedding_dimension": len(self._embeddings[0]) if self._embeddings else 0,
        }
|