This commit is contained in:
turtle89431 2026-04-06 16:06:22 -07:00
parent 01b72bc3c1
commit 28bb0ad1e2
15 changed files with 0 additions and 1248 deletions

162
pyrag3/.gitignore vendored
View File

@ -1,162 +0,0 @@
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

View File

@ -1,33 +0,0 @@
import subprocess
import sys
import os
def build():
    """Compile ``main.py`` into a single-file executable with Nuitka.

    The compiler is launched as ``<current interpreter> -m nuitka``.  The
    output file is named ``doc_retrieve.exe`` on Windows and ``doc_retrieve``
    on every other platform.

    Raises:
        subprocess.CalledProcessError: If Nuitka exits with a non-zero status.
            The exit code is printed first and the exception is re-raised so
            the caller still sees the failure.
    """
    print("Starting Nuitka build process...")

    binary_name = "doc_retrieve.exe" if sys.platform == "win32" else "doc_retrieve"

    # --onefile:        create a single executable
    # --standalone:     include all dependencies
    # --follow-imports: follow all imports
    command = [sys.executable, "-m", "nuitka"]
    command += ["--onefile", "--standalone", "--follow-imports"]
    command.append(f"--output-filename={binary_name}")
    command.append("main.py")

    print("Running command: " + " ".join(command))
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as err:
        print(f"Build failed with exit code {err.returncode}")
        # Re-raise instead of exiting so the full output stays visible.
        raise
    else:
        print("Build successful!")
if __name__ == "__main__":
build()

View File

@ -1,111 +0,0 @@
import os
import logging
import queue
import threading
import time
import re
from pathlib import Path
from urllib.parse import urlparse, urljoin
import requests
from bs4 import BeautifulSoup
# Reuse some constants and logic from the original downloader.
# File extensions treated as downloadable page assets: a linked resource is only
# fetched when its URL path ends with one of these (compared in lower case).
ASSET_EXTENSIONS = (
    ".css", ".js", ".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg", ".ico",
    ".woff", ".woff2", ".ttf", ".eot"
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class Crawler:
    """Fetch a web page and a bounded number of its static assets to local disk.

    Pages are written to ``<root_dir>/<host>/<page>.html`` and their assets to
    ``<root_dir>/<host>/assets/``.  Host, page and asset names are all passed
    through :meth:`_sanitize_filename` before they touch the filesystem.
    """

    def __init__(self, root_dir):
        """Remember the output root and open an HTTP session.

        Args:
            root_dir: Directory (``str`` or ``Path``) under which downloads
                are stored.  It is created lazily by :meth:`download_page`.
        """
        self.root_dir = Path(root_dir)
        self.session = requests.Session()
        # A browser-like User-Agent is sent with every request of the session.
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
        })

    def _sanitize_filename(self, name):
        """Return *name* reduced to characters that are safe in file names.

        Every character other than ASCII letters, digits, ``_``, ``-`` and
        ``.`` is replaced by ``_`` and the result is cut to 100 characters.
        """
        res = re.sub(r'[^a-zA-Z0-9_\-\.]', '_', name)
        return res[:100]

    def download_page(self, url, max_assets=20):
        """Download a single page and its essential assets.

        Args:
            url: Absolute URL of the page to fetch.
            max_assets: Maximum number of assets to download successfully;
                failed asset downloads do not count towards the limit.

        Returns:
            ``Path`` of the saved HTML file, or ``None`` when the page itself
            could not be fetched or parsed (the error is logged).
        """
        try:
            resp = self.session.get(url, timeout=10)
            resp.raise_for_status()
            soup = BeautifulSoup(resp.text, "html.parser")
        except Exception as e:
            logger.error(f"Failed to fetch {url}: {e}")
            return None

        # Determine the local path: one directory per host, one file per page.
        parsed = urlparse(url)
        host_dir = self.root_dir / self._sanitize_filename(parsed.netloc)
        host_dir.mkdir(parents=True, exist_ok=True)

        # Flatten the URL path into a single file name ("a/b" -> "a_b").
        raw_name = parsed.path.strip("/").replace("/", "_") or "index"
        page_name = self._sanitize_filename(raw_name)
        if not page_name.lower().endswith(".html"):
            page_name += ".html"
        local_path = host_dir / page_name

        # Simple asset downloading (minimal version of website_downloader.py).
        assets_dir = host_dir / "assets"
        assets_dir.mkdir(exist_ok=True)

        asset_count = 0
        for tag in soup.find_all(["img", "link", "script"]):
            if asset_count >= max_assets:
                break

            attr = "src" if tag.name in ["img", "script"] else "href"
            link = tag.get(attr)
            if not link or link.startswith("data:"):
                continue

            asset_url = urljoin(url, link)
            asset_path = urlparse(asset_url).path
            if not asset_path.lower().endswith(ASSET_EXTENSIONS):
                continue

            # Sanitize the asset name as well: raw URL segments may contain
            # percent-escapes or characters that are invalid on Windows, which
            # would break either the write or the rewritten relative link.
            asset_name = self._sanitize_filename(os.path.basename(asset_path))
            if not asset_name:
                continue

            try:
                asset_resp = self.session.get(asset_url, timeout=5)
                asset_resp.raise_for_status()
                with open(assets_dir / asset_name, "wb") as f:
                    f.write(asset_resp.content)
            except Exception as e:
                # Assets are best-effort: a failure leaves the original link.
                logger.debug(f"Failed to download asset {asset_url}: {e}")
                continue

            # Point the tag at the local copy.
            tag[attr] = f"assets/{asset_name}"
            asset_count += 1

        with open(local_path, "w", encoding="utf-8") as f:
            f.write(soup.prettify())
        return local_path
# Manual smoke test: when run as a script, download https://example.com under
# rag_data/repo/web and print the path returned by download_page.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    c = Crawler("rag_data/repo/web")
    path = c.download_page("https://example.com")
    print(f"Downloaded to: {path}")

View File

