""" Document Processor - Handles document parsing and chunking Supports multiple document formats: - Plain text (.txt, .md) - PDF (.pdf) - HTML (.html, .htm) - Word documents (.docx) - Code files (.py, .js, etc.) """ from __future__ import annotations import hashlib import logging import os import re import uuid from pathlib import Path from typing import Any, Optional log = logging.getLogger(__name__) class DocumentProcessor: """ Process documents into chunks suitable for vector storage. Handles: - Multiple file formats - Intelligent chunking with overlap - Metadata extraction """ def __init__( self, chunk_size: int = 1000, chunk_overlap: int = 200, ): self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap async def process( self, content: bytes, filename: str, metadata: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: """ Process a document into chunks. Args: content: Raw document content filename: Original filename metadata: Optional additional metadata Returns: Dictionary with chunks, metadatas, and ids """ # Extract text based on file type text = await self._extract_text(content, filename) if not text.strip(): return {"chunks": [], "metadatas": [], "ids": [], "document_id": None} # Generate document ID document_id = str(uuid.uuid4()) # Create chunks chunks = self._chunk_text(text) # Create metadata for each chunk base_metadata = { "source": filename, "document_id": document_id, **(metadata or {}), } metadatas = [] ids = [] for i, chunk in enumerate(chunks): chunk_id = hashlib.md5(f"{document_id}_{i}".encode()).hexdigest() ids.append(chunk_id) metadatas.append({ **base_metadata, "chunk_index": i, "chunk_length": len(chunk), }) return { "chunks": chunks, "metadatas": metadatas, "ids": ids, "document_id": document_id, "total_chars": len(text), } async def _extract_text(self, content: bytes, filename: str) -> str: """Extract text from document based on file type.""" ext = Path(filename).suffix.lower() try: if ext in (".txt", ".md", ".rst", ".log"): return content.decode("utf-8", errors="ignore") elif ext == ".pdf": return await self._extract_pdf(content) elif ext in (".html", ".htm"): return await self._extract_html(content) elif ext == ".docx": return await self._extract_docx(content) elif ext in (".py", ".js", ".ts", ".java", ".cpp", ".c", ".go", ".rs", ".rb", ".php", ".cs", ".swift", ".kt"): return content.decode("utf-8", errors="ignore") elif ext in (".json", ".yaml", ".yml", ".xml", ".toml"): return content.decode("utf-8", errors="ignore") elif ext in (".csv", ".tsv"): return content.decode("utf-8", errors="ignore") else: # Try to decode as text try: return content.decode("utf-8", errors="ignore") except Exception: log.warning(f"Unknown file type: {ext}, treating as binary") return "" except Exception as e: log.error(f"Failed to extract text from {filename}: {e}") return "" async def _extract_pdf(self, content: bytes) -> str: """Extract text from PDF.""" try: import fitz # PyMuPDF doc = fitz.open(stream=content, filetype="pdf") text_parts = [] for page in doc: text_parts.append(page.get_text()) doc.close() return "\n\n".join(text_parts) except ImportError: log.warning("PyMuPDF not installed, PDF extraction unavailable") return "" except Exception as e: log.error(f"PDF extraction failed: {e}") return "" async def _extract_html(self, content: bytes) -> str: """Extract text from HTML.""" try: from bs4 import BeautifulSoup soup = BeautifulSoup(content, "html.parser") # Remove script and style elements for element in soup(["script", "style", "nav", "footer", "header"]): element.decompose() return soup.get_text(separator="\n", strip=True) except ImportError: log.warning("BeautifulSoup not installed, HTML extraction unavailable") return content.decode("utf-8", errors="ignore") except Exception as e: log.error(f"HTML extraction failed: {e}") return "" async def extract_text_from_html(self, content: bytes) -> str: """ Public method to extract text from HTML content. Args: content: Raw HTML content Returns: Extracted text content """ return await self._extract_html(content) async def _extract_docx(self, content: bytes) -> str: """Extract text from DOCX.""" try: import io from docx import Document doc = Document(io.BytesIO(content)) return "\n\n".join(para.text for para in doc.paragraphs) except ImportError: log.warning("python-docx not installed, DOCX extraction unavailable") return "" except Exception as e: log.error(f"DOCX extraction failed: {e}") return "" def _chunk_text(self, text: str) -> list[str]: """ Split text into overlapping chunks. Uses a sentence-aware chunking strategy to avoid breaking mid-sentence. """ if len(text) <= self.chunk_size: return [text.strip()] if text.strip() else [] # Split into sentences sentences = self._split_sentences(text) chunks = [] current_chunk = [] current_length = 0 for sentence in sentences: sentence_length = len(sentence) # If adding this sentence would exceed chunk size if current_length + sentence_length > self.chunk_size and current_chunk: # Save current chunk chunks.append(" ".join(current_chunk)) # Start new chunk with overlap overlap_text = self._get_overlap_text(current_chunk) current_chunk = [overlap_text, sentence] if overlap_text else [sentence] current_length = len(" ".join(current_chunk)) else: current_chunk.append(sentence) current_length += sentence_length + 1 # +1 for space # Add final chunk if current_chunk: chunks.append(" ".join(current_chunk)) return [c.strip() for c in chunks if c.strip()] def _split_sentences(self, text: str) -> list[str]: """Split text into sentences.""" # Simple sentence splitting - can be improved with NLP libraries sentence_endings = r'(?<=[.!?])\s+' sentences = re.split(sentence_endings, text) return [s.strip() for s in sentences if s.strip()] def _get_overlap_text(self, chunk_parts: list[str]) -> str: """Get text for overlap from the end of the current chunk.""" if not chunk_parts: return "" full_text = " ".join(chunk_parts) if len(full_text) <= self.chunk_overlap: return full_text # Get last N characters overlap = full_text[-self.chunk_overlap:] # Try to start at a word boundary space_idx = overlap.find(" ") if space_idx > 0: overlap = overlap[space_idx + 1:] return overlap