test/moxie/web/routes.py

841 lines
25 KiB
Python

"""
Web UI Routes
Landing page, chat interface, and user profile.
"""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, List
from fastapi import APIRouter, Request, Response, Cookie, Form, UploadFile, File, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from jinja2 import Environment, FileSystemLoader
from pydantic import BaseModel
from loguru import logger
from config import settings
from auth.models import get_auth_manager, User
from core.orchestrator import Orchestrator
# Router that collects every web UI and API route declared in this module.
router = APIRouter()

# Templates are rendered with a plain Jinja2 environment (no Starlette
# wrapper); they live in the "templates" directory next to this file.
TEMPLATES_DIR = Path(__file__).parent / "templates"
jinja_env = Environment(loader=FileSystemLoader(str(TEMPLATES_DIR)), autoescape=True)
def render_template(name: str, context: dict) -> str:
    """Load the Jinja2 template ``name`` and render it.

    The entries of ``context`` are passed to the template as keyword
    arguments; the rendered text is returned.
    """
    return jinja_env.get_template(name).render(**context)
def get_template_settings():
    """Return the settings exposed to templates as a plain dict.

    Only ``admin_path``, ``host`` and ``port`` are copied from the global
    ``settings`` object.
    """
    exposed_fields = ("admin_path", "host", "port")
    return {field: getattr(settings, field) for field in exposed_fields}
# ============================================================================
# Helper Functions
# ============================================================================
async def get_current_user(
    request: Request,
    session_token: Optional[str] = Cookie(None)
) -> Optional[User]:
    """Resolve the user for this request, if any.

    The session cookie is preferred; when it is missing, a token taken from
    an ``Authorization: Bearer <token>`` header is used instead. Returns
    ``None`` when no token is available, otherwise whatever the auth
    manager's ``validate_session`` returns for the token.
    """
    token = session_token
    if not token:
        # Fall back to the Authorization header.
        bearer_prefix = "Bearer "
        header_value = request.headers.get("Authorization", "")
        if header_value.startswith(bearer_prefix):
            token = header_value[len(bearer_prefix):]
    if not token:
        return None
    return get_auth_manager().validate_session(token)
async def require_user(
    request: Request,
    session_token: Optional[str] = Cookie(None)
) -> User:
    """Return the logged-in user or fail the request.

    Raises:
        HTTPException: 401 "Not authenticated" when ``get_current_user``
            finds no valid session. No redirect is issued here.
    """
    user = await get_current_user(request, session_token)
    if user:
        return user
    raise HTTPException(status_code=401, detail="Not authenticated")
# ============================================================================
# Page Routes
# ============================================================================
@router.get("/", response_class=HTMLResponse)
async def landing_page(request: Request):
    """Serve the landing page, or send logged-in users to ``/chat``.

    A 302 response pointing at ``/chat`` is returned when the
    ``session_token`` cookie validates; otherwise ``landing.html`` is
    rendered.
    """
    token = request.cookies.get("session_token")
    if token and get_auth_manager().validate_session(token):
        return HTMLResponse(status_code=302, headers={"Location": "/chat"})

    page = render_template("landing.html", {"settings": get_template_settings()})
    return HTMLResponse(content=page)
@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request, error: Optional[str] = None):
    """Render ``login.html``, passing along the optional ``error`` value."""
    context = {"settings": get_template_settings(), "error": error}
    return HTMLResponse(content=render_template("login.html", context))
@router.get("/signup", response_class=HTMLResponse)
async def signup_page(request: Request, error: Optional[str] = None):
    """Render ``signup.html``, passing along the optional ``error`` value."""
    context = {"settings": get_template_settings(), "error": error}
    return HTMLResponse(content=render_template("signup.html", context))