@ -1,138 +0,0 @@
import requests
import base64
import os
import logging
from urllib.parse import quote
logger = logging.getLogger(__name__)
class GiteaAPI:
    """Small client for the Gitea repository "contents" REST API.

    All methods are best-effort: network and HTTP failures are logged and
    reported through the return value (``False`` / ``None`` / ``[]``) rather
    than raised.
    """

    _DEFAULT_BASE_URL = "https://git.client.guacamolebox.net/api/v1"
    # NOTE(security): this access token is embedded in the source (originally
    # "for portability in standalone binary") and is therefore recoverable by
    # anyone holding the code or a compiled build.  It is kept only as a
    # backward-compatible fallback; prefer the ``token`` argument or the
    # GITEA_TOKEN environment variable, and rotate this value.
    _EMBEDDED_TOKEN = "f8e1300d871e905e27ce54c3455a3343104d9b04"

    def __init__(self, owner="butterfly", repo="ragdocs", token=None, base_url=None):
        """Configure the client for one repository.

        Args:
            owner: Repository owner (user or organisation).
            repo: Repository name.
            token: Access token.  When omitted, ``GITEA_TOKEN`` from the
                environment is used, then the embedded fallback.
            base_url: API root ending in ``/api/v1``.  Defaults to the
                project's Gitea instance.
        """
        self.base_url = (base_url or self._DEFAULT_BASE_URL).rstrip("/")
        # Web root of the instance (API root without "/api/v1"); raw file URLs
        # live under it.
        api_suffix = "/api/v1"
        if self.base_url.endswith(api_suffix):
            self.web_url = self.base_url[:-len(api_suffix)]
        else:
            self.web_url = self.base_url
        self.owner = owner
        self.repo = repo
        self.token = token or os.environ.get("GITEA_TOKEN") or self._EMBEDDED_TOKEN
        self.headers = {"Authorization": f"token {self.token}", "Content-Type": "application/json"}

    def _contents_url(self, path):
        """Return the contents-API URL for *path* inside the repository."""
        return f"{self.base_url}/repos/{self.owner}/{self.repo}/contents/{quote(path, safe='/')}"

    def delete_file(self, filepath, sha, branch="main", message="Automated Cleanup"):
        """Delete a file from the repository via the Gitea API.

        Args:
            filepath: Repository-relative path of the file.
            sha: Blob SHA of the file as reported by the API.
            branch: Branch to commit the deletion to.
            message: Commit message.

        Returns:
            ``True`` when the server answered 200, ``False`` otherwise.
        """
        url = self._contents_url(filepath)
        data = {
            "branch": branch,
            "sha": sha,
            "message": message
        }
        try:
            resp = requests.delete(url, headers=self.headers, json=data, timeout=15)
        except Exception as e:
            logger.error(f"Error deleting {filepath}: {e}")
            return False

        if resp.status_code == 200:
            logger.info(f"Deleted remote file: {filepath}")
            return True
        logger.error(f"Failed to delete {filepath}: {resp.text}")
        return False

    def list_files(self, path="", recursive=True):
        """List files below *path*, descending into directories if requested.

        Returns:
            A list of dicts with the keys ``path``, ``download_url``, ``size``
            and ``sha``.  An empty list is returned both when the path does
            not exist (404) and when the request fails, so callers cannot
            distinguish "empty" from "unreachable".
        """
        url = self._contents_url(path)
        try:
            resp = requests.get(url, headers=self.headers, timeout=15)
            if resp.status_code == 404:
                return []
            resp.raise_for_status()
            items = resp.json()
            # Gitea returns a list for directories, but a dict for single files.
            if isinstance(items, dict):
                items = [items]

            files = []
            for item in items:
                if item["type"] == "file":
                    files.append({
                        "path": item["path"],
                        "download_url": item["download_url"],
                        "size": item["size"],
                        "sha": item["sha"]
                    })
                elif item["type"] == "dir" and recursive:
                    files.extend(self.list_files(item["path"], recursive=True))
            return files
        except Exception as e:
            logger.debug(f"Note: Failed to list files at {path}: {e}")
            return []

    def upload_file(self, local_path, remote_path, branch="main", message="Added new document"):
        """Upload a file via the Gitea API (POST to create, PUT to update).

        Args:
            local_path: File to read and upload.
            remote_path: Repository-relative destination path.
            branch: Target branch.
            message: Commit message.

        Returns:
            The ``download_url`` reported by the server, or ``None`` on any
            failure (which is logged).
        """
        url = self._contents_url(remote_path)
        try:
            with open(local_path, "rb") as f:
                content = base64.b64encode(f.read()).decode("utf-8")

            # 1. Existence check on the target branch: an update needs the
            #    current blob SHA, a create must not send one.
            sha = None
            exists = False
            try:
                existing_resp = requests.get(
                    url, headers=self.headers, params={"ref": branch}, timeout=10
                )
                if existing_resp.status_code == 200:
                    sha = existing_resp.json().get("sha")
                    exists = True
            except Exception as e:
                # Assume the file is new if the check fails; the create call
                # below will report the real problem, if any.
                logger.debug(f"Existence check failed for {remote_path}: {e}")

            data = {
                "branch": branch,
                "content": content,
                "message": message
            }
            if sha:
                data["sha"] = sha

            # 2. Method selection (POST for create, PUT for update).
            method = requests.put if exists else requests.post
            resp = method(url, headers=self.headers, json=data, timeout=15)
            if resp.status_code not in [200, 201]:
                logger.error(f"Failed to upload {remote_path}: {resp.text}")
                return None
            return resp.json()["content"]["download_url"]
        except Exception as e:
            logger.error(f"Failed to upload {local_path} to {remote_path}: {e}")
            return None

    def download_file(self, remote_path, local_path, is_binary=True):
        """Download a file from the repository to a local path.

        Args:
            remote_path: Repository-relative path of the file.
            local_path: Destination on disk; it is only opened after the
                download succeeded, so a failed request leaves it untouched.
            is_binary: Write raw bytes when true, decoded text otherwise.

        Returns:
            ``True`` on success, ``False`` on any failure (which is logged).
        """
        url = self.get_raw_url(remote_path)
        try:
            resp = requests.get(url, headers=self.headers, timeout=30)
            resp.raise_for_status()
            if is_binary:
                with open(local_path, "wb") as f:
                    f.write(resp.content)
            else:
                # Explicit encoding: the platform default may not be able to
                # represent every character of the decoded response.
                with open(local_path, "w", encoding="utf-8") as f:
                    f.write(resp.text)
            logger.info(f"Downloaded {remote_path} to {local_path}")
            return True
        except Exception as e:
            logger.error(f"Failed to download {remote_path}: {e}")
            return False

    def get_raw_url(self, filepath, branch="main"):
        """Generate the raw-content URL for a file on the given branch."""
        return f"{self.web_url}/{self.owner}/{self.repo}/raw/branch/{branch}/{quote(filepath, safe='/')}"
# Manual smoke test: list the repository's files and print the first three.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    api = GiteaAPI()
    files = api.list_files()
    print(f"Found {len(files)} files.")
    # Only a short sample is printed.
    for f in files[:3]:
        print(f" - {f['path']} at {f['download_url']}")

View File

