"""
server.py — Echo Voice Assistant Web Server (FastAPI + WebSocket)
Starts a web server on port 8001 that serves:
- Web UI (static files from web/)
- WebSocket endpoint for streaming chat
- REST API for health checks and generated audio files
Usage:
python server.py
# Then open http://localhost:8001 in your browser
Environment Variables (see .env.example):
OPENROUTER_API_KEY — required for LLM responses
SERVER_PORT — port to run on (default: 8001)
SERVER_HOST — host to bind to (default: 0.0.0.0)
Dependencies:
pip install fastapi uvicorn python-multipart websockets python-dotenv
"""
import asyncio
import json
import logging
import os
import uuid
from pathlib import Path
from dotenv import load_dotenv
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from brain import Brain
from tts import TTSEngine
from actions import execute as execute_action
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Log line layout: time │ logger name (padded to 18) │ level (padded to 7) │ message.
_LOG_FORMAT = "%(asctime)s │ %(name)-18s │ %(levelname)-7s │ %(message)s"
# Time of day only; the date is not included in log lines.
_LOG_DATE_FORMAT = "%H:%M:%S"

logging.basicConfig(
    format=_LOG_FORMAT,
    datefmt=_LOG_DATE_FORMAT,
    level=logging.INFO,
)

# Module-wide logger used by every handler in this file.
logger = logging.getLogger("echo.server")
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Directory containing this file; the .env file and the web/ folder are
# both looked up relative to it.
_HERE = Path(__file__).parent

# Load the .env file before any of the settings below are read.
load_dotenv(_HERE / ".env")

# Interface and port the server binds to (overridable via the environment).
SERVER_HOST = os.getenv("SERVER_HOST", "0.0.0.0")
SERVER_PORT = int(os.getenv("SERVER_PORT", "8001"))

# Root directory of the static web UI.
WEB_DIR = _HERE / "web"
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
app = FastAPI(title="Echo Voice Assistant", version="1.0.0")

# CORS: the API is open to any origin. Credentialed requests are NOT enabled:
# the FastAPI CORS documentation states that wildcard origins/methods/headers
# cannot be combined with allow_credentials=True. Nothing in this file sets
# or reads cookies, so credentials are switched off rather than restricting
# the origin list.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# Initialize engines
# ---------------------------------------------------------------------------
# Shorthand for reading optional settings from the process environment.
_env = os.environ.get

# LLM client. The API key is passed through as-is (None when the variable
# is not set).
brain = Brain(
    api_key=_env("OPENROUTER_API_KEY"),
    model=_env("OPENROUTER_MODEL", "qwen/qwen-3-235b-a22b"),
)

# Text-to-speech settings, each overridable through its QWEN_TTS_* variable.
_tts_settings = {
    "model_name": _env("QWEN_TTS_MODEL", "Qwen/Qwen3-TTS-12Hz-1.7B-CustomVoice"),
    "voice_sample": _env("QWEN_TTS_VOICE", "voices/echo_voice.wav"),
    "instruction": _env(
        "QWEN_TTS_INSTRUCT",
        "Speak clearly with a warm, friendly tone. Be natural and conversational.",
    ),
}
tts = TTSEngine(**_tts_settings)
# ---------------------------------------------------------------------------
# Per-session state
# ---------------------------------------------------------------------------
# In-memory session registry, keyed by the short WebSocket connection id.
sessions: dict[str, dict] = {}


def get_session(ws_id: str) -> dict:
    """Return the state dict for *ws_id*, creating it on first access.

    A new session starts as ``{"id": ws_id, "history": []}`` and is stored
    in the module-level ``sessions`` registry, so repeated calls with the
    same id return the same dict object.
    """
    try:
        return sessions[ws_id]
    except KeyError:
        fresh_state = {"id": ws_id, "history": []}
        sessions[ws_id] = fresh_state
        return fresh_state
# ---------------------------------------------------------------------------
# Routes — Web UI
# ---------------------------------------------------------------------------
@app.get("/")
async def serve_index():
    """Serve the web UI entry page.

    Returns ``web/index.html`` when it exists as a regular file; otherwise
    returns a short HTML response telling the operator where to put the
    UI files.
    """
    index = WEB_DIR / "index.html"
    # is_file() rather than exists(): a directory named index.html must not
    # be handed to FileResponse.
    if index.is_file():
        return FileResponse(index)
    # The fallback message is a single-line literal; a string opened with one
    # double quote cannot span several source lines.
    return HTMLResponse("Echo Web UI not found — place files in web/ directory")
