import base64
import gc
import io
import logging
import mimetypes
import os
import re
import sqlite3
import struct
from contextlib import contextmanager
from pathlib import Path

import markdownify as md
import requests
from bs4 import BeautifulSoup
from pypdf import PdfReader

from pyrag3.gitea_api import GiteaAPI
from pyrag3.vector_service import VectorService

logger = logging.getLogger(__name__)

# Substrings in a data-URI header that mark an <img> as decorative.
_DECORATIVE_IMAGE_HINTS = ("icon", "logo", "pixel", "tracker", "sprite")

# Embeddings are stored as packed little-endian 32-bit floats.
_FLOAT_SIZE = struct.calcsize("<f")


class RetrievalService:
    """Chunk-level semantic index stored in SQLite and synced through Gitea.

    Documents are parsed to text, split into chunks, embedded through
    ``VectorService`` and stored one row per chunk in the ``documents``
    table. The database file itself can be pushed to / pulled from the
    Gitea repository so that several users share one index.
    """

    def __init__(self, db_path="rag_data/index.db"):
        """Create the service and make sure the index database exists.

        Args:
            db_path: Location of the SQLite index file. Missing parent
                directories are created.
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.api = GiteaAPI()
        self.vector_service = VectorService()
        self._init_db()

    @contextmanager
    def _connect(self):
        """Yield a connection that is committed on success and always closed.

        ``with sqlite3.connect(...)`` on its own only manages the
        transaction; it never closes the handle. Closing explicitly keeps
        file handles from piling up and lets ``reset_all`` delete the file.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commit on success, roll back on exception
                yield conn
        finally:
            conn.close()

    def _init_db(self):
        """Initialize SQLite database for metadata and vectors with chunk-level support."""
        # Path.parent is "." for a bare filename, so this cannot fail the way
        # os.makedirs("") does.
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

        with self._connect() as conn:
            # Probe for the chunk-era `content` column. A missing table or
            # column raises OperationalError, in which case the old table
            # (if any) is dropped and recreated below.
            try:
                conn.execute("SELECT content FROM documents LIMIT 1")
            except sqlite3.OperationalError:
                logger.info("Migrating database for chunk and content support...")
                conn.execute("DROP TABLE IF EXISTS documents")

            conn.execute("""
                CREATE TABLE IF NOT EXISTS documents (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    url TEXT,
                    sha TEXT,
                    path TEXT,
                    title TEXT,
                    content TEXT,
                    embedding BLOB
                )
            """)
            conn.execute("CREATE INDEX IF NOT EXISTS idx_url ON documents(url)")

    def _serialize_vector(self, vector):
        """Convert a list of floats to a binary BLOB.

        Little-endian is used explicitly because the index file is shared
        between machines; on little-endian hosts this is byte-identical to
        the previous native-order encoding.

        Returns:
            The packed bytes, or None when ``vector`` is empty or None.
        """
        if not vector:
            return None
        return struct.pack(f"<{len(vector)}f", *vector)

    def _deserialize_vector(self, blob):
        """Convert a binary BLOB back to a list of floats.

        Returns:
            The decoded floats, or None when ``blob`` is empty or its length
            is not a whole number of 32-bit floats (corrupt row).
        """
        if not blob:
            return None
        count, remainder = divmod(len(blob), _FLOAT_SIZE)
        if remainder:
            logger.warning(
                "Skipping embedding blob with invalid length %d", len(blob)
            )
            return None
        return list(struct.unpack(f"<{count}f", blob))

    def _chunk_text(self, text, chunk_size=1500, overlap=200):
        """Split text into overlapping chunks for higher-precision search.

        Paragraphs (separated by blank lines) are packed greedily into
        chunks of roughly ``chunk_size`` characters. A paragraph that is too
        long on its own is split on sentence boundaries, and a single
        sentence that is still too long is cut into fixed-size slices.

        Args:
            text: The text to split.
            chunk_size: Target maximum size of a chunk before overlap.
            overlap: Number of trailing characters of the previous chunk
                that are repeated at the start of the next one. Chunks after
                the first may therefore exceed ``chunk_size`` by about this
                amount. Zero or a negative value disables overlap.

        Returns:
            A list of non-empty chunk strings (empty for empty input).

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if not text:
            return []
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")

        chunks = []

        def flush(buffer):
            # Never emit empty chunks (e.g. for whitespace-only input).
            cleaned = buffer.strip()
            if cleaned:
                chunks.append(cleaned)

        current = ""
        for para in text.split("\n\n"):
            if len(current) + len(para) <= chunk_size:
                current += para + "\n\n"
                continue

            flush(current)
            if len(para) > chunk_size:
                pieces, remainder = self._split_paragraph(para, chunk_size)
                for piece in pieces:
                    flush(piece)
                # The tail of the paragraph keeps accumulating following
                # paragraphs; keep the paragraph separator after it.
                current = remainder + "\n\n" if remainder else ""
            else:
                current = para + "\n\n"

        flush(current)
        return self._apply_overlap(chunks, overlap)

    @staticmethod
    def _split_paragraph(para, chunk_size):
        """Split one oversized paragraph on sentence boundaries.

        Returns:
            A ``(pieces, remainder)`` tuple: ``pieces`` are finished
            segments in order, ``remainder`` is the unfinished tail that
            the caller may keep filling.
        """
        sentences = para.split(". ")
        last_index = len(sentences) - 1
        pieces = []
        buffer = ""

        for index, sentence in enumerate(sentences):
            # Restore the separator consumed by split(), but not after the
            # final sentence, which never had one.
            separator = "" if index == last_index else ". "

            # A single sentence longer than a chunk is cut into slices.
            while len(sentence) > chunk_size:
                if buffer:
                    pieces.append(buffer)
                    buffer = ""
                pieces.append(sentence[:chunk_size])
                sentence = sentence[chunk_size:]

            if len(buffer) + len(sentence) > chunk_size:
                pieces.append(buffer)
                buffer = ""
            buffer += sentence + separator

        return pieces, buffer

    @staticmethod
    def _apply_overlap(chunks, overlap):
        """Prefix every chunk after the first with the tail of its predecessor."""
        if overlap <= 0 or len(chunks) < 2:
            return chunks

        overlapped = [chunks[0]]
        for previous, current in zip(chunks, chunks[1:]):
            tail = previous[-overlap:]
            if len(previous) > overlap:
                # The tail was cut out of a longer chunk: drop the leading
                # partial word so the overlap starts on a word boundary.
                _, found, rest = tail.partition(" ")
                if found:
                    tail = rest
            tail = tail.strip()
            overlapped.append(f"{tail}\n\n{current}" if tail else current)
        return overlapped

    def _parse_content(self, raw_content, filepath):
        """Extract rich markdown text and significant images for multimodal embedding.

        Args:
            raw_content: The file's bytes.
            filepath: Path or name used only to derive the extension, the
                guessed MIME type and the fallback title.

        Returns:
            ``(content, title, image_b64, mime_type)``. ``content`` is None
            for images and when parsing fails; ``image_b64`` is None when no
            image was found. Parsing errors are logged, not raised.
        """
        ext = os.path.splitext(filepath)[1].lower()
        mime_type, _ = mimetypes.guess_type(filepath)
        content = None
        title = os.path.basename(filepath)
        image_b64 = None

        try:
            if mime_type and mime_type.startswith("image/"):
                image_b64 = base64.b64encode(raw_content).decode("utf-8")
            elif ext == ".pdf":
                # Parse from memory: no shared temp file in the working
                # directory, nothing left behind if parsing fails.
                reader = PdfReader(io.BytesIO(raw_content))
                page_texts = (page.extract_text() for page in reader.pages)
                content = "\n".join(text for text in page_texts if text)
            elif ext in (".html", ".htm"):
                content, title, image_b64 = self._parse_html(raw_content, title)
            else:
                content = raw_content.decode("utf-8", errors="ignore")
        except Exception as e:
            # Best effort: one unparseable document must not abort indexing.
            logger.error(f"Error parsing {filepath}: {e}")

        if content:
            content = re.sub(r' +', ' ', content).strip()

        # NOTE(review): for HTML, mime_type describes the page, not the
        # embedded image returned in image_b64 - confirm what
        # VectorService.get_embedding expects here.
        return content, title, image_b64, mime_type

    def _parse_html(self, raw_content, fallback_title):
        """Convert an HTML page to Markdown and pick its first inline image.

        Returns:
            ``(markdown, title, image_b64)`` where ``image_b64`` is the
            payload of the first non-decorative base64 ``data:image`` URI,
            or None.
        """
        soup = BeautifulSoup(raw_content, "html.parser")

        # <title>.string is None for an empty or nested title element.
        title = fallback_title
        if soup.title and soup.title.string:
            title = soup.title.string

        # 1. Strip boilerplate elements before text extraction.
        for tag in soup(["script", "style", "nav", "footer", "header"]):
            tag.decompose()

        # 2. Extract the first significant inline image.
        image_b64 = None
        for img in soup.find_all("img"):
            src = img.get("src")
            if not src or not src.startswith("data:image"):
                continue
            header, found, payload = src.partition(",")
            if not found or not payload:
                continue  # malformed data URI
            header = header.lower()
            if ";base64" not in header:
                continue  # URL-encoded payload, not usable as base64
            # Match hints against the header only; the base64 payload can
            # contain any of these substrings by chance.
            if any(hint in header for hint in _DECORATIVE_IMAGE_HINTS):
                continue
            image_b64 = payload
            break

        # 3. Convert the cleaned document to Markdown.
        content = md.markdownify(str(soup), heading_style="ATX")
        return content, title, image_b64

    def _index_content(self, raw_bytes, parse_path, remote_path, download_url, sha):
        """Parse, chunk, embed and store one document.

        Embeddings are computed before the database is touched, so no
        connection is held open during those calls. Existing rows for
        ``download_url`` are replaced in the same transaction as the
        inserts, which keeps re-ingestion from accumulating stale chunks.

        Returns:
            The number of chunks stored; 0 leaves the database untouched.
        """
        content, title, image_b64, mime = self._parse_content(raw_bytes, parse_path)
        chunks = self._chunk_text(content) if content else []
        if not chunks and image_b64:
            chunks = ["Visual Content"]

        rows = []
        for position, chunk_text in enumerate(chunks):
            # Only the first chunk carries the image for multimodal context.
            current_img = image_b64 if position == 0 else None
            embedding = self.vector_service.get_embedding(
                text=chunk_text, image_b64=current_img, mime_type=mime
            )
            if embedding:
                blob = self._serialize_vector(embedding)
                rows.append((download_url, sha, remote_path, title, chunk_text, blob))

        if not rows:
            return 0

        with self._connect() as conn:
            conn.execute("DELETE FROM documents WHERE url = ?", (download_url,))
            conn.executemany("""
                INSERT INTO documents (url, sha, path, title, content, embedding)
                VALUES (?, ?, ?, ?, ?, ?)
            """, rows)
        return len(rows)

    def ingest_document(self, local_path, remote_path, download_url, sha=None):
        """Parse, CHUNK, vector, and index a document into granular pieces.

        Args:
            local_path: File to read.
            remote_path: Repository path stored with every chunk.
            download_url: URL stored with every chunk; identifies the document.
            sha: Revision identifier stored with every chunk.

        Returns:
            True when at least one chunk was indexed, False otherwise
            (including on any error, which is logged).
        """
        local_path = Path(local_path)
        try:
            raw_bytes = local_path.read_bytes()
            indexed_count = self._index_content(
                raw_bytes, str(local_path), remote_path, download_url, sha
            )
        except Exception as e:
            logger.error(f"Failed to ingest {local_path.name}: {e}")
            return False

        if indexed_count > 0:
            logger.debug(f"Granularly indexed {local_path.name} into {indexed_count} chunks.")
            return True
        return False

    def ingest_document_from_bytes(self, raw_bytes, remote_path, download_url, sha=None):
        """Helper for sync_and_reindex to ingest without a local file.

        Returns:
            True when at least one chunk was indexed, False otherwise
            (including on any error, which is logged).
        """
        try:
            indexed_count = self._index_content(
                raw_bytes, remote_path, remote_path, download_url, sha
            )
        except Exception as e:
            logger.error(f"Failed to sync-ingest {remote_path}: {e}")
            return False
        return indexed_count > 0

    def add_local_file(self, local_path):
        """Upload a local file and then ingest it into the index.

        Returns:
            True when the file was uploaded and indexed, False otherwise.
        """
        path = Path(local_path)
        if not path.exists():
            logger.error(f"File not found: {local_path}")
            return False

        logger.info(f"Adding local file: {path.name}...")

        # 1. Upload to Gitea
        remote_path = f"local_uploads/{path.name}"
        download_url = self.api.upload_file(
            str(path), remote_path, message=f"Local Upload: {path.name}"
        )
        if not download_url:
            return False

        # 2. Get SHA (stays None when the listing does not contain the file)
        files = self.api.list_files("local_uploads/") or []
        sha = next((f["sha"] for f in files if f["path"] == remote_path), None)

        # 3. Ingest Locally
        success = self.ingest_document(path, remote_path, download_url, sha)

        # 4. Push Updated Index
        if success:
            self.push_index_db()
            logger.info(f"Successfully added and indexed {path.name}")
        return success

    def sync_and_reindex(self):
        """Sync with remote Gitea repo and update semantic index with chunking.

        New or changed remote files (by URL and sha) are downloaded and
        indexed; documents whose URL is no longer listed remotely are
        removed from the local index.
        """
        logger.info("Starting Semantic Sync...")
        # NOTE(review): an empty listing removes every indexed document
        # below - confirm list_files() cannot return [] on an API failure.
        remote_files = self.api.list_files()

        # Read the current state and release the connection before the
        # per-file ingestion opens its own.
        with self._connect() as conn:
            # DISTINCT because a URL now has multiple chunk rows.
            cursor = conn.execute("SELECT DISTINCT url, sha FROM documents")
            indexed = {url: sha for url, sha in cursor}

        remote_urls = set()
        for f in remote_files:
            if f["path"] == "index.db":
                continue  # the shared index itself is not a document

            url = f["download_url"]
            remote_urls.add(url)
            sha = f["sha"]

            if url not in indexed or indexed[url] != sha:
                logger.info(f"Vecting new document: {f['path']}...")
                try:
                    resp = requests.get(url, timeout=15)
                    resp.raise_for_status()
                except requests.RequestException as e:
                    logger.error(f"Failed to index {url}: {e}")
                    continue
                self.ingest_document_from_bytes(resp.content, f["path"], url, sha)

        # Cleanup deleted files
        to_remove = set(indexed) - remote_urls
        if to_remove:
            logger.info(f"Removing {len(to_remove)} deleted files from local index.")
            with self._connect() as conn:
                conn.executemany(
                    "DELETE FROM documents WHERE url = ?",
                    [(url,) for url in to_remove],
                )

        logger.info("Semantic Indexing complete.")

    def pull_index_db(self, remote_path="index.db"):
        """Download remote shared index from Gitea.

        Returns:
            Whatever ``GiteaAPI.download_file`` returns.
        """
        logger.info(f"Pulling shared index from {remote_path}...")
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        return self.api.download_file(remote_path, str(self.db_path), is_binary=True)

    def push_index_db(self, remote_path="index.db"):
        """Upload local index to Gitea for other users.

        Returns:
            Whatever ``GiteaAPI.upload_file`` returns.
        """
        logger.info(f"Pushing shared index to {remote_path}...")
        return self.api.upload_file(
            str(self.db_path), remote_path, message="Updated Shared Index"
        )

    def reset_all(self):
        """DANGEROUS: Purge all documents and index from both repo and local system.

        Returns:
            Always True; a failure to delete the local file is only logged.
        """
        logger.warning("INHERENTLY DESTRUCTIVE: Purging knowledge base...")

        remote_files = self.api.list_files()
        for f in remote_files:
            logger.info(f"Deleting remote asset: {f['path']}...")
            self.api.delete_file(f["path"], f["sha"], message="Reset: System Purge")

        if self.db_path.exists():
            # Finalize any stray connection objects still holding the file
            # (matters on platforms that refuse to delete open files).
            gc.collect()
            try:
                self.db_path.unlink()
                logger.info("Local index deleted.")
            except OSError as e:
                logger.error(f"Failed to delete local DB file: {e}")

        self._init_db()
        return True

    def search(self, query, limit=10):
        """Search the semantic index for the best matching chunks.

        Args:
            query: Text to embed and compare against every stored chunk.
            limit: Maximum number of results.

        Returns:
            A list of dicts with ``url``, ``title``, ``content`` and
            ``score`` keys, best match first. Empty when the query cannot
            be embedded.
        """
        query_vector = self.vector_service.get_embedding(query)
        if not query_vector:
            return []

        results = []
        with self._connect() as conn:
            cursor = conn.execute("SELECT url, title, content, embedding FROM documents")
            for url, title, content, blob in cursor:
                doc_vector = self._deserialize_vector(blob)
                if doc_vector is None:
                    continue  # row without a usable embedding
                similarity = self.vector_service.cosine_similarity(query_vector, doc_vector)
                results.append({
                    "url": url,
                    "title": title,
                    "content": content,
                    "score": similarity,
                })

        results.sort(key=lambda item: item["score"], reverse=True)
        return results[:limit]


def main():
    """Run a small manual search against the default index."""
    logging.basicConfig(level=logging.INFO)
    service = RetrievalService()
    # Test
    res = service.search("Chico history")
    for r in res:
        print(f"[{r['score']:.4f}] {r['title']} - {r['content'][:100]}...")


if __name__ == "__main__":
    main()