@router.post("/login")
async def login_submit(
    request: Request,
    username: str = Form(...),
    password: str = Form(...)
):
    """Handle the login form.

    On bad credentials the login page is re-rendered with an error and a
    401 status. On success a session is created, stored in an HTTP-only
    ``session_token`` cookie valid for seven days, and a JSON body naming
    the ``/chat`` redirect target is returned.
    """
    auth = get_auth_manager()
    user = auth.authenticate(username, password)

    if user:
        token = auth.create_session(user.id)
        one_week_seconds = 7 * 24 * 60 * 60
        response = JSONResponse({"success": True, "redirect": "/chat"})
        response.set_cookie(
            key="session_token",
            value=token,
            httponly=True,
            max_age=one_week_seconds,
            samesite="lax",
        )
        return response

    page = render_template("login.html", {
        "settings": get_template_settings(),
        "error": "Invalid username or password",
    })
    return HTMLResponse(content=page, status_code=401)
@router.post("/signup")
async def signup_submit(
    request: Request,
    username: str = Form(...),
    email: str = Form(...),
    password: str = Form(...),
    confirm_password: str = Form(...)
):
    """Handle the signup form.

    The signup page is re-rendered with a 400 status when the passwords
    differ, the password is shorter than 6 characters, or the auth manager
    does not create the user. Otherwise a session is created, stored in an
    HTTP-only ``session_token`` cookie valid for seven days, and a JSON
    body naming the ``/chat`` redirect target is returned.
    """

    def form_error(message: str) -> HTMLResponse:
        # Re-render the signup form with the given error message.
        page = render_template("signup.html", {
            "settings": get_template_settings(),
            "error": message,
        })
        return HTMLResponse(content=page, status_code=400)

    if password != confirm_password:
        return form_error("Passwords do not match")
    if len(password) < 6:
        return form_error("Password must be at least 6 characters")

    auth = get_auth_manager()
    user = auth.create_user(username, email, password)
    if not user:
        return form_error("Username or email already exists")

    token = auth.create_session(user.id)
    one_week_seconds = 7 * 24 * 60 * 60
    response = JSONResponse({"success": True, "redirect": "/chat"})
    response.set_cookie(
        key="session_token",
        value=token,
        httponly=True,
        max_age=one_week_seconds,
        samesite="lax",
    )
    return response
@router.get("/logout")
async def logout(response: Response, session_token: Optional[str] = Cookie(None)):
    """Log the user out.

    Deletes the session identified by the cookie (when one is present) and
    returns a JSON success body that also clears the ``session_token``
    cookie. The injected ``response`` parameter is not used for the reply.
    """
    if session_token:
        get_auth_manager().delete_session(session_token)

    reply = JSONResponse({"success": True})
    reply.delete_cookie("session_token")
    return reply
@router.get("/chat", response_class=HTMLResponse)
async def chat_page(request: Request, session_token: Optional[str] = Cookie(None)):
    """Render the chat interface for the logged-in user.

    Without a valid session a 302 response pointing at ``/login`` is
    returned. Otherwise ``chat.html`` is rendered with the user and the
    result of the auth manager's ``check_rate_limit``.
    """
    auth = get_auth_manager()
    user = auth.validate_session(session_token) if session_token else None
    if not user:
        return HTMLResponse(status_code=302, headers={"Location": "/login"})

    context = {
        "settings": get_template_settings(),
        "user": user,
        "rate_info": auth.check_rate_limit(user.id),
    }
    return HTMLResponse(content=render_template("chat.html", context))
@router.get("/profile", response_class=HTMLResponse)
async def profile_page(request: Request, session_token: Optional[str] = Cookie(None)):
    """Render the profile page for the logged-in user.

    Without a valid session a 302 response pointing at ``/login`` is
    returned. Otherwise ``profile.html`` is rendered with the user, their
    documents and the result of ``check_rate_limit``.
    """
    auth = get_auth_manager()
    user = auth.validate_session(session_token) if session_token else None
    if not user:
        return HTMLResponse(status_code=302, headers={"Location": "/login"})

    user_documents = auth.get_user_documents(user.id)
    limits = auth.check_rate_limit(user.id)
    context = {
        "settings": get_template_settings(),
        "user": user,
        "documents": user_documents,
        "rate_info": limits,
    }
    return HTMLResponse(content=render_template("profile.html", context))