@ -1,175 +0,0 @@
import argparse
import json
import os
import logging
import shutil
from pathlib import Path
from pyrag3.repo_manager import RepoManager
from pyrag3.retrieval_service import RetrievalService
from pyrag3.search_manager import SearchManager
from pyrag3.crawler import Crawler
# Setup logging: configure the root logger at import time (INFO level, with
# timestamp, logger name and level in every record), then create this module's
# own logger.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
class DocumentRetrieveApp:
    """Facade wiring together the index, the remote repo, search and crawling."""

    def __init__(self, data_root="rag_data"):
        """Create the service objects.

        Args:
            data_root: Directory holding the local ``index.db`` and the
                crawler's temporary download area.
        """
        self.data_root = Path(data_root)
        self.db_path = self.data_root / "index.db"

        # Initialize modules
        self.repo = RepoManager()
        self.retrieval = RetrievalService(db_path=self.db_path)
        self.searcher = SearchManager()
        # Crawler still uses a local temp dir for downloading before upload
        self.crawler = Crawler(root_dir=self.data_root / "temp_web")

    def sync(self):
        """Sync with the remote shared index and then verify file metadata.

        Returns:
            ``True`` when both steps ran without raising, ``False`` otherwise
            (the error is logged).
        """
        logger.info("Synchronizing shared index and remote metadata...")
        try:
            # 1. Pull the shared 'brain' (index.db) if it exists
            self.retrieval.pull_index_db()
            # 2. Re-verify the file lists to catch any missing raw documents
            self.retrieval.sync_and_reindex()
            return True
        except Exception as e:
            logger.error(f"Sync failed: {e}")
            return False

    def _discover_and_ingest(self, text, known_results):
        """Search the web for *text*, archive new pages and index them.

        Returns:
            ``True`` when at least one page was downloaded (so a fresh local
            search is worthwhile), ``False`` otherwise.
        """
        # NOTE(review): indexed results appear to carry the repository
        # download URL rather than the page's original URL, so this
        # de-duplication may rarely match - confirm against the index schema.
        known_urls = {r.get('url') for r in known_results}

        new_files = []
        for url in self.searcher.search(text, num_results=5):
            if url in known_urls:
                continue
            logger.info(f"Archiving fresh context: {url}...")
            local_path = self.crawler.download_page(url)
            if local_path:
                new_files.append(local_path)

        if not new_files:
            return False

        logger.info(f"Ingesting {len(new_files)} new discoveries...")
        uploaded_urls = self.repo.commit_and_push(new_files, message=f"Discovery: {text}")

        ingested = 0
        for local_p in new_files:
            remote_path = f"web_results/{local_p.name}"
            # Match on the final path segment; a plain substring test could
            # pair "index.html" with the URL of "foo_index.html".
            wanted_tail = f"/{local_p.name}"
            d_url = next(
                (u for u in uploaded_urls if u.partition("?")[0].endswith(wanted_tail)),
                None,
            )
            if d_url and self.retrieval.ingest_document(local_p, remote_path, d_url):
                # Indexed as granular chunks immediately
                ingested += 1

        # Cleanup: ignore_errors already makes this best-effort.
        shutil.rmtree(self.crawler.root_dir, ignore_errors=True)
        # Publish the shared index only when it actually changed.
        if ingested:
            self.retrieval.push_index_db()
        return True

    def query(self, text, limit=5, fallback=True, similarity_threshold=0.35, perfect_threshold=0.70):
        """Perform hybrid chunk-level retrieval (local index + web discovery).

        Args:
            text: Query text.
            limit: Maximum number of chunks requested from the index.
            fallback: Allow web discovery when the best local score is below
                *perfect_threshold*.
            similarity_threshold: Minimum score a chunk needs to be returned.
            perfect_threshold: Best-score level at which discovery is skipped.

        Returns:
            The list of result dicts whose ``score`` is at least
            *similarity_threshold* (possibly empty).
        """
        logger.info(f"Hybrid Query (Chunk Level): {text}")

        # 1. Initial local search for top chunks
        results = self.retrieval.search(text, limit=limit)
        best_score = results[0].get('score', 0) if results else 0

        # 2. Proactive discovery (if no good chunks found)
        if fallback and best_score < perfect_threshold:
            logger.info("Triggering search discovery for higher-relevance context...")
            if self._discover_and_ingest(text, results):
                # Final search to get the new high-relevance chunks
                results = self.retrieval.search(text, limit=limit)

        # Final filtering based on chunk similarity score
        final_results = [r for r in results if r.get('score', 0) >= similarity_threshold]
        logger.info(f"Returning {len(final_results)} unified chunk results.")
        return final_results
def main():
    """Command-line entry point.

    Sub-commands:
        query   Search the index (optionally with web fallback) and print the
                results as JSON or text.
        add     Upload and index one file, or every regular file directly
                inside a directory (not recursive).
        update  Pull the shared index and re-verify remote files.
        reset   After interactive confirmation, purge the remote files and
                the local index.

    Without a sub-command the help text is printed.
    """
    parser = argparse.ArgumentParser(description="Storage-Efficient Document Retrieval Tool")
    subparsers = parser.add_subparsers(dest="command", help="Command to run")

    # Query command
    query_parser = subparsers.add_parser("query", help="Query the document index")
    query_parser.add_argument("text", help="The query text")
    query_parser.add_argument("--limit", type=int, default=5, help="Number of results to return")
    query_parser.add_argument("--no-fallback", action="store_true", help="Disable web search fallback")
    query_parser.add_argument("--format", choices=["json", "text"], default="json", help="Output format")

    # Add command
    add_parser = subparsers.add_parser("add", help="Add a local file or directory to the repo and index")
    add_parser.add_argument("path", help="Path to local file or directory")

    # Update command
    subparsers.add_parser("update", help="Sync remote inventory and re-index")

    # Reset command
    subparsers.add_parser("reset", help="DANGEROUS: Purge all data from repo and index")

    args = parser.parse_args()
    app = DocumentRetrieveApp()

    if args.command == "update":
        if app.sync():
            print("Successfully synced remote metadata and updated index.")
        else:
            print("Update failed.")

    elif args.command == "reset":
        confirm = input("⚠️ CAUTION: This will permanently DELETE all files in Gitea and the local index. Are you sure? (y/n): ")
        # Tolerate stray whitespace around the answer; anything but "y" cancels.
        if confirm.strip().lower() == 'y':
            if app.retrieval.reset_all():
                print("Successfully purged all knowledge base assets.")
            else:
                print("Reset failed.")
        else:
            print("Reset cancelled.")

    elif args.command == "add":
        path = Path(args.path)
        if path.is_file():
            if app.retrieval.add_local_file(str(path)):
                print(f"Successfully added {path.name} to the distributed knowledge base.")
            else:
                print(f"Failed to add {path.name}.")
        elif path.is_dir():
            # Only regular files are uploaded, so only they are counted in the
            # announcement (sub-directories used to inflate the number).
            files = [f for f in path.glob("*") if f.is_file()]
            print(f"Adding {len(files)} files from {path.name}...")
            count = 0
            for f in files:
                if app.retrieval.add_local_file(str(f)):
                    count += 1
            print(f"Successfully added {count} files from {path.name}.")
        else:
            print(f"Path not found: {args.path}")

    elif args.command == "query":
        results = app.query(args.text, limit=args.limit, fallback=(not args.no_fallback))
        if args.format == "json":
            # This now includes the full 'content' field for the LLM
            print(json.dumps(results, indent=2))
        else:
            if not results:
                print("No results found.")
            for r in results:
                print(f"[{r['score']:.4f}] {r['title']}")
                print(f"URL: {r['url']}")
                content_preview = r.get('content', '')[:200].replace('\n', ' ')
                print(f"Preview: {content_preview}...")
                print("-" * 40)
    else:
        parser.print_help()