def _resolve_served_file(base_dir: Path, relative: str) -> "Path | None":
    """Return the regular file *relative* inside *base_dir*, or None.

    The joined path is fully resolved (symlinks and ``..`` segments) and
    accepted only if it is still located under *base_dir* and is a regular
    file. Requests that try to escape the directory, name a directory, or
    contain characters the filesystem API rejects all yield None.
    """
    try:
        root = base_dir.resolve()
        candidate = (root / relative).resolve()
        if candidate.is_relative_to(root) and candidate.is_file():
            return candidate
    except (OSError, ValueError):
        # e.g. an embedded NUL byte or an over-long path component.
        pass
    return None


# ---------------------------------------------------------------------------
# Routes — API
# ---------------------------------------------------------------------------
# NOTE: these routes are registered BEFORE the catch-all static route at the
# end of this section. HTTP routes are matched in registration order, so a
# catch-all GET route declared first would answer every /api/... request
# with its own 404 and the handlers below would never run.
@app.get("/api/health")
async def health():
    """Report basic service status.

    Returns a JSON object with a constant ``status`` of ``"ok"``, whether the
    configured TTS voice sample file exists on disk, and the configured LLM
    model name.
    """
    voice_ok = Path(os.environ.get("QWEN_TTS_VOICE", "voices/echo_voice.wav")).exists()
    return {
        "status": "ok",
        "voice_sample": "loaded" if voice_ok else "missing",
        "model": os.environ.get("OPENROUTER_MODEL", "qwen/qwen-3-235b-a22b"),
    }


@app.get("/api/audio/{filename}")
async def get_audio(filename: str):
    """Serve a generated audio file from the ``audio_output`` directory.

    Responds with the file as ``audio/wav`` when *filename* names a regular
    file inside ``audio_output``; otherwise responds with 404.
    """
    audio_path = _resolve_served_file(Path("audio_output"), filename)
    if audio_path is not None:
        return FileResponse(audio_path, media_type="audio/wav")
    return HTMLResponse("Not found", status_code=404)


# ---------------------------------------------------------------------------
# Routes — static files (catch-all; keep this the last HTTP GET route)
# ---------------------------------------------------------------------------
# Serve any static files from web/
@app.get("/{path:path}")
async def serve_static(path: str):
    """Serve a static asset from the web/ directory.

    *path* comes straight from the request URL, so it is resolved and
    checked to be inside ``WEB_DIR`` before being served; anything else
    (missing file, directory, or a path that escapes web/) gets a 404.
    """
    file_path = _resolve_served_file(WEB_DIR, path)
    if file_path is not None:
        return FileResponse(file_path)
    return HTMLResponse("Not found", status_code=404)
# ---------------------------------------------------------------------------
# WebSocket — Streaming Chat
# ---------------------------------------------------------------------------
async def _run_chat_turn(ws: WebSocket, session: dict, message: str) -> None:
    """Stream one assistant reply for *message* to the client.

    Events yielded by ``brain.think(message)`` are dicts with a ``"type"``
    key and are handled as follows:

    - ``"token"``: forwarded to the client as a ``token`` frame.
    - ``"command"``: remembered and executed once the reply is complete.
    - ``"done"``: sends the ``done`` frame, records the reply in the session
      history, runs any pending command (``action`` frame) and attempts TTS
      generation (``audio`` frame on success).

    A ``ready`` frame is sent after the event stream ends. Exceptions other
    than a TTS generation failure propagate to the caller.
    """
    # NOTE: the history is recorded per session, but only the current
    # message is passed to brain.think() here.
    session["history"].append({"role": "user", "content": message})

    streamed_parts: list[str] = []
    pending_command = None

    async for event in brain.think(message):
        kind = event["type"]

        if kind == "token":
            streamed_parts.append(event["text"])
            await ws.send_json({"type": "token", "text": event["text"]})

        elif kind == "command":
            pending_command = event["command"]

        elif kind == "done":
            spoken = event["text"]

            # Completion frame: final spoken text plus everything streamed.
            await ws.send_json({
                "type": "done",
                "text": spoken,
                "full_text": "".join(streamed_parts),
            })

            session["history"].append({"role": "assistant", "content": spoken})
            # Keep only the 20 most recent messages.
            if len(session["history"]) > 20:
                session["history"] = session["history"][-20:]

            # Execute any local command the brain requested.
            if pending_command:
                action_name = pending_command.get("action", "")
                action_params = pending_command.get("params", {})
                logger.info("Executing action: %s %s", action_name, action_params)
                # NOTE(review): execute_action is called synchronously here;
                # if it can block, it will stall the event loop — confirm.
                action_result = execute_action(action_name, action_params)
                await ws.send_json({
                    "type": "action",
                    "action": action_name,
                    "result": action_result,
                })

            # TTS is best-effort: a generation failure is logged and skipped.
            # Only the generate() call is guarded, so a failure to SEND the
            # audio frame is not misreported as a TTS problem.
            try:
                wav_path = await tts.generate(spoken)
            except Exception as exc:
                logger.warning("TTS generation skipped: %s", exc)
            else:
                if wav_path:
                    await ws.send_json({
                        "type": "audio",
                        "url": f"/api/audio/{wav_path.name}",
                    })

    await ws.send_json({"type": "ready"})