# ============================================================================
# API Routes (for chat functionality)
# ============================================================================
class ChatRequest(BaseModel):
    """Request body accepted by ``POST /api/chat``."""

    # Text of the user's message.
    message: str
    # Existing conversation to continue; a new one is created when omitted.
    conversation_id: Optional[str] = None
    # Stored as a JSON list alongside the message.
    # NOTE(review): presumably ids of uploaded files; confirm with the client.
    attachments: Optional[List[str]] = None
class ConversationUpdate(BaseModel):
    """Request body accepted by ``PATCH /api/conversations/{conv_id}``."""

    # New title for the conversation; left unchanged when omitted.
    title: Optional[str] = None
@router.get("/api/user")
async def get_user_info(request: Request, session_token: Optional[str] = Cookie(None)):
    """Return the current user's account details and remaining request quota.

    Responds with a 401 JSON error when the session cookie is missing or
    does not validate.
    """
    auth = get_auth_manager()
    if not session_token:
        return JSONResponse({"error": "Not authenticated"}, status_code=401)

    user = auth.validate_session(session_token)
    if not user:
        return JSONResponse({"error": "Invalid session"}, status_code=401)

    limits = auth.check_rate_limit(user.id)
    account_fields = (
        "id",
        "username",
        "email",
        "is_admin",
        "request_count",
        "request_limit",
    )
    payload = {field: getattr(user, field) for field in account_fields}
    payload["remaining_requests"] = limits["remaining"]
    return payload
@router.get("/api/conversations")
async def get_conversations(request: Request, session_token: Optional[str] = Cookie(None)):
    """List the current user's conversations, most recently updated first.

    Returns a list of dicts with ``id``, ``title``, ``created_at`` and
    ``updated_at``. Responds with a 401 JSON error when the session cookie
    is missing or does not validate.
    """
    auth = get_auth_manager()
    if not session_token:
        return JSONResponse({"error": "Not authenticated"}, status_code=401)
    user = auth.validate_session(session_token)
    if not user:
        return JSONResponse({"error": "Invalid session"}, status_code=401)

    from contextlib import closing
    from config import get_db_path
    import sqlite3

    # closing() guarantees the connection is released even if the query raises.
    with closing(sqlite3.connect(str(get_db_path()))) as conn:
        rows = conn.execute("""
            SELECT id, title, created_at, updated_at
            FROM conversations WHERE user_id = ?
            ORDER BY updated_at DESC
        """, (user.id,)).fetchall()

    return [
        {
            "id": conv_id,
            "title": title,
            "created_at": created_at,
            "updated_at": updated_at,
        }
        for conv_id, title, created_at, updated_at in rows
    ]