# Run the command-line interface when executed as a script.
if __name__ == "__main__":
    main()

Binary file not shown.

View File

@ -1,52 +0,0 @@
import logging
from pathlib import Path
from pyrag3.gitea_api import GiteaAPI
# Hardcoded repository details as requested.
# NOTE(review): this URL embeds an access token in plain text, so anyone with
# the source or a compiled build can recover it - consider loading it from the
# environment instead and rotating the token.
REPO_URL = "https://f8e1300d871e905e27ce54c3455a3343104d9b04@git.client.guacamolebox.net/butterfly/ragdocs.git"
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class RepoManager:
    """Push documents to the remote repository through the Gitea API."""

    def __init__(self, repo_url=REPO_URL):
        """Keep the repository URL and create the API client used for uploads."""
        self.api = GiteaAPI()
        self.repo_url = repo_url

    def sync_repo(self):
        """Deprecated: We no longer clone or pull to save storage."""
        # Nothing is transferred here any more; in the new architecture the
        # RetrievalService performs the sync by listing files via the API.
        logger.info("Syncing repository metadata via API (skipping clone/pull)...")

    def commit_and_push(self, file_paths, message="Added new documents"):
        """Upload each given file to ``web_results/<file name>`` via the API.

        Args:
            file_paths: Sized collection of local paths (``str`` or ``Path``).
            message: Commit message passed along with every upload.

        Returns:
            The download URLs of the uploads that succeeded, in input order;
            failed uploads are logged and left out.
        """
        logger.info(f"Uploading {len(file_paths)} files to repository via API...")

        succeeded = []
        for candidate in file_paths:
            source = Path(candidate)
            # Every upload lands in one flat remote folder, keyed by file name.
            target = f"web_results/{source.name}"
            remote_url = self.api.upload_file(source, target, message=message)
            if not remote_url:
                logger.error(f"Failed to upload {source}")
                continue
            succeeded.append(remote_url)
            logger.debug(f"Uploaded {source} to {remote_url}")
        return succeeded
# Quick manual test.  (The guard used to appear twice, which ran it twice.)
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    manager = RepoManager()
    manager.sync_repo()

View File

@ -1,11 +0,0 @@
requests
beautifulsoup4
pypdf
googlesearch-python
zstandard
nuitka
duckduckgo-search
lxml
wikipedia
feedparser
markdownify

View File