@app.websocket("/ws/chat")
async def websocket_chat(ws: WebSocket):
    """Handle one WebSocket chat connection.

    Incoming frames are JSON objects of the form
    ``{"type": "chat", "payload": {"message": "..."}}`` (``type`` defaults
    to ``"chat"``) or ``{"type": "clear"}``. Outgoing frame types are
    ``token``, ``done``, ``action``, ``audio``, ``ready``, ``error`` and
    ``cleared``.

    Malformed frames (invalid JSON, a non-object message, a missing or
    non-string ``message``) are answered with an ``error`` frame and the
    connection stays open. The per-connection session is removed from
    ``sessions`` when the connection ends for any reason.
    """
    await ws.accept()
    ws_id = str(uuid.uuid4())[:8]
    session = get_session(ws_id)
    logger.info("Client connected: session=%s", ws_id)

    try:
        while True:
            raw = await ws.receive_text()
            try:
                data = json.loads(raw)
            except (ValueError, TypeError):
                # json.JSONDecodeError is a ValueError; a bad frame should
                # not tear down the whole connection.
                await ws.send_json({"type": "error", "text": "Invalid JSON"})
                continue
            if not isinstance(data, dict):
                await ws.send_json({"type": "error", "text": "Invalid message format"})
                continue

            msg_type = data.get("type", "chat")
            payload = data.get("payload", {})

            if msg_type == "chat":
                message = payload.get("message", "") if isinstance(payload, dict) else ""
                message = message.strip() if isinstance(message, str) else ""
                if not message:
                    await ws.send_json({"type": "error", "text": "Empty message"})
                    continue

                try:
                    await _run_chat_turn(ws, session, message)
                except WebSocketDisconnect:
                    # The client is gone: do not try to send an error frame
                    # on the closed socket; let the outer handler clean up.
                    raise
                except Exception as exc:
                    logger.exception("Error processing chat")
                    await ws.send_json({
                        "type": "error",
                        "text": f"Error: {exc}",
                    })
                    await ws.send_json({"type": "ready"})

            elif msg_type == "clear":
                session["history"] = []
                await ws.send_json({"type": "cleared"})

    except WebSocketDisconnect:
        logger.info("Client disconnected: session=%s", ws_id)
    except Exception:
        logger.exception("WebSocket error for session=%s", ws_id)
    finally:
        # Always drop the per-connection state, whatever ended the loop.
        sessions.pop(ws_id, None)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a
    # script, not when the module is imported.
    import uvicorn

    # Startup banner.
    banner_rule = "=" * 50
    logger.info(banner_rule)
    logger.info(" ECHO VOICE ASSISTANT — Web Server")
    logger.info(" http://%s:%d", SERVER_HOST, SERVER_PORT)
    logger.info(banner_rule)

    # uvicorn's own logging is limited to warnings and above.
    server_options = {
        "host": SERVER_HOST,
        "port": SERVER_PORT,
        "log_level": "warning",
    }
    uvicorn.run(app, **server_options)