@router.post("/api/conversations")
async def create_conversation(
    request: Request,
    session_token: Optional[str] = Cookie(None)
):
    """Create an empty conversation titled "New Chat" for the current user.

    The ``conversations`` table is created first if it does not exist.
    Returns the new conversation's ``id``, ``title`` and timestamps.
    Responds with a 401 JSON error when the session cookie is missing or
    does not validate.
    """
    auth = get_auth_manager()
    if not session_token:
        return JSONResponse({"error": "Not authenticated"}, status_code=401)
    user = auth.validate_session(session_token)
    if not user:
        return JSONResponse({"error": "Invalid session"}, status_code=401)

    from contextlib import closing
    from config import get_db_path
    import sqlite3

    conv_id = str(uuid.uuid4())
    now = datetime.utcnow().isoformat()

    # closing() guarantees the connection is released even if a statement raises.
    with closing(sqlite3.connect(str(get_db_path()))) as conn:
        cursor = conn.cursor()
        # Ensure conversations table exists
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS conversations (
                id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                title TEXT,
                created_at TEXT,
                updated_at TEXT,
                FOREIGN KEY (user_id) REFERENCES users(id)
            )
        """)
        cursor.execute("""
            INSERT INTO conversations (id, user_id, title, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?)
        """, (conv_id, user.id, "New Chat", now, now))
        conn.commit()

    return {"id": conv_id, "title": "New Chat", "created_at": now, "updated_at": now}
@router.get("/api/conversations/{conv_id}/messages")
async def get_conversation_messages(
    conv_id: str,
    request: Request,
    session_token: Optional[str] = Cookie(None)
):
    """Return the messages of one conversation, oldest first.

    Each message is a dict with ``id``, ``role``, ``content``,
    ``attachments`` (decoded from its stored JSON, ``[]`` when empty) and
    ``created_at``. Responds with 401 when the session is missing or
    invalid, and with 404 when the conversation does not exist or belongs
    to another user.
    """
    auth = get_auth_manager()
    if not session_token:
        return JSONResponse({"error": "Not authenticated"}, status_code=401)
    user = auth.validate_session(session_token)
    if not user:
        return JSONResponse({"error": "Invalid session"}, status_code=401)

    from contextlib import closing
    from config import get_db_path
    import sqlite3

    # closing() releases the connection on every exit path, including the
    # early 404 return and any exception raised while reading rows.
    with closing(sqlite3.connect(str(get_db_path()))) as conn:
        cursor = conn.cursor()
        # Verify ownership
        cursor.execute("SELECT user_id FROM conversations WHERE id = ?", (conv_id,))
        owner_row = cursor.fetchone()
        if not owner_row or owner_row[0] != user.id:
            return JSONResponse({"error": "Not found"}, status_code=404)

        cursor.execute("""
            SELECT id, role, content, attachments, created_at
            FROM messages WHERE conversation_id = ?
            ORDER BY created_at ASC
        """, (conv_id,))
        rows = cursor.fetchall()

    return [
        {
            "id": msg_id,
            "role": role,
            "content": content,
            "attachments": json.loads(raw_attachments) if raw_attachments else [],
            "created_at": created_at,
        }
        for msg_id, role, content, raw_attachments, created_at in rows
    ]
@router.patch("/api/conversations/{conv_id}")
async def update_conversation(
    conv_id: str,
    update: ConversationUpdate,
    request: Request,
    session_token: Optional[str] = Cookie(None)
):
    """Update a conversation (currently only its title).

    When ``update.title`` is non-empty the title and ``updated_at`` are
    rewritten; otherwise nothing changes and success is still reported.
    Responds with 401 when the session is missing or invalid, and with 404
    when the conversation does not exist or belongs to another user.
    """
    auth = get_auth_manager()
    if not session_token:
        return JSONResponse({"error": "Not authenticated"}, status_code=401)
    user = auth.validate_session(session_token)
    if not user:
        return JSONResponse({"error": "Invalid session"}, status_code=401)

    from contextlib import closing
    from config import get_db_path
    import sqlite3

    # closing() releases the connection on every exit path.
    with closing(sqlite3.connect(str(get_db_path()))) as conn:
        cursor = conn.cursor()
        # Verify ownership
        cursor.execute("SELECT user_id FROM conversations WHERE id = ?", (conv_id,))
        owner_row = cursor.fetchone()
        if not owner_row or owner_row[0] != user.id:
            return JSONResponse({"error": "Not found"}, status_code=404)

        now = datetime.utcnow().isoformat()
        if update.title:
            cursor.execute(
                "UPDATE conversations SET title = ?, updated_at = ? WHERE id = ?",
                (update.title, now, conv_id)
            )
        conn.commit()

    return {"success": True}
@router.delete("/api/conversations/{conv_id}")
async def delete_conversation(
    conv_id: str,
    request: Request,
    session_token: Optional[str] = Cookie(None)
):
    """Delete a conversation together with its messages.

    Responds with 401 when the session is missing or invalid, and with 404
    when the conversation does not exist or belongs to another user.
    """
    auth = get_auth_manager()
    if not session_token:
        return JSONResponse({"error": "Not authenticated"}, status_code=401)
    user = auth.validate_session(session_token)
    if not user:
        return JSONResponse({"error": "Invalid session"}, status_code=401)

    from contextlib import closing
    from config import get_db_path
    import sqlite3

    # closing() releases the connection on every exit path.
    with closing(sqlite3.connect(str(get_db_path()))) as conn:
        cursor = conn.cursor()
        # Verify ownership
        cursor.execute("SELECT user_id FROM conversations WHERE id = ?", (conv_id,))
        owner_row = cursor.fetchone()
        if not owner_row or owner_row[0] != user.id:
            return JSONResponse({"error": "Not found"}, status_code=404)

        # Messages go first, then the conversation row; both are committed together.
        cursor.execute("DELETE FROM messages WHERE conversation_id = ?", (conv_id,))
        cursor.execute("DELETE FROM conversations WHERE id = ?", (conv_id,))
        conn.commit()

    return {"success": True}
@router.post("/api/chat")
async def send_chat_message(
    chat_request: ChatRequest,
    request: Request,
    session_token: Optional[str] = Cookie(None)
):
    """Store a user message and stream the assistant's reply.

    Steps: authenticate, check the rate limit, make sure the
    ``conversations`` and ``messages`` tables exist, resolve the
    conversation, store the user message, load the conversation history and
    hand it to ``stream_chat_response``.

    When ``conversation_id`` is omitted a new conversation is created, titled
    with the first 50 characters of the message. When it is supplied it must
    name a conversation owned by the current user; otherwise a 404 is
    returned, so one user cannot append to or read another user's history.

    Returns a ``text/event-stream`` response whose ``X-Conversation-Id``
    header carries the conversation id. Responds with 401 when the session
    is missing or invalid and with 429 when the rate limit is exhausted.
    """
    auth = get_auth_manager()
    if not session_token:
        return JSONResponse({"error": "Not authenticated"}, status_code=401)
    user = auth.validate_session(session_token)
    if not user:
        return JSONResponse({"error": "Invalid session"}, status_code=401)

    # Check rate limit
    rate_info = auth.check_rate_limit(user.id)
    if not rate_info["allowed"]:
        return JSONResponse(
            {"error": f"Daily limit reached. You have used {user.request_count}/{user.request_limit} requests today."},
            status_code=429
        )

    # Get orchestrator
    orchestrator: Orchestrator = request.app.state.orchestrator

    from contextlib import closing
    from config import get_db_path
    import sqlite3

    # closing() releases the connection on every exit path, including the
    # early 404 return and any exception raised by a statement.
    with closing(sqlite3.connect(str(get_db_path()))) as conn:
        cursor = conn.cursor()

        # Ensure tables exist
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS conversations (
                id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                title TEXT,
                created_at TEXT,
                updated_at TEXT,
                FOREIGN KEY (user_id) REFERENCES users(id)
            )
        """)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS messages (
                id TEXT PRIMARY KEY,
                conversation_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT,
                attachments TEXT,
                created_at TEXT,
                FOREIGN KEY (conversation_id) REFERENCES conversations(id)
            )
        """)

        conv_id = chat_request.conversation_id
        if conv_id:
            # Verify ownership before touching a client-supplied conversation.
            cursor.execute("SELECT user_id FROM conversations WHERE id = ?", (conv_id,))
            owner_row = cursor.fetchone()
            if not owner_row or owner_row[0] != user.id:
                return JSONResponse({"error": "Not found"}, status_code=404)
        else:
            # Create conversation, titled from the first message
            conv_id = str(uuid.uuid4())
            now = datetime.utcnow().isoformat()
            title = chat_request.message[:50] + ("..." if len(chat_request.message) > 50 else "")
            cursor.execute("""
                INSERT INTO conversations (id, user_id, title, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?)
            """, (conv_id, user.id, title, now, now))

        # Save user message
        user_msg_id = str(uuid.uuid4())
        now = datetime.utcnow().isoformat()
        cursor.execute("""
            INSERT INTO messages (id, conversation_id, role, content, attachments, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (user_msg_id, conv_id, "user", chat_request.message,
              json.dumps(chat_request.attachments or []), now))

        # Update conversation timestamp
        cursor.execute("UPDATE conversations SET updated_at = ? WHERE id = ?", (now, conv_id))

        # Conversation history (including the message just stored) for context
        cursor.execute("""
            SELECT role, content FROM messages
            WHERE conversation_id = ?
            ORDER BY created_at ASC
        """, (conv_id,))
        messages = [{"role": role, "content": content} for role, content in cursor.fetchall()]

        conn.commit()

    # Increment request count
    auth.increment_request_count(user.id)

    # Return streaming response
    return StreamingResponse(
        stream_chat_response(request, orchestrator, messages, conv_id, user),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
            "X-Conversation-Id": conv_id
        }
    )
