projects/pyrag3/retrieval_service.py
2026-04-05 17:30:07 -07:00

333 lines
14 KiB
Python

import base64
import contextlib
import io
import logging
import mimetypes
import os
import re
import sqlite3
import struct
from pathlib import Path

import markdownify as md
import requests
from bs4 import BeautifulSoup
from pypdf import PdfReader

from pyrag3.gitea_api import GiteaAPI
from pyrag3.vector_service import VectorService
logger = logging.getLogger(__name__)

# Runs of spaces are collapsed to a single space when normalizing extracted text.
_SPACE_RUN = re.compile(r" +")
# Sentence boundary used for paragraphs too long for one chunk: the space right
# after a period (the period stays attached to its sentence).
_SENTENCE_BREAK = re.compile(r"(?<=\.) ")
# Words that mark an inline <img> as decoration rather than content.
_DECORATIVE_IMAGE_HINTS = ("icon", "logo", "pixel", "tracker", "sprite")
# Chunk text stored for documents that yield an image but no text.
_VISUAL_PLACEHOLDER = "Visual Content"


class RetrievalService:
    """Chunked semantic index over the documents stored in a Gitea repository.

    Documents are parsed to text (plus an optional image), split into chunks,
    embedded through ``VectorService`` and stored one row per chunk in a local
    SQLite database, which can itself be pushed to / pulled from the repo.
    """

    def __init__(self, db_path="rag_data/index.db"):
        self.db_path = Path(db_path)
        self.api = GiteaAPI()
        self.vector_service = VectorService()
        # _init_db also creates the parent directory.
        self._init_db()

    @contextlib.contextmanager
    def _connect(self):
        """Yield a connection to the index database.

        The transaction is committed when the ``with`` body succeeds and rolled
        back when it raises; the connection is closed either way. A bare
        ``with sqlite3.connect(...)`` only commits/rolls back and never closes,
        leaking file handles (and keeping the file locked on Windows).
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _init_db(self):
        """Create the index database and its ``documents`` table if missing.

        A pre-existing ``documents`` table without a ``content`` column (the
        old whole-document schema) is dropped and recreated, discarding its rows.
        """
        # Path.parent is "." for a bare filename, so this cannot fail the way
        # os.makedirs(os.path.dirname("index.db")) == os.makedirs("") does.
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        with self._connect() as conn:
            try:
                conn.execute("SELECT content FROM documents LIMIT 1")
            except sqlite3.OperationalError:
                # Table missing or still on the pre-chunking schema: start over.
                logger.info("Migrating database for chunk and content support...")
                conn.execute("DROP TABLE IF EXISTS documents")
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS documents (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    url TEXT,
                    sha TEXT,
                    path TEXT,
                    title TEXT,
                    content TEXT,
                    embedding BLOB
                )
                """
            )
            conn.execute("CREATE INDEX IF NOT EXISTS idx_url ON documents(url)")

    def _serialize_vector(self, vector):
        """Pack a list of floats into a BLOB of little-endian float32 values.

        The byte order is fixed rather than native because the index file is
        shared between machines (push_index_db / pull_index_db).
        Returns None for an empty or missing vector.
        """
        if not vector:
            return None
        return struct.pack(f"<{len(vector)}f", *vector)

    def _deserialize_vector(self, blob):
        """Inverse of _serialize_vector; returns None for an empty/NULL BLOB."""
        if not blob:
            return None
        return list(struct.unpack(f"<{len(blob) // 4}f", blob))

    def _chunk_text(self, text, chunk_size=1500, overlap=200):
        """Split text into chunks for embedding.

        Paragraphs (separated by blank lines) are packed greedily, joined by a
        blank line, into chunks of at most ``chunk_size`` characters. A
        paragraph longer than that is split on sentence boundaries, and a
        sentence that is still too long is hard-split. Each chunk after the
        first is then prefixed with up to ``overlap`` trailing characters
        (whole words only) of the previous chunk, so text that straddles a
        boundary is searchable from both sides. Such chunks may exceed
        ``chunk_size`` by up to ``overlap + 2`` characters.

        Returns [] for empty/None text. Raises ValueError if chunk_size <= 0.
        """
        if not text:
            return []
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")

        # 1. Break the text into units that each fit in a chunk on their own.
        units = []
        for para in text.split("\n\n"):
            para = para.strip()
            if not para:
                continue
            if len(para) <= chunk_size:
                units.append(para)
            else:
                units.extend(self._split_long_paragraph(para, chunk_size))

        # 2. Greedily pack units into chunks.
        chunks = []
        current = ""
        for unit in units:
            candidate = f"{current}\n\n{unit}" if current else unit
            if len(candidate) <= chunk_size:
                current = candidate
            else:
                # current is non-empty here: a lone unit always fits.
                chunks.append(current)
                current = unit
        if current:
            chunks.append(current)

        # 3. Carry context across chunk boundaries.
        return self._add_overlap(chunks, overlap)

    @staticmethod
    def _split_long_paragraph(para, chunk_size):
        """Pack an oversized paragraph's sentences into pieces of <= chunk_size chars.

        Sentences are separated at the space following a period and rejoined
        with a single space, so the original text is reproduced without an
        extra period. A sentence longer than chunk_size is hard-split.
        """
        pieces = []
        current = ""
        for sentence in _SENTENCE_BREAK.split(para):
            if not sentence:
                continue
            while len(sentence) > chunk_size:
                if current:
                    pieces.append(current)
                    current = ""
                pieces.append(sentence[:chunk_size])
                sentence = sentence[chunk_size:]
            candidate = f"{current} {sentence}" if current else sentence
            if len(candidate) <= chunk_size:
                current = candidate
            else:
                pieces.append(current)
                current = sentence
        if current:
            pieces.append(current)
        return pieces

    @staticmethod
    def _add_overlap(chunks, overlap):
        """Prefix each chunk after the first with the tail of its predecessor.

        The tail is the last ``overlap`` characters of the previous chunk,
        trimmed so it does not start mid-word. Chunks are returned unchanged
        when ``overlap`` is not positive or there is only one chunk.
        """
        if overlap <= 0 or len(chunks) < 2:
            return chunks
        overlapped = [chunks[0]]
        for prev, cur in zip(chunks, chunks[1:]):
            tail = prev[-overlap:]
            cut_mid_word = (
                len(prev) > overlap
                and not prev[-overlap - 1].isspace()
                and not tail[0].isspace()
            )
            if cut_mid_word:
                # Drop the partial leading word (everything, if it is the only word).
                parts = tail.split(maxsplit=1)
                tail = parts[1] if len(parts) == 2 else ""
            tail = tail.strip()
            overlapped.append(f"{tail}\n\n{cur}" if tail else cur)
        return overlapped

    @staticmethod
    def _first_inline_image(soup):
        """Return the base64 payload of the first non-decorative inline image, or None.

        Only ``data:image/...;base64,`` URIs are considered; remote image URLs
        are not fetched. Decoration is detected from the URI's media-type
        header and the tag's alt/id/class attributes. It is never detected from
        the base64 payload, whose random letters can spell "icon", "logo", etc.
        """
        for img in soup.find_all("img"):
            src = img.get("src") or ""
            if not src.startswith("data:image"):
                continue
            header, sep, payload = src.partition(",")
            if not sep or not header.endswith(";base64"):
                continue  # not base64-encoded, so it cannot be returned as image_b64
            classes = img.get("class") or []
            if isinstance(classes, str):
                classes = [classes]
            descriptor = " ".join(
                [header, img.get("alt") or "", img.get("id") or "", *classes]
            ).lower()
            if any(hint in descriptor for hint in _DECORATIVE_IMAGE_HINTS):
                continue
            # Data URIs in HTML are sometimes line-wrapped; base64 has no whitespace.
            return "".join(payload.split())
        return None

    def _parse_content(self, raw_content, filepath):
        """Extract text (Markdown for HTML) and an optional image from a file's bytes.

        The parser is chosen from ``filepath``'s guessed MIME type / extension:
        - images are returned base64-encoded, with no text;
        - PDFs yield their page text;
        - HTML is stripped of script/style/nav/footer/header, its first
          non-decorative inline base64 image is extracted, and the rest is
          converted to Markdown;
        - anything else is decoded as UTF-8, with invalid bytes dropped.
        Parse errors are logged and leave ``content`` as None.

        Returns:
            (content, title, image_b64, mime_type). ``title`` is the file's
            basename unless an HTML <title> provides one. ``mime_type`` is the
            guess for ``filepath`` and may be None.
        """
        ext = os.path.splitext(filepath)[1].lower()
        mime_type, _ = mimetypes.guess_type(filepath)
        content = None
        title = os.path.basename(filepath)
        image_b64 = None
        try:
            if mime_type and mime_type.startswith("image/"):
                image_b64 = base64.b64encode(raw_content).decode("utf-8")
            elif ext == ".pdf":
                # Parse from memory: no shared temp file to collide on or leak on error.
                reader = PdfReader(io.BytesIO(raw_content))
                page_texts = (page.extract_text() for page in reader.pages)
                content = "\n".join(text for text in page_texts if text)
            elif ext in (".html", ".htm"):
                soup = BeautifulSoup(raw_content, "html.parser")
                # .string is None for an empty <title> or one with nested markup;
                # keep the basename then instead of storing None.
                if soup.title and soup.title.string and soup.title.string.strip():
                    title = str(soup.title.string).strip()
                for tag in soup(["script", "style", "nav", "footer", "header"]):
                    tag.decompose()
                # NOTE(review): mime_type stays "text/html" even when an inline
                # image (e.g. PNG) is returned in image_b64. Confirm how
                # VectorService.get_embedding uses mime_type.
                image_b64 = self._first_inline_image(soup)
                content = md.markdownify(str(soup), heading_style="ATX")
            else:
                content = raw_content.decode("utf-8", errors="ignore")
        except Exception as e:
            logger.error(f"Error parsing {filepath}: {e}")
        if content:
            content = _SPACE_RUN.sub(" ", content).strip()
        return content, title, image_b64, mime_type

    def _replace_document(self, url, sha, path, title, rows):
        """Atomically replace every stored chunk of ``url`` with ``rows``.

        ``rows`` is a list of (chunk_text, embedding_blob) pairs. When it is
        empty, nothing is changed, so a failed re-embed keeps the previous
        version rather than dropping the document.
        Returns the number of chunks stored.
        """
        if not rows:
            return 0
        with self._connect() as conn:
            conn.execute("DELETE FROM documents WHERE url = ?", (url,))
            conn.executemany(
                """
                INSERT INTO documents (url, sha, path, title, content, embedding)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                [(url, sha, path, title, text, blob) for text, blob in rows],
            )
        return len(rows)

    def _embed_and_store(self, parsed, remote_path, download_url, sha):
        """Chunk and embed a _parse_content result, then store it under download_url.

        Embeddings are computed before the database is opened, so no
        connection is held during network calls. Returns the number of chunks stored.
        """
        content, title, image_b64, mime = parsed
        chunks = self._chunk_text(content) if content else []
        if not chunks and image_b64:
            chunks = [_VISUAL_PLACEHOLDER]
        rows = []
        for i, chunk_text in enumerate(chunks):
            # Only the first chunk carries the image, so it is embedded once.
            embedding = self.vector_service.get_embedding(
                text=chunk_text,
                image_b64=image_b64 if i == 0 else None,
                mime_type=mime,
            )
            if embedding:
                rows.append((chunk_text, self._serialize_vector(embedding)))
        return self._replace_document(download_url, sha, remote_path, title, rows)

    def ingest_document(self, local_path, remote_path, download_url, sha=None):
        """Parse, chunk, embed and index a local file under ``download_url``.

        Chunks previously stored for ``download_url`` are replaced, so
        re-ingesting an updated file leaves no stale chunks behind.
        Returns True if at least one chunk was stored, and False otherwise,
        including on any error (which is logged).
        """
        local_path = Path(local_path)
        try:
            raw_bytes = local_path.read_bytes()
            parsed = self._parse_content(raw_bytes, str(local_path))
            indexed_count = self._embed_and_store(parsed, remote_path, download_url, sha)
        except Exception as e:
            logger.error(f"Failed to ingest {local_path.name}: {e}")
            return False
        if indexed_count > 0:
            logger.debug(f"Granularly indexed {local_path.name} into {indexed_count} chunks.")
            return True
        return False

    def ingest_document_from_bytes(self, raw_bytes, remote_path, download_url, sha=None):
        """Like ingest_document, but for content already in memory.

        The parser and title are chosen from ``remote_path``. Returns True if
        at least one chunk was stored, and False otherwise (errors are logged).
        """
        try:
            parsed = self._parse_content(raw_bytes, remote_path)
            indexed_count = self._embed_and_store(parsed, remote_path, download_url, sha)
        except Exception as e:
            logger.error(f"Failed to sync-ingest {remote_path}: {e}")
            return False
        return indexed_count > 0

    def add_local_file(self, local_path):
        """Upload a local file to local_uploads/ in the repo, index it, and push the index.

        Returns True if the file was uploaded and at least one chunk was indexed.
        """
        path = Path(local_path)
        if not path.exists():
            logger.error(f"File not found: {local_path}")
            return False
        logger.info(f"Adding local file: {path.name}...")
        # 1. Upload to Gitea.
        remote_path = f"local_uploads/{path.name}"
        download_url = self.api.upload_file(str(path), remote_path, message=f"Local Upload: {path.name}")
        if not download_url:
            return False
        # 2. Look up the uploaded file's sha (None if the listing does not show it).
        files = self.api.list_files("local_uploads/") or []
        sha = next((f["sha"] for f in files if f["path"] == remote_path), None)
        # 3. Ingest locally.
        success = self.ingest_document(path, remote_path, download_url, sha)
        # 4. Push the updated index.
        if success:
            self.push_index_db()
            logger.info(f"Successfully added and indexed {path.name}")
        return success

    def sync_and_reindex(self):
        """Bring the local index in line with the files in the remote repository.

        Files whose URL is not indexed, or whose stored sha differs from the
        remote sha, are downloaded and re-ingested (replacing their old chunks).
        URLs no longer listed remotely are removed from the index. The shared
        ``index.db`` file itself is skipped.
        """
        logger.info("Starting Semantic Sync...")
        remote_files = self.api.list_files()
        if remote_files is None:
            # Presumably list_files signals failure with None (TODO confirm).
            # Without a listing, deleted files cannot be told apart from a failed
            # call, so bail out rather than wiping the index.
            logger.error("Could not list remote files; skipping sync.")
            return

        # url -> every sha stored for it. More than one sha means chunks from an
        # older version are still present, so the URL is re-ingested to clean up.
        indexed = {}
        with self._connect() as conn:
            for url, sha in conn.execute("SELECT DISTINCT url, sha FROM documents"):
                indexed.setdefault(url, set()).add(sha)

        remote_urls = set()
        for f in remote_files:
            if f["path"] == "index.db":
                continue
            url, sha = f["download_url"], f["sha"]
            remote_urls.add(url)
            if indexed.get(url) == {sha}:
                continue  # already up to date
            logger.info(f"Vecting new document: {f['path']}...")
            try:
                resp = requests.get(url, timeout=15)
                resp.raise_for_status()
                self.ingest_document_from_bytes(resp.content, f["path"], url, sha)
            except Exception as e:
                logger.error(f"Failed to index {url}: {e}")

        # Clean up files deleted remotely.
        to_remove = indexed.keys() - remote_urls
        if to_remove:
            logger.info(f"Removing {len(to_remove)} deleted files from local index.")
            with self._connect() as conn:
                conn.executemany(
                    "DELETE FROM documents WHERE url = ?", [(url,) for url in to_remove]
                )
        logger.info("Semantic Indexing complete.")

    def pull_index_db(self, remote_path="index.db"):
        """Download the shared index from the repo over the local database file."""
        logger.info(f"Pulling shared index from {remote_path}...")
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        return self.api.download_file(remote_path, str(self.db_path), is_binary=True)

    def push_index_db(self, remote_path="index.db"):
        """Upload the local index database to the repo for other users."""
        logger.info(f"Pushing shared index to {remote_path}...")
        return self.api.upload_file(str(self.db_path), remote_path, message="Updated Shared Index")

    def reset_all(self):
        """DANGEROUS: delete every listed remote file and the local index.

        Afterwards an empty local index is recreated. Always returns True;
        failure to delete the local file is logged, not raised.
        """
        logger.warning("INHERENTLY DESTRUCTIVE: Purging knowledge base...")
        for f in self.api.list_files() or []:
            logger.info(f"Deleting remote asset: {f['path']}...")
            self.api.delete_file(f["path"], f["sha"], message="Reset: System Purge")
        if self.db_path.exists():
            # Connections are always closed by _connect, so the file is not held open.
            try:
                self.db_path.unlink()
                logger.info("Local index deleted.")
            except OSError as e:
                logger.error(f"Failed to delete local DB file: {e}")
        self._init_db()
        return True

    def search(self, query, limit=10):
        """Return the ``limit`` stored chunks most similar to ``query``, best first.

        Each hit is a dict with ``url``, ``title``, ``content`` and ``score``
        (from VectorService.cosine_similarity). Returns [] if the query cannot
        be embedded.
        """
        query_vector = self.vector_service.get_embedding(query)
        if not query_vector:
            return []
        with self._connect() as conn:
            rows = conn.execute("SELECT url, title, content, embedding FROM documents").fetchall()
        results = []
        for url, title, content, blob in rows:
            doc_vector = self._deserialize_vector(blob)
            # Rows without an embedding, or with a different dimension (e.g.
            # written by another embedding model), cannot be compared meaningfully.
            if doc_vector is None or len(doc_vector) != len(query_vector):
                continue
            results.append({
                "url": url,
                "title": title,
                "content": content,
                "score": self.vector_service.cosine_similarity(query_vector, doc_vector),
            })
        results.sort(key=lambda hit: hit["score"], reverse=True)
        return results[:limit]
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    service = RetrievalService()
    # Manual smoke test: query the local index and print each hit's score,
    # title and the first 100 characters of its chunk.
    for hit in service.search("Chico history"):
        print(f"[{hit['score']:.4f}] {hit['title']} - {hit['content'][:100]}...")