@ -1,332 +0,0 @@
import sqlite3
import os
import logging
import requests
import struct
import base64
import mimetypes
import re
from pathlib import Path
from pypdf import PdfReader
from bs4 import BeautifulSoup
import markdownify as md
from pyrag3.gitea_api import GiteaAPI
from pyrag3.vector_service import VectorService
logger = logging.getLogger(__name__)
class RetrievalService:
    """Chunk-level semantic index stored in SQLite and shared through Gitea.

    Each row of the ``documents`` table is one chunk of a document together
    with its embedding, stored as a BLOB of 32-bit floats.
    """

    def __init__(self, db_path="rag_data/index.db"):
        """Open (and if necessary create) the index database.

        Args:
            db_path: Location of the SQLite file; parent directories are
                created when missing.
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.api = GiteaAPI()
        self.vector_service = VectorService()
        self._init_db()

    def _connect(self):
        """Return a context manager yielding a connection that is closed on exit.

        ``with sqlite3.connect(...)`` only commits or rolls back; it leaves
        the connection open.  Closing explicitly releases the file handle so
        the database file can be replaced or deleted afterwards.
        """
        from contextlib import closing
        return closing(sqlite3.connect(self.db_path))

    def _init_db(self):
        """Create the ``documents`` table and its URL index if needed.

        A database without the ``content`` column (or without the table at
        all) is treated as an old schema: the table is dropped and recreated
        empty.
        """
        # Path.mkdir also copes with a bare file name (parent "."), where
        # os.makedirs("") would raise.
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        with self._connect() as conn:
            try:
                conn.execute("SELECT content FROM documents LIMIT 1")
            except sqlite3.OperationalError:
                logger.info("Migrating database for chunk and content support...")
                conn.execute("DROP TABLE IF EXISTS documents")

            conn.execute("""
                CREATE TABLE IF NOT EXISTS documents (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    url TEXT,
                    sha TEXT,
                    path TEXT,
                    title TEXT,
                    content TEXT,
                    embedding BLOB
                )
            """)
            conn.execute("CREATE INDEX IF NOT EXISTS idx_url ON documents(url)")
            conn.commit()

    def _serialize_vector(self, vector):
        """Convert a list of floats to a binary BLOB (``None`` if empty).

        The byte order is fixed to little-endian because the index file is
        exchanged between machines.
        """
        if not vector:
            return None
        return struct.pack(f"<{len(vector)}f", *vector)

    def _deserialize_vector(self, blob):
        """Convert a binary BLOB back to a list of floats (``None`` if empty)."""
        if not blob:
            return None
        count = len(blob) // 4
        # Ignore trailing bytes that do not form a whole 4-byte float.
        return list(struct.unpack(f"<{count}f", blob[:count * 4]))

    def _chunk_text(self, text, chunk_size=1500, overlap=200):
        """Split *text* into chunks of roughly *chunk_size* characters.

        Paragraphs (separated by blank lines) are packed greedily.  A
        paragraph longer than *chunk_size* is split on ``". "`` sentence
        boundaries instead.  The limit is approximate: separators are not
        counted, and a single over-long sentence is kept whole.

        Args:
            text: Text to split; falsy input yields ``[]``.
            chunk_size: Target maximum chunk length in characters.
            overlap: Accepted for interface compatibility; no overlap is
                currently applied between chunks.

        Returns:
            List of non-empty, stripped chunk strings.
        """
        if not text:
            return []

        chunks = []
        current_chunk = ""

        for para in text.split("\n\n"):
            if len(current_chunk) + len(para) <= chunk_size:
                current_chunk += para + "\n\n"
                continue

            # The paragraph does not fit: flush what has been collected.
            flushed = current_chunk.strip()
            if flushed:
                chunks.append(flushed)

            if len(para) <= chunk_size:
                current_chunk = para + "\n\n"
                continue

            # Handle oversized paragraphs by sentences.
            sentences = para.split(". ")
            last_index = len(sentences) - 1
            sub_chunk = ""
            for index, sent in enumerate(sentences):
                # Only re-attach the separator that split() removed; the final
                # sentence never had one (this used to produce "text..").
                piece = sent if index == last_index else sent + ". "
                if len(sub_chunk) + len(sent) <= chunk_size:
                    sub_chunk += piece
                else:
                    if sub_chunk.strip():
                        chunks.append(sub_chunk.strip())
                    sub_chunk = piece

            # Carry the remainder forward as the start of the next chunk.
            tail = sub_chunk.strip()
            current_chunk = tail + "\n\n" if tail else ""

        remainder = current_chunk.strip()
        if remainder:
            chunks.append(remainder)
        return chunks

    def _parse_content(self, raw_content, filepath):
        """Extract text, a title and optionally one image from raw file bytes.

        The handling is chosen from the file name: images are returned as
        base64, PDFs are converted to plain text, HTML is converted to
        Markdown, and everything else is decoded as UTF-8.

        Args:
            raw_content: File content as bytes.
            filepath: Name or path used to detect the type and as the
                default title (base name).

        Returns:
            Tuple ``(content, title, image_b64, mime_type)``.  ``content`` is
            ``None`` for images and when parsing failed (the error is logged).
        """
        import io

        ext = os.path.splitext(filepath)[1].lower()
        mime_type, _ = mimetypes.guess_type(filepath)

        content = None
        title = os.path.basename(filepath)
        image_b64 = None

        try:
            if mime_type and mime_type.startswith("image/"):
                image_b64 = base64.b64encode(raw_content).decode("utf-8")

            elif ext == ".pdf":
                # Parse from memory: no fixed-name temp file in the working
                # directory that could collide or be left behind on errors.
                reader = PdfReader(io.BytesIO(raw_content))
                page_texts = (page.extract_text() for page in reader.pages)
                content = "\n".join(t for t in page_texts if t)

            elif ext in (".html", ".htm"):
                soup = BeautifulSoup(raw_content, "html.parser")
                # title.string is None when <title> has nested markup; keep
                # the file name in that case and drop surrounding whitespace.
                if soup.title and soup.title.string:
                    title = soup.title.string.strip() or title

                # 1. Clean HTML for text extraction
                for node in soup(["script", "style", "nav", "footer", "header"]):
                    node.decompose()

                # 2. Extract first significant inline image for multimodal context
                for img in soup.find_all("img"):
                    src = img.get("src")
                    if not src:
                        continue
                    if any(x in src.lower() for x in ["icon", "logo", "pixel", "tracker", "sprite"]):
                        continue
                    if src.startswith("data:image"):
                        payload = src.partition(",")[2]
                        if payload:
                            image_b64 = payload
                            break

                # 3. Convert to clean Markdown
                content = md.markdownify(str(soup), heading_style="ATX")

            else:
                content = raw_content.decode("utf-8", errors="ignore")

        except Exception as e:
            logger.error(f"Error parsing {filepath}: {e}")

        if content:
            # Collapse runs of spaces left over from the conversion.
            content = re.sub(r' +', ' ', content).strip()

        return content, title, image_b64, mime_type

    def _index_bytes(self, raw_bytes, source_name, remote_path, download_url, sha):
        """Parse, chunk, embed and store one document; return the chunk count.

        Embeddings are computed before the database is touched.  When at
        least one chunk was embedded, existing rows for *download_url* are
        replaced in a single transaction, so re-indexing a changed document
        does not leave duplicate or stale chunks behind.
        """
        content, title, image_b64, mime = self._parse_content(raw_bytes, source_name)
        chunks = self._chunk_text(content) if content else []
        if not chunks and image_b64:
            chunks = ["Visual Content"]

        rows = []
        for position, chunk_text in enumerate(chunks):
            # The image is combined with the first chunk only.
            current_img = image_b64 if position == 0 else None
            embedding = self.vector_service.get_embedding(
                text=chunk_text, image_b64=current_img, mime_type=mime
            )
            if embedding:
                blob = self._serialize_vector(embedding)
                rows.append((download_url, sha, remote_path, title, chunk_text, blob))

        if not rows:
            return 0

        with self._connect() as conn:
            conn.execute("DELETE FROM documents WHERE url = ?", (download_url,))
            conn.executemany("""
                INSERT INTO documents (url, sha, path, title, content, embedding)
                VALUES (?, ?, ?, ?, ?, ?)
            """, rows)
            conn.commit()
        return len(rows)

    def ingest_document(self, local_path, remote_path, download_url, sha=None):
        """Parse, chunk, embed and index a local file.

        Args:
            local_path: File to read.
            remote_path: Repository-relative path stored with each chunk.
            download_url: URL stored with each chunk; also the key under
                which earlier chunks of the same document are replaced.
            sha: Optional blob SHA stored with each chunk.

        Returns:
            ``True`` when at least one chunk was indexed, ``False`` otherwise
            (including on errors, which are logged).
        """
        local_path = Path(local_path)
        try:
            raw_bytes = local_path.read_bytes()
            indexed_count = self._index_bytes(
                raw_bytes, str(local_path), remote_path, download_url, sha
            )
            if indexed_count > 0:
                logger.debug(f"Granularly indexed {local_path.name} into {indexed_count} chunks.")
                return True
            return False
        except Exception as e:
            logger.error(f"Failed to ingest {local_path.name}: {e}")
            return False

    def ingest_document_from_bytes(self, raw_bytes, remote_path, download_url, sha=None):
        """Helper for sync_and_reindex to ingest without a local file.

        Returns:
            ``True`` when at least one chunk was indexed, ``False`` otherwise
            (including on errors, which are logged).
        """
        try:
            return self._index_bytes(raw_bytes, remote_path, remote_path, download_url, sha) > 0
        except Exception as e:
            logger.error(f"Failed to sync-ingest {remote_path}: {e}")
            return False

    def add_local_file(self, local_path):
        """Upload a local file to ``local_uploads/`` and ingest it.

        Returns:
            ``True`` when the file was uploaded and indexed, ``False`` when
            it is missing, the upload failed, or nothing could be indexed.
        """
        path = Path(local_path)
        if not path.exists():
            logger.error(f"File not found: {local_path}")
            return False

        logger.info(f"Adding local file: {path.name}...")

        # 1. Upload to Gitea
        remote_path = f"local_uploads/{path.name}"
        download_url = self.api.upload_file(str(path), remote_path, message=f"Local Upload: {path.name}")
        if not download_url:
            return False

        # 2. Get SHA (stays None when the listing does not contain the file)
        files = self.api.list_files("local_uploads/")
        sha = next((f["sha"] for f in files if f["path"] == remote_path), None)

        # 3. Ingest locally
        success = self.ingest_document(path, remote_path, download_url, sha)

        # 4. Push updated index
        if success:
            self.push_index_db()
            logger.info(f"Successfully added and indexed {path.name}")
        return success

    def sync_and_reindex(self):
        """Bring the local index in line with the files in the remote repo.

        Remote files that are missing locally or whose SHA changed are
        downloaded and re-indexed; indexed URLs that no longer exist remotely
        are removed.  ``index.db`` itself is never indexed.
        """
        logger.info("Starting Semantic Sync...")
        remote_files = self.api.list_files()

        with self._connect() as conn:
            # We use DISTINCT because a URL now has multiple chunks
            cursor = conn.execute("SELECT DISTINCT url, sha FROM documents")
            indexed = {url: sha for url, sha in cursor.fetchall()}

        remote_urls = set()
        for f in remote_files:
            if f["path"] == "index.db":
                continue

            url = f["download_url"]
            sha = f["sha"]
            remote_urls.add(url)

            if url not in indexed or indexed[url] != sha:
                logger.info(f"Vecting new document: {f['path']}...")
                try:
                    resp = requests.get(url, timeout=15)
                    resp.raise_for_status()
                    self.ingest_document_from_bytes(resp.content, f["path"], url, sha)
                except Exception as e:
                    logger.error(f"Failed to index {url}: {e}")

        # Cleanup deleted files
        to_remove = set(indexed) - remote_urls
        if to_remove and not remote_files:
            # list_files() also returns [] when the request failed, so an
            # empty listing is not proof that everything was deleted.  Keep
            # the index rather than wiping it on a transient network error.
            logger.warning("Remote listing is empty; skipping removal of indexed documents.")
        elif to_remove:
            logger.info(f"Removing {len(to_remove)} deleted files from local index.")
            with self._connect() as conn:
                conn.executemany(
                    "DELETE FROM documents WHERE url = ?",
                    [(url,) for url in to_remove],
                )
                conn.commit()
        logger.info("Semantic Indexing complete.")

    def pull_index_db(self, remote_path="index.db"):
        """Download the shared index from Gitea over the local database file.

        Returns:
            The result of ``GiteaAPI.download_file`` for the transfer.
        """
        logger.info(f"Pulling shared index from {remote_path}...")
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        return self.api.download_file(remote_path, str(self.db_path), is_binary=True)

    def push_index_db(self, remote_path="index.db"):
        """Upload the local index to Gitea for other users.

        Returns:
            The result of ``GiteaAPI.upload_file`` for the transfer.
        """
        logger.info(f"Pushing shared index to {remote_path}...")
        return self.api.upload_file(str(self.db_path), remote_path, message="Updated Shared Index")

    def reset_all(self):
        """DANGEROUS: purge all documents and the index, remotely and locally.

        Returns:
            ``True`` only when every listed remote file was deleted and the
            local database file could be removed (or did not exist);
            ``False`` when any step failed.  A fresh empty index is
            initialised in either case.
        """
        logger.warning("INHERENTLY DESTRUCTIVE: Purging knowledge base...")

        purged = True
        for f in self.api.list_files():
            logger.info(f"Deleting remote asset: {f['path']}...")
            if not self.api.delete_file(f["path"], f["sha"], message="Reset: System Purge"):
                purged = False

        if self.db_path.exists():
            # Connections opened by this class are closed after use, so no
            # garbage-collection pass is needed to release the file first.
            try:
                self.db_path.unlink()
                logger.info("Local index deleted.")
            except Exception as e:
                logger.error(f"Failed to delete local DB file: {e}")
                purged = False

        self._init_db()
        return purged

    def search(self, query, limit=10):
        """Return the *limit* best-matching chunks for *query*.

        Every stored chunk is scored against the query embedding with cosine
        similarity.

        Returns:
            List of dicts with ``url``, ``title``, ``content`` and ``score``,
            best first; ``[]`` when the query could not be embedded.
        """
        query_vector = self.vector_service.get_embedding(query)
        if not query_vector:
            return []

        with self._connect() as conn:
            rows = conn.execute("SELECT url, title, content, embedding FROM documents").fetchall()

        results = []
        for url, title, content, blob in rows:
            doc_vector = self._deserialize_vector(blob)
            if doc_vector is None:
                # A row without a stored embedding cannot be scored.
                continue
            similarity = self.vector_service.cosine_similarity(query_vector, doc_vector)
            results.append({
                "url": url,
                "title": title,
                "content": content,
                "score": similarity
            })

        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:limit]
# Manual smoke test: run one query against the default index and print each
# hit's score, title and the first 100 characters of its content.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    service = RetrievalService()
    # Test
    res = service.search("Chico history")
    for r in res:
        print(f"[{r['score']:.4f}] {r['title']} - {r['content'][:100]}...")

View File

@ -1,109 +0,0 @@
import logging
import requests
import os
import re
from urllib.parse import unquote, quote
import wikipedia
import feedparser
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
class SearchManager:
    """Collect candidate URLs for a query from web search, Wikipedia and RSS."""

    def __init__(self):
        """Create the HTTP session and the list of curated news feeds."""
        self.session = requests.Session()
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        }
        # Apply the headers to the session so requests made through it
        # actually carry them (they were previously defined but never used).
        self.session.headers.update(self.headers)
        # Curated list of high-authority, public RSS feeds
        self.news_feeds = [
            "https://www.reutersagency.com/en/reuters-best/rss/",
            "http://feeds.bbci.co.uk/news/rss.xml",
            "http://rss.cnn.com/rss/cnn_topstories.rss",
            "https://www.aljazeera.com/xml/rss/all.xml",
            "https://feeds.npr.org/1001/rss.xml"
        ]

    def search(self, query, num_results=5):
        """Unified search: web (primary), Wikipedia (reference), news (optional).

        The sources are queried one after another.  Because web results come
        first and the combined list is cut to *num_results*, Wikipedia and
        news links only appear when the web search returned fewer links.

        Args:
            query: Search text.
            num_results: Maximum number of URLs to return.

        Returns:
            De-duplicated list of URLs in discovery order.
        """
        logger.info(f"Unified Discovery for: {query}")
        results = []

        # 1. Web Phase (Primary): Use DuckDuckGo Search
        # This provides specific, long-tail articles directly without redirects
        web_links = self._search_web(query, num_results=num_results)
        if web_links:
            logger.info(f"Web Discovery found {len(web_links)} specific articles.")
            results.extend(web_links)

        # 2. Encyclopedic Phase (Secondary): Check Wikipedia for broad context
        # Limit to 2 results to avoid 'cluttering' the findings as per user request
        wiki_links = self._search_wikipedia(query, num_results=2)
        if wiki_links:
            logger.info(f"Wikipedia Knowledge found {len(wiki_links)} pages.")
            results.extend(wiki_links)

        # 3. Curated news feeds for high-authority updates (optional backup).
        # Keywords are matched against the start of each query word, so
        # "headlines" and "updated" still count while "known" or "snow" no
        # longer trigger on the embedded "now".
        temporal_keywords = ("today", "latest", "now", "2026", "news", "headline", "update")
        query_words = re.findall(r"\w+", query.lower())
        if any(word.startswith(temporal_keywords) for word in query_words):
            news_links = self._search_curated_news(query, num_results=2)
            if news_links:
                results.extend(news_links)

        # Deduplicate while preserving order (web results naturally come first).
        unique_results = list(dict.fromkeys(results))
        return unique_results[:num_results]

    def _search_wikipedia(self, query, num_results):
        """Return the URLs of up to *num_results* Wikipedia pages for *query*.

        Titles that cannot be resolved to a page are skipped; if the search
        itself fails an empty list is returned.
        """
        try:
            pages = wikipedia.search(query, results=num_results)
        except Exception as e:
            logger.debug(f"Wikipedia search failed for {query!r}: {e}")
            return []

        urls = []
        for page in pages:
            try:
                urls.append(wikipedia.page(page, auto_suggest=False).url)
            except Exception as e:
                logger.debug(f"Skipping Wikipedia page {page!r}: {e}")
        return urls

    def _search_curated_news(self, query, num_results):
        """Search the curated RSS feeds for entries whose title mentions the query.

        An entry matches when any whitespace-separated query word occurs in
        its lower-cased title.  Feeds that fail to load are skipped.

        Returns:
            Up to *num_results* entry links.
        """
        links = []
        words = query.lower().split()

        for feed_url in self.news_feeds:
            try:
                feed = feedparser.parse(feed_url)
                for entry in feed.entries:
                    # Entries without a title or link are ignored instead of
                    # aborting the whole feed.
                    title = (entry.get("title") or "").lower()
                    link = entry.get("link")
                    if link and any(word in title for word in words):
                        links.append(link)
                        if len(links) >= num_results:
                            return links
            except Exception as e:
                logger.debug(f"Skipping feed {feed_url}: {e}")
        return links

    def _search_web(self, query, num_results):
        """Search the web for direct links, avoiding JavaScript redirects.

        Returns:
            List of result URLs, or ``[]`` when the search failed (the error
            is logged).
        """
        try:
            # Imported lazily so the module loads without the dependency.
            from duckduckgo_search import DDGS
            results = DDGS().text(query, max_results=num_results)
            return [r['href'] for r in results]
        except Exception as e:
            logger.error(f"Web Search Error: {e}")
            return []
if __name__ == "__main__":
    # Manual smoke test: run one search with INFO logging and print the links.
    logging.basicConfig(level=logging.INFO)
    manager = SearchManager()
    found = manager.search('Who was John Bidwell', num_results=2)
    print(f"Parallel Search: {found}")

View File

@ -1,7 +0,0 @@
import logging
from pyrag3.search_manager import SearchManager
# Smoke test: run a trivial query with DEBUG logging and print what comes back.
logging.basicConfig(level=logging.DEBUG)
res = SearchManager().search("test")
print(f"Results: {res}")

View File

@ -1,7 +0,0 @@
import logging
from pyrag3.search_manager import SearchManager
# Smoke test: the query contains the keyword "news", which search() treats as
# temporal, so the curated news feeds are consulted as well.
logging.basicConfig(level=logging.INFO)
res = SearchManager().search("Starship news 2024")
print(f"Links found: {res}")

View File

@ -1,32 +0,0 @@
[
{
"url": "https://git.client.guacamolebox.net/butterfly/ragdocs/raw/branch/main/web_results/wiki_Lake_Oroville.html",
"title": "\r\n Lake Oroville - Wikipedia\r\n ",
"content": "| Lake Oroville | |\n| --- | --- |\n| Satellite image of the lake by [Sentinel-2](/wiki/Sentinel-2 \"Sentinel-2\") | |\n| [Location of Lake Oroville in California, USA.](/wiki/File:Relief_map_of_California.png \"Location of Lake Oroville in California, USA.\") Location of Lake Oroville in California, USA. Lake Oroville Show map of California [Location of Lake Oroville in California, USA.](/wiki/File:Usa_edcp_relief_location_map.png \"Location of Lake Oroville in California, USA.\") Location of Lake Oroville in California, USA.",
"score": 0.6367981864470856
},
{
"url": "https://git.client.guacamolebox.net/butterfly/ragdocs/raw/branch/main/web_results/wiki_Lake_Oroville.html",
"title": "\r\n Lake Oroville - Wikipedia\r\n ",
"content": "| Lake Oroville | |\n| --- | --- |\n| Satellite image of the lake by [Sentinel-2](/wiki/Sentinel-2 \"Sentinel-2\") | |\n| [Location of Lake Oroville in California, USA.](/wiki/File:Relief_map_of_California.png \"Location of Lake Oroville in California, USA.\") Location of Lake Oroville in California, USA. Lake Oroville Show map of California [Location of Lake Oroville in California, USA.](/wiki/File:Usa_edcp_relief_location_map.png \"Location of Lake Oroville in California, USA.\") Location of Lake Oroville in California, USA.",
"score": 0.6367981864470856
},
{
"url": "https://git.client.guacamolebox.net/butterfly/ragdocs/raw/branch/main/web_results/wiki_Lake_Oroville.html",
"title": "\r\n Lake Oroville - Wikipedia\r\n ",
"content": "| |. **Lake Oroville**\n[[\n1\n]](#cite_note-GNIS-1)\nis a\n[reservoir](/wiki/Reservoir \"Reservoir\")\nformed by the\n[Oroville Dam](/wiki/Oroville_Dam \"Oroville Dam\")\nimpounding the\n[Feather River](/wiki/Feather_River \"Feather River\")\n, located in\n[Butte County](/wiki/Butte_County,_California \"Butte County, California\")\n, northern\n[California](/wiki/California \"California\")\n. The lake is situated 5 miles (8\u00a0km) northeast of the city of\n[Oroville](/wiki/Oroville,_California \"Oroville, California\")\n, within the\n[Lake Oroville State Recreation Area](/wiki/Lake_Oroville_State_Recreation_Area \"Lake Oroville State Recreation Area\")\n, in the western foothills of the\n[Sierra Nevada](/wiki/Sierra_Nevada \"Sierra Nevada\")\n. Known as the\n[second-largest reservoir in California](/wiki/List_of_largest_reservoirs_of_California \"List of largest reservoirs of California\")\n, Lake Oroville is treated as a keystone facility within the\n[California State Water Project](/wiki/California_State_Water_Project \"California State Water Project\")\nby storing water, providing\n[flood control](/wiki/Flood_control \"Flood control\")\n, recreation, freshwater releases to assist in controlling the salinity intrusion into the Sacramento-San Joaquin Delta and protecting fish and wildlife.",
"score": 0.5920514170838611
},
{
"url": "https://git.client.guacamolebox.net/butterfly/ragdocs/raw/branch/main/web_results/wiki_Lake_Oroville.html",
"title": "\r\n Lake Oroville - Wikipedia\r\n ",
"content": "| |. **Lake Oroville**\n[[\n1\n]](#cite_note-GNIS-1)\nis a\n[reservoir](/wiki/Reservoir \"Reservoir\")\nformed by the\n[Oroville Dam](/wiki/Oroville_Dam \"Oroville Dam\")\nimpounding the\n[Feather River](/wiki/Feather_River \"Feather River\")\n, located in\n[Butte County](/wiki/Butte_County,_California \"Butte County, California\")\n, northern\n[California](/wiki/California \"California\")\n. The lake is situated 5 miles (8\u00a0km) northeast of the city of\n[Oroville](/wiki/Oroville,_California \"Oroville, California\")\n, within the\n[Lake Oroville State Recreation Area](/wiki/Lake_Oroville_State_Recreation_Area \"Lake Oroville State Recreation Area\")\n, in the western foothills of the\n[Sierra Nevada](/wiki/Sierra_Nevada \"Sierra Nevada\")\n. Known as the\n[second-largest reservoir in California](/wiki/List_of_largest_reservoirs_of_California \"List of largest reservoirs of California\")\n, Lake Oroville is treated as a keystone facility within the\n[California State Water Project](/wiki/California_State_Water_Project \"California State Water Project\")\nby storing water, providing\n[flood control](/wiki/Flood_control \"Flood control\")\n, recreation, freshwater releases to assist in controlling the salinity intrusion into the Sacramento-San Joaquin Delta and protecting fish and wildlife.",
"score": 0.5920514170838611
},
{
"url": "https://git.client.guacamolebox.net/butterfly/ragdocs/raw/branch/main/web_results/wiki_Lake_Oroville.html",
"title": "\r\n Lake Oroville - Wikipedia\r\n ",
"content": "Lake Oroville - Wikipedia\n\n[Jump to content](#bodyContent)\n\n[Coordinates](/wiki/Geographic_coordinate_system \"Geographic coordinate system\")\n:\n[39\u00b032\u203214\u2033N\n\n121\u00b029\u203200\u2033W\n\n\ufeff / \ufeff\n\n\n39.53722\u00b0N 121.48333\u00b0W\n\n\ufeff /\n39.53722; -121.48333](https://geohack.toolforge.org/geohack.php?pagename=Lake_Oroville&params=39_32_14_N_121_29_00_W_scale:100000_type:waterbody_region:US-CA)\n\nFrom Wikipedia, the free encyclopedia\n\nReservoir in Butte County, California, U.S.",
"score": 0.589832461931772
}
]

File diff suppressed because one or more lines are too long

View File

@ -1,71 +0,0 @@
import requests
import logging
import math
logger = logging.getLogger(__name__)
class VectorService:
    """Client for the OpenRouter embeddings endpoint plus a similarity helper.

    The API key is never stored in source: it is taken from the ``api_key``
    constructor argument or, when that is omitted, from the environment
    variable named by ``API_KEY_ENV_VAR``.
    """

    # Environment variable consulted when no key is passed to the constructor.
    API_KEY_ENV_VAR = "OPENROUTER_API_KEY"
    # Text inputs longer than this many characters are truncated before sending.
    MAX_TEXT_CHARS = 10000
    # Seconds to wait for the embeddings endpoint before giving up.
    REQUEST_TIMEOUT = 45

    def __init__(self, api_key=None):
        """Set up the endpoint, model and request headers.

        Args:
            api_key: OpenRouter API key. When omitted, the value of the
                ``OPENROUTER_API_KEY`` environment variable is used; if that
                is unset too, the key is empty and ``get_embedding`` refuses
                to send requests.
        """
        # Local import: the module's import block does not bring in ``os``.
        import os

        # SECURITY: credentials must not be committed; any key that was
        # previously embedded in this file should be treated as leaked and rotated.
        self.api_key = api_key or os.environ.get(self.API_KEY_ENV_VAR, "")
        self.model = "nvidia/llama-nemotron-embed-vl-1b-v2:free"
        self.url = "https://openrouter.ai/api/v1/embeddings"
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

    def get_embedding(self, text=None, image_b64=None, mime_type="image/jpeg"):
        """Fetch a multimodal embedding from the OpenRouter API.

        Args:
            text: Optional text to embed; truncated to ``MAX_TEXT_CHARS``.
            image_b64: Optional base64-encoded image, sent as a data URL.
            mime_type: MIME type used in the image data URL.

        Returns:
            The ``embedding`` value of the first item in the response, or
            ``None`` when there is no input, no API key, the request fails,
            the status is not 200, or the response carries no data.
        """
        if not text and not image_b64:
            return None
        if not self.api_key:
            logger.error(
                "No OpenRouter API key configured; set %s or pass api_key",
                self.API_KEY_ENV_VAR,
            )
            return None

        # Structure payload for multimodal embedding
        content = []
        if text:
            content.append({"type": "text", "text": text[:self.MAX_TEXT_CHARS]})
        if image_b64:
            content.append({
                "type": "image_url",
                "image_url": {"url": f"data:{mime_type};base64,{image_b64}"}
            })
        payload = {
            "model": self.model,
            "input": [{"content": content}]
        }

        try:
            resp = requests.post(
                self.url,
                headers=self.headers,
                json=payload,
                timeout=self.REQUEST_TIMEOUT,
            )
            if resp.status_code != 200:
                logger.error(f"OpenRouter Error {resp.status_code}: {resp.text}")
                return None
            data = resp.json()
            if "data" in data and len(data["data"]) > 0:
                return data["data"][0]["embedding"]
            logger.error("OpenRouter response contained no embedding data")
        except Exception as e:
            # Best-effort call: network, JSON and schema errors are all
            # reported and turned into a None result.
            logger.error(f"Failed to get embedding: {e}")
        return None

    @staticmethod
    def cosine_similarity(vec1, vec2):
        """Return the cosine similarity of two equal-length numeric vectors.

        Pure-Python implementation. Returns ``0.0`` when either vector is
        empty or ``None``, when the lengths differ, or when either vector
        has zero magnitude.
        """
        if not vec1 or not vec2 or len(vec1) != len(vec2):
            return 0.0
        dot_product = sum(a * b for a, b in zip(vec1, vec2))
        magnitude1 = math.sqrt(sum(a * a for a in vec1))
        magnitude2 = math.sqrt(sum(b * b for b in vec2))
        if magnitude1 == 0 or magnitude2 == 0:
            # Avoid dividing by zero for an all-zero vector.
            return 0.0
        return dot_product / (magnitude1 * magnitude2)
if __name__ == "__main__":
    # Manual smoke test: embed a short string and compare the vector with itself.
    logging.basicConfig(level=logging.INFO)
    service = VectorService()
    vector = service.get_embedding("Hello world")
    if vector:
        print(f"Embedding length: {len(vector)}")
        print(f"Self-similarity: {service.cosine_similarity(vector, vector)}")