async def stream_chat_response(
    request: Request,
    orchestrator: Orchestrator,
    messages: List[dict],
    conv_id: str,
    user: User
):
    """Stream the assistant's reply as server-sent events and persist it.

    Every chunk from ``orchestrator.process_stream`` that has a non-empty
    ``content`` entry is yielded as ``data: {"content": ...}``. Once the
    stream ends, the concatenated reply is stored as an ``assistant``
    message in ``conv_id`` and a final ``data: [DONE]`` event is yielded.

    ``request`` and ``user`` are accepted for the caller's convenience but
    are not read here.
    """
    from contextlib import closing
    from config import get_db_path
    import sqlite3

    # Collect pieces in a list and join once instead of repeated `+=`.
    reply_parts: List[str] = []
    async for chunk in orchestrator.process_stream(messages=messages):
        if "content" in chunk and chunk["content"]:
            piece = chunk["content"]
            reply_parts.append(piece)
            yield f"data: {json.dumps({'content': piece})}\n\n"
    full_response = "".join(reply_parts)

    # Save assistant message; closing() releases the connection even if the
    # insert raises.
    asst_msg_id = str(uuid.uuid4())
    now = datetime.utcnow().isoformat()
    with closing(sqlite3.connect(str(get_db_path()))) as conn:
        conn.execute("""
            INSERT INTO messages (id, conversation_id, role, content, created_at)
            VALUES (?, ?, ?, ?, ?)
        """, (asst_msg_id, conv_id, "assistant", full_response, now))
        conn.commit()

    # Send done signal
    yield "data: [DONE]\n\n"
