# Source listing: 333 lines, 14 KiB, Python.
import base64
import contextlib
import io
import logging
import mimetypes
import os
import re
import sqlite3
import struct
from pathlib import Path

import markdownify as md
import requests
from bs4 import BeautifulSoup
from pypdf import PdfReader

from pyrag3.gitea_api import GiteaAPI
from pyrag3.vector_service import VectorService
# Module-scoped logger; its name follows the import path of this module.
logger = logging.getLogger(name=__name__)
class RetrievalService:
    """Semantic document index stored in SQLite and synced with a Gitea repo.

    Documents are parsed (images, PDF, HTML, plain text), split into chunks,
    embedded through ``VectorService`` and stored one row per chunk in the
    ``documents`` table. Source files and the SQLite index file itself are
    exchanged with the remote repository through ``GiteaAPI``.
    """

    # Collapses runs of spaces left over by PDF/HTML text extraction.
    _SPACE_RUN = re.compile(r" +")

    # Image sources whose text hints at decoration rather than content.
    _DECORATIVE_IMAGE_HINTS = ("icon", "logo", "pixel", "tracker", "sprite")

    _INSERT_SQL = (
        "INSERT INTO documents (url, sha, path, title, content, embedding) "
        "VALUES (?, ?, ?, ?, ?, ?)"
    )

    def __init__(self, db_path="rag_data/index.db"):
        """Create the API/vector clients and make sure the local index exists.

        Args:
            db_path: Location of the SQLite index file. Missing parent
                directories are created.
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.api = GiteaAPI()
        self.vector_service = VectorService()
        self._init_db()

    @contextlib.contextmanager
    def _connect(self):
        """Yield a SQLite connection that is committed on success and always closed.

        ``with sqlite3.connect(...)`` on its own only manages the transaction
        (commit/rollback); it never closes the connection. That leaked handles
        and kept the file locked until garbage collection.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commit on normal exit, rollback on exception
                yield conn
        finally:
            conn.close()

    def _init_db(self):
        """Create the chunk-level ``documents`` table, migrating old schemas.

        If the table is missing or lacks the ``content`` column (the
        pre-chunking schema), it is dropped and recreated. Existing rows in an
        old-schema table are discarded.
        """
        # Path("index.db").parent is ".", so this is safe for bare filenames.
        # os.makedirs(os.path.dirname(...)) raised on "" in that case.
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

        with self._connect() as conn:
            try:
                conn.execute("SELECT content FROM documents LIMIT 1")
            except sqlite3.OperationalError:
                logger.info("Migrating database for chunk and content support...")
                conn.execute("DROP TABLE IF EXISTS documents")

            conn.execute("""
                CREATE TABLE IF NOT EXISTS documents (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    url TEXT,
                    sha TEXT,
                    path TEXT,
                    title TEXT,
                    content TEXT,
                    embedding BLOB
                )
            """)
            conn.execute("CREATE INDEX IF NOT EXISTS idx_url ON documents(url)")

    def _serialize_vector(self, vector):
        """Pack a sequence of floats into a BLOB of little-endian float32 values.

        Returns None for an empty or missing vector. The byte order is pinned
        explicitly because the index file is shared between machines. On
        little-endian hosts this is byte-identical to the native layout.
        """
        if not vector:
            return None
        return struct.pack(f"<{len(vector)}f", *vector)

    def _deserialize_vector(self, blob):
        """Unpack a BLOB written by ``_serialize_vector`` into a list of floats.

        Returns None for an empty or missing BLOB.
        """
        if not blob:
            return None
        count = len(blob) // 4
        return list(struct.unpack(f"<{count}f", blob))

    @staticmethod
    def _split_units(text, chunk_size):
        """Yield ``(piece, joiner)`` pairs, each piece at most ``chunk_size`` chars.

        ``joiner`` is the separator that preceded the piece in the source:
        - ``"\\n\\n"`` before a paragraph,
        - ``" "`` before the next sentence of an oversized paragraph,
        - ``""`` before the continuation of a hard-split sentence.
        """
        for para in text.split("\n\n"):
            para = para.strip()
            if not para:
                continue
            if len(para) <= chunk_size:
                yield para, "\n\n"
                continue
            parts = para.split(". ")
            # split() consumed the period of every sentence except the last.
            sentences = [part + "." for part in parts[:-1]] + [parts[-1]]
            for s_idx, sentence in enumerate(sentences):
                for start in range(0, len(sentence), chunk_size):
                    if start:
                        joiner = ""
                    elif s_idx:
                        joiner = " "
                    else:
                        joiner = "\n\n"
                    yield sentence[start:start + chunk_size], joiner

    def _chunk_text(self, text, chunk_size=1500, overlap=200):
        """Split text into overlapping chunks of at most ``chunk_size`` characters.

        Paragraphs (separated by blank lines) are packed greedily. A paragraph
        longer than ``chunk_size`` is broken at ". " sentence boundaries. A
        sentence that is still too long is cut into ``chunk_size`` slices.
        When a chunk is closed, the next one starts with the last ``overlap``
        characters of it (when that still fits), so text spanning a boundary
        is searchable from both sides.

        Args:
            text: Text to split; falsy input yields an empty list.
            chunk_size: Maximum chunk length in characters; must be positive.
            overlap: Characters carried over from the previous chunk. A value
                of 0 or less disables overlap.

        Returns:
            List of non-empty, whitespace-stripped chunks.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if not text:
            return []
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")

        chunks = []
        current = ""
        for unit, joiner in self._split_units(text, chunk_size):
            candidate = f"{current}{joiner}{unit}" if current else unit
            if len(candidate) <= chunk_size:
                current = candidate
                continue

            # `current` is non-empty here: every unit fits into an empty chunk.
            finished = current.strip()
            if finished:
                chunks.append(finished)
            tail = current[-overlap:].lstrip() if overlap > 0 else ""
            seeded = f"{tail}{joiner}{unit}" if tail else unit
            current = seeded if len(seeded) <= chunk_size else unit

        finished = current.strip()
        if finished:
            chunks.append(finished)
        return chunks

    def _parse_content(self, raw_content, filepath):
        """Extract searchable text and an optional image from a file's bytes.

        Dispatch is by file name:
        - image MIME types are base64-encoded as-is;
        - ``.pdf`` is read with pypdf;
        - ``.html``/``.htm`` is stripped of script/style/nav/footer/header and
          converted to Markdown, keeping the first inline base64 ``<img>`` as
          image context;
        - everything else is decoded as UTF-8 with undecodable bytes dropped.

        Parse errors are logged and whatever was extracted so far is returned.

        Returns:
            ``(content, title, image_b64, mime_type)``. ``content`` and
            ``image_b64`` may be None. ``title`` defaults to the file's
            basename. ``mime_type`` is ``mimetypes.guess_type(filepath)``.
        """
        ext = os.path.splitext(filepath)[1].lower()
        mime_type, _ = mimetypes.guess_type(filepath)

        content = None
        title = os.path.basename(filepath)
        image_b64 = None

        try:
            if mime_type and mime_type.startswith("image/"):
                image_b64 = base64.b64encode(raw_content).decode("utf-8")
            elif ext == ".pdf":
                # Parse from memory. The old shared "temp_index.pdf" in the CWD
                # collided between concurrent ingests and leaked on errors.
                reader = PdfReader(io.BytesIO(raw_content))
                page_texts = (page.extract_text() for page in reader.pages)
                content = "\n".join(t for t in page_texts if t)
            elif ext in (".html", ".htm"):
                soup = BeautifulSoup(raw_content, "html.parser")
                # <title>.string is None for an empty title or nested markup.
                if soup.title and soup.title.string:
                    title = str(soup.title.string).strip() or title

                # 1. Drop non-content elements before text extraction.
                for tag in soup(["script", "style", "nav", "footer", "header"]):
                    tag.decompose()

                # 2. Keep the first significant inline image for multimodal context.
                # NOTE(review): mime_type stays the page type (e.g. "text/html")
                # even when image_b64 holds an embedded image. Confirm that
                # VectorService.get_embedding does not need the image's own type.
                for img in soup.find_all("img"):
                    src = img.get("src")
                    if not src or any(h in src.lower() for h in self._DECORATIVE_IMAGE_HINTS):
                        continue
                    header, _, payload = src.partition(",")
                    # Only base64 data URIs can be used directly. A URL-encoded
                    # data URI (e.g. raw SVG) is not valid base64.
                    if header.startswith("data:image") and ";base64" in header and payload:
                        image_b64 = payload
                        break

                # 3. Convert the cleaned document to Markdown.
                content = md.markdownify(str(soup), heading_style="ATX")
            else:
                content = raw_content.decode("utf-8", errors="ignore")
        except Exception as e:
            logger.error(f"Error parsing {filepath}: {e}")

        if content:
            content = self._SPACE_RUN.sub(" ", content).strip()

        return content, title, image_b64, mime_type

    def _index_bytes(self, raw_bytes, filepath, remote_path, download_url, sha):
        """Parse, chunk and embed one document, then replace its rows in the index.

        All embeddings are computed before the database is opened. The
        document's previous chunks (matched by ``download_url``) are deleted in
        the same transaction as the new inserts, so re-indexing a changed file
        replaces its chunks instead of leaving stale duplicates. If no chunk
        could be embedded, existing rows are left untouched.

        Args:
            raw_bytes: File contents.
            filepath: Name used to pick the parser and the default title.
            remote_path, download_url, sha: Stored verbatim on every chunk row.

        Returns:
            Number of chunk rows written.
        """
        content, title, image_b64, mime = self._parse_content(raw_bytes, filepath)

        chunks = self._chunk_text(content) if content else []
        if not chunks and image_b64:
            chunks = ["Visual Content"]

        rows = []
        for i, chunk_text in enumerate(chunks):
            # Only the first chunk carries the image as multimodal context.
            current_img = image_b64 if i == 0 else None
            embedding = self.vector_service.get_embedding(
                text=chunk_text, image_b64=current_img, mime_type=mime
            )
            if embedding:
                rows.append((download_url, sha, remote_path, title, chunk_text,
                             self._serialize_vector(embedding)))

        if rows:
            with self._connect() as conn:
                conn.execute("DELETE FROM documents WHERE url = ?", (download_url,))
                conn.executemany(self._INSERT_SQL, rows)
        return len(rows)

    def ingest_document(self, local_path, remote_path, download_url, sha=None):
        """Parse, chunk, embed and index a file from disk.

        Any previously indexed chunks for ``download_url`` are replaced.

        Returns:
            True if at least one chunk was indexed, False otherwise (errors
            are logged, not raised).
        """
        local_path = Path(local_path)
        try:
            indexed_count = self._index_bytes(
                local_path.read_bytes(), str(local_path), remote_path, download_url, sha
            )
        except Exception as e:
            logger.error(f"Failed to ingest {local_path.name}: {e}")
            return False

        if indexed_count > 0:
            logger.debug(f"Granularly indexed {local_path.name} into {indexed_count} chunks.")
            return True
        return False

    def ingest_document_from_bytes(self, raw_bytes, remote_path, download_url, sha=None):
        """Index in-memory file contents (used by ``sync_and_reindex``).

        ``remote_path`` selects the parser and the default title. Any
        previously indexed chunks for ``download_url`` are replaced.

        Returns:
            True if at least one chunk was indexed, False otherwise (errors
            are logged, not raised).
        """
        try:
            return self._index_bytes(raw_bytes, remote_path, remote_path, download_url, sha) > 0
        except Exception as e:
            logger.error(f"Failed to sync-ingest {remote_path}: {e}")
            return False

    def add_local_file(self, local_path):
        """Upload a local file to ``local_uploads/`` in the repo, index it and push the index.

        Returns:
            True if the file was uploaded and indexed, False otherwise.
        """
        path = Path(local_path)
        if not path.exists():
            logger.error(f"File not found: {local_path}")
            return False

        logger.info(f"Adding local file: {path.name}...")

        # 1. Upload to Gitea.
        remote_path = f"local_uploads/{path.name}"
        download_url = self.api.upload_file(str(path), remote_path, message=f"Local Upload: {path.name}")
        if not download_url:
            return False

        # 2. Look up the SHA of the uploaded file (tolerate an empty listing).
        files = self.api.list_files("local_uploads/") or []
        sha = next((f["sha"] for f in files if f["path"] == remote_path), None)

        # 3. Ingest locally.
        success = self.ingest_document(path, remote_path, download_url, sha)

        # 4. Share the updated index.
        if success:
            self.push_index_db()
            logger.info(f"Successfully added and indexed {path.name}")

        return success

    def sync_and_reindex(self):
        """Bring the local index in line with the remote repository.

        Files that are new or whose SHA changed are downloaded and re-indexed.
        Their old chunks are replaced. URLs that are no longer listed remotely
        are removed. The remote ``index.db`` itself is skipped.
        """
        logger.info("Starting Semantic Sync...")

        remote_files = self.api.list_files()

        with self._connect() as conn:
            # DISTINCT because every URL has one row per chunk.
            indexed = dict(conn.execute("SELECT DISTINCT url, sha FROM documents").fetchall())

        remote_urls = set()
        for f in remote_files:
            if f["path"] == "index.db":
                continue
            url = f["download_url"]
            sha = f["sha"]
            remote_urls.add(url)

            if url not in indexed or indexed[url] != sha:
                logger.info(f"Vecting new document: {f['path']}...")
                try:
                    resp = requests.get(url, timeout=15)
                    resp.raise_for_status()
                    self.ingest_document_from_bytes(resp.content, f["path"], url, sha)
                except Exception as e:
                    logger.error(f"Failed to index {url}: {e}")

        # Remove files that were deleted remotely.
        to_remove = set(indexed) - remote_urls
        if to_remove:
            logger.info(f"Removing {len(to_remove)} deleted files from local index.")
            with self._connect() as conn:
                conn.executemany("DELETE FROM documents WHERE url = ?", [(url,) for url in to_remove])
        logger.info("Semantic Indexing complete.")

    def pull_index_db(self, remote_path="index.db"):
        """Download the shared index from Gitea over the local index file."""
        logger.info(f"Pulling shared index from {remote_path}...")
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        return self.api.download_file(remote_path, str(self.db_path), is_binary=True)

    def push_index_db(self, remote_path="index.db"):
        """Upload the local index file to Gitea for other users."""
        logger.info(f"Pushing shared index to {remote_path}...")
        return self.api.upload_file(str(self.db_path), remote_path, message="Updated Shared Index")

    def reset_all(self):
        """DANGEROUS: delete every remote file and the local index, then recreate an empty index.

        Returns:
            True. Failures to delete the local file are logged, not raised.
        """
        logger.warning("INHERENTLY DESTRUCTIVE: Purging knowledge base...")

        for f in self.api.list_files():
            logger.info(f"Deleting remote asset: {f['path']}...")
            self.api.delete_file(f["path"], f["sha"], message="Reset: System Purge")

        if self.db_path.exists():
            try:
                import gc

                # Defensive: release any stray connection objects so the file
                # can be unlinked on platforms that lock open files.
                gc.collect()
                self.db_path.unlink()
                logger.info("Local index deleted.")
            except Exception as e:
                logger.error(f"Failed to delete local DB file: {e}")

        self._init_db()
        return True

    def search(self, query, limit=10):
        """Return up to ``limit`` chunks ranked by cosine similarity to ``query``.

        Each result is a dict with ``url``, ``title``, ``content`` and
        ``score`` keys, sorted by descending score. Returns an empty list if
        the query cannot be embedded. Rows without a stored embedding are
        skipped.
        """
        query_vector = self.vector_service.get_embedding(query)
        if not query_vector:
            return []

        results = []
        with self._connect() as conn:
            rows = conn.execute("SELECT url, title, content, embedding FROM documents")
            for url, title, content, blob in rows:
                doc_vector = self._deserialize_vector(blob)
                if doc_vector is None:
                    continue
                results.append({
                    "url": url,
                    "title": title,
                    "content": content,
                    "score": self.vector_service.cosine_similarity(query_vector, doc_vector),
                })

        results.sort(key=lambda r: r["score"], reverse=True)
        return results[:limit]
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Manual smoke test: query the local index and print each hit with a short preview.
    service = RetrievalService()
    for hit in service.search("Chico history"):
        print(f"[{hit['score']:.4f}] {hit['title']} - {hit['content'][:100]}...")