moxieTalking/brain.py
Echo Assistant d6b64d04d1 feat: initial Echo voice assistant — Vosk + OpenRouter + Qwen3-TTS
- stt.py: WakeWordListener (openWakeWord) + Transcriber (Vosk)
- brain.py: Async OpenRouter streaming client with command parsing
- tts.py: Qwen3-TTS engine with voice selection & instruction control
- actions.py: 10 local OS commands (open_app, set_timer, search, etc.)
- main.py: Async orchestrator with Phase 5 parallel TTS streaming
2026-03-31 00:09:00 +00:00

160 lines
5.7 KiB
Python

"""
brain.py — OpenRouter LLM Client (Streaming)
Responsibilities:
1. Send transcribed text to OpenRouter with a system prompt that
instructs the model to produce:
a) A concise verbal response (≤ 2 sentences for voice).
b) An optional JSON command block for local execution.
2. Stream tokens back so the TTS engine can start early (Phase 5).
3. Parse any JSON command block and return it alongside the spoken text.
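Environment Variables:
OPENROUTER_API_KEY — your OpenRouter API key (used when no api_key argument is passed)
OPENROUTER_MODEL — model identifier (default: qwen/qwen-3-235b-a22b)
OPENROUTER_BASE_URL — (optional) custom base URL
Note: this module itself only reads OPENROUTER_API_KEY; the model and base URL
reach Brain through its `model` and `base_url` constructor arguments, so the
caller is responsible for forwarding the other two variables.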
Dependencies:
pip install openai
"""
import json
import logging
import os
import re
from typing import AsyncIterator

from openai import AsyncOpenAI
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
DEFAULT_MODEL = "qwen/qwen-3-235b-a22b"
SYSTEM_PROMPT = """\
You are Echo, a concise, helpful voice assistant. Follow these rules strictly:
1. **Verbal response**: Reply in ≤ 2 short sentences so it sounds natural
when spoken aloud. Be direct and conversational.
2. **Local commands** (optional): If the user's request can be fulfilled by a
local OS action (e.g. opening an app, setting a timer, checking the time,
creating a reminder), include a single JSON block at the very end of your
response using this exact format:
```json
{"action": "<command_name>", "params": {"key": "value"}}
```
Supported actions: open_app, set_timer, get_time, get_date, get_weather,
create_reminder, control_volume, shutdown, search_web, calculate.
3. Do NOT include the JSON block in your spoken text. The spoken text is
everything BEFORE the JSON block.
4. If no local action is needed, just respond normally without any JSON.
5. Never use markdown formatting, bullet points, or headers in the spoken text.
"""
class Brain:
"""Async client for OpenRouter LLM with streaming support."""
def __init__(
self,
api_key: str | None = None,
model: str = DEFAULT_MODEL,
base_url: str = "https://openrouter.ai/api/v1",
):
self.model = model
self.client = AsyncOpenAI(
api_key=api_key or os.environ.get("OPENROUTER_API_KEY", ""),
base_url=base_url,
)
# Conversation history (rolling window)
self._history: list[dict[str, str]] = []
self._max_history = 20 # keep last 20 messages for context
async def think(
self,
user_text: str,
) -> AsyncIterator[dict]:
"""
Stream the LLM response token-by-token.
Yields:
dict with keys:
- "type": "token" | "command" | "done"
- "text": the token string (for "token" type)
- "command": parsed JSON dict (for "command" type)
"""
self._history.append({"role": "user", "content": user_text})
if len(self._history) > self._max_history:
self._history = self._history[-self._max_history:]
messages = [{"role": "system", "content": SYSTEM_PROMPT}] + self._history
logger.info("Sending to OpenRouter (%s): %s", self.model, user_text[:80])
full_response = ""
try:
stream = await self.client.chat.completions.create(
model=self.model,
messages=messages,
stream=True,
temperature=0.7,
max_tokens=300,
)
async for chunk in stream:
delta = chunk.choices[0].delta
if delta.content:
full_response += delta.content
yield {"type": "token", "text": delta.content}
except Exception:
logger.exception("OpenRouter request failed")
yield {"type": "token", "text": "Sorry, I had trouble thinking about that."}
# Parse any JSON command block from the full response
command = self._extract_command(full_response)
if command:
logger.info("Parsed command: %s", command)
yield {"type": "command", "command": command}
# Clean spoken text (strip JSON block and thinking tags)
spoken_text = self._clean_spoken_text(full_response)
self._history.append({"role": "assistant", "content": full_response})
yield {"type": "done", "text": spoken_text}
def _extract_command(self, text: str) -> dict | None:
"""Extract the JSON command block from the LLM response."""
try:
# Find JSON code block
if "```json" in text:
start = text.index("```json") + 7
end = text.index("```", start)
json_str = text[start:end].strip()
return json.loads(json_str)
# Try bare JSON at the end
for i in range(len(text) - 1, -1, -1):
if text[i] == "{":
candidate = text[i:].strip()
return json.loads(candidate)
except (json.JSONDecodeError, ValueError):
pass
return None
@staticmethod
def _clean_spoken_text(text: str) -> str:
"""Remove JSON blocks and Qwen thinking tags from spoken text."""
import re
# Remove Qwen3 <think/> blocks
cleaned = re.sub(r"<think[^>]*>.*?</think\s*>", "", text, flags=re.DOTALL)
# Remove JSON code blocks
cleaned = re.sub(r"```json.*?```", "", cleaned, flags=re.DOTALL)
# Remove any trailing bare JSON object
cleaned = re.sub(r"\{[^}]*\"action\"[^}]*\}", "", cleaned)
# Clean up whitespace
cleaned = cleaned.strip().strip(".")
return cleaned