# ============================================================================
# File Upload Routes
# ============================================================================
@router.post("/api/upload")
async def upload_file(
    file: UploadFile = File(...),
    session_token: Optional[str] = Cookie(None)
):
    """Store an uploaded file in the current user's uploads directory.

    The file is saved as ``<uuid><original suffix>`` under
    ``<data dir>/uploads/<user id>``. Returns the generated id, the original
    filename, the size in bytes and the URL for fetching the file. Responds
    with a 401 JSON error when the session cookie is missing or invalid.
    """
    auth = get_auth_manager()
    if not session_token:
        return JSONResponse({"error": "Not authenticated"}, status_code=401)
    user = auth.validate_session(session_token)
    if not user:
        return JSONResponse({"error": "Invalid session"}, status_code=401)

    payload = await file.read()

    from config import get_data_dir
    user_dir = get_data_dir() / "uploads" / user.id
    user_dir.mkdir(parents=True, exist_ok=True)

    file_id = str(uuid.uuid4())
    suffix = Path(file.filename or "file").suffix
    (user_dir / f"{file_id}{suffix}").write_bytes(payload)

    return {
        "id": file_id,
        "filename": file.filename,
        "size": len(payload),
        "url": f"/api/files/{file_id}",
    }
@router.get("/api/files/{file_id}")
async def get_file(
    file_id: str,
    session_token: Optional[str] = Cookie(None)
):
    """Serve a file previously uploaded by the current user.

    ``file_id`` must parse as a UUID; anything else gets a 404, so glob
    wildcards in the path parameter cannot match unrelated files. The file
    is looked up in the user's uploads directory, first as the bare id (an
    upload whose name had no extension) and then as ``<id>.<anything>``.

    Responds with 401 when the session is missing or invalid and with 404
    when no matching file exists.
    """
    auth = get_auth_manager()
    if not session_token:
        return JSONResponse({"error": "Not authenticated"}, status_code=401)
    user = auth.validate_session(session_token)
    if not user:
        return JSONResponse({"error": "Invalid session"}, status_code=401)

    # Uploads are named with str(uuid.uuid4()); normalise to that form.
    try:
        canonical_id = str(uuid.UUID(file_id))
    except ValueError:
        return JSONResponse({"error": "File not found"}, status_code=404)

    from config import get_data_dir
    from fastapi.responses import FileResponse
    uploads_dir = get_data_dir() / "uploads" / user.id

    # A file uploaded without an extension has no "." to match in the glob.
    bare_path = uploads_dir / canonical_id
    if bare_path.is_file():
        return FileResponse(bare_path)

    match = next(uploads_dir.glob(f"{canonical_id}.*"), None)
    if match is not None:
        return FileResponse(match)
    return JSONResponse({"error": "File not found"}, status_code=404)
# ============================================================================
# User Profile Document Routes
# ============================================================================
@router.post("/api/profile/documents")
async def upload_profile_document(
    file: UploadFile = File(...),
    session_token: Optional[str] = Cookie(None)
):
    """Attach an uploaded document to the current user's profile.

    The file bytes are handed to the auth manager's ``add_user_document``
    together with the filename (``"document"`` when absent) and its suffix.
    Returns the document id, filename, size in bytes and file type.
    Responds with a 401 JSON error when the session is missing or invalid.
    """
    auth = get_auth_manager()
    if not session_token:
        return JSONResponse({"error": "Not authenticated"}, status_code=401)
    user = auth.validate_session(session_token)
    if not user:
        return JSONResponse({"error": "Invalid session"}, status_code=401)

    payload = await file.read()
    suffix = Path(file.filename or "file").suffix
    stored_name = file.filename or "document"
    doc_id = auth.add_user_document(user.id, stored_name, suffix, payload)

    return {
        "id": doc_id,
        "filename": file.filename,
        "size": len(payload),
        "file_type": suffix,
    }
@router.get("/api/profile/documents")
async def get_profile_documents(
    session_token: Optional[str] = Cookie(None)
):
    """Return whatever the auth manager lists as the current user's documents.

    Responds with a 401 JSON error when the session is missing or invalid.
    """
    auth = get_auth_manager()
    if not session_token:
        return JSONResponse({"error": "Not authenticated"}, status_code=401)
    user = auth.validate_session(session_token)
    if not user:
        return JSONResponse({"error": "Invalid session"}, status_code=401)
    return auth.get_user_documents(user.id)
@router.delete("/api/profile/documents/{doc_id}")
async def delete_profile_document(
    doc_id: str,
    session_token: Optional[str] = Cookie(None)
):
    """Remove one document from the current user's profile.

    Responds with 401 when the session is missing or invalid, and with 404
    when the auth manager reports that nothing was deleted.
    """
    auth = get_auth_manager()
    if not session_token:
        return JSONResponse({"error": "Not authenticated"}, status_code=401)
    user = auth.validate_session(session_token)
    if not user:
        return JSONResponse({"error": "Invalid session"}, status_code=401)

    if not auth.delete_user_document(user.id, doc_id):
        return JSONResponse({"error": "Document not found"}, status_code=404)
    return {"success": True}
# ============================================================================
# Password Change
# ============================================================================
class PasswordChange(BaseModel):
    """Request body accepted by ``POST /api/profile/password``."""

    # The password the user currently logs in with; re-verified by the route.
    current_password: str
    # The password that replaces it.
    new_password: str
@router.post("/api/profile/password")
async def change_password(
    data: PasswordChange,
    session_token: Optional[str] = Cookie(None)
):
    """Change the current user's password.

    The current password is re-verified through ``auth.authenticate``, and
    the new password must meet the same 6-character minimum that signup
    enforces.

    Responds with 401 when the session is missing or invalid, 400 when the
    current password is wrong or the new one is too short, and 500 when the
    auth manager reports that the change failed.
    """
    auth = get_auth_manager()
    if not session_token:
        return JSONResponse({"error": "Not authenticated"}, status_code=401)
    user = auth.validate_session(session_token)
    if not user:
        return JSONResponse({"error": "Invalid session"}, status_code=401)

    # Verify current password
    if not auth.authenticate(user.username, data.current_password):
        return JSONResponse({"error": "Current password is incorrect"}, status_code=400)

    # Keep the password policy consistent with the signup form.
    if len(data.new_password) < 6:
        return JSONResponse(
            {"error": "Password must be at least 6 characters"},
            status_code=400
        )

    # Change password
    if auth.change_password(user.id, data.new_password):
        return {"success": True}
    return JSONResponse({"error": "Failed to change password"}, status_code=500)