feat: initial Echo voice assistant — Vosk + OpenRouter + Qwen3-TTS

- stt.py: WakeWordListener (openWakeWord) + Transcriber (Vosk)
- brain.py: Async OpenRouter streaming client with command parsing
- tts.py: Qwen3-TTS engine with voice selection & instruction control
- actions.py: 10 local OS commands (open_app, set_timer, search, etc.)
- main.py: Async orchestrator with Phase 5 parallel TTS streaming
This commit is contained in:
Echo Assistant 2026-03-31 00:08:52 +00:00
parent 722596bb09
commit d6b64d04d1
8 changed files with 1187 additions and 0 deletions

24
.env.example Normal file
View File

@ -0,0 +1,24 @@
# ===========================================================
# Echo Voice Assistant — Environment Configuration
# ===========================================================
# Copy this file to .env and fill in your values:
# cp .env.example .env
# ===========================================================
# --- OpenRouter (required) ---
# Get your key at: https://openrouter.ai/keys
OPENROUTER_API_KEY=sk-or-v1-xxxxxxxxxxxxxxxxxxxxxxxx
OPENROUTER_MODEL=qwen/qwen-3-235b-a22b
# --- Vosk STT (optional overrides) ---
# Download models from: https://alphacephei.com/vosk/models
# Set to a local path relative to the project root
VOSK_MODEL_PATH=models/vosk-model-small-en-us
WAKE_WORD=echo
# --- Qwen3-TTS (optional overrides) ---
# Available preset voices: Ryan, Serena, Diana, etc.
# Or set a path to a 3-second .wav sample for voice cloning
QWEN_TTS_MODEL=Qwen/Qwen3-TTS-12Hz-1.7B-CustomVoice
QWEN_TTS_VOICE=Ryan
QWEN_TTS_INSTRUCT=Speak clearly with a warm, friendly tone. Be natural and conversational.

39
.gitignore vendored Normal file
View File

@ -0,0 +1,39 @@
# Echo Voice Assistant — Git Ignore
# Python
__pycache__/
*.py[cod]
*.egg-info/
dist/
build/
*.egg
# Virtual environments
venv/
.venv/
env/
# Model weights (large files — download separately)
models/
!models/.gitkeep
# Generated audio
audio_output/
!audio_output/.gitkeep
# Environment & secrets
.env
.env.local
# IDE
.vscode/
.idea/
*.swp
*.swo
# OS
.DS_Store
Thumbs.db
# Logs
*.log

275
actions.py Normal file
View File

@ -0,0 +1,275 @@
"""
actions.py Local OS Command Execution
Responsibilities:
1. Provide a registry of local actions the assistant can perform.
2. Map action names from the LLM's JSON commands to Python functions.
3. Execute commands and return a spoken summary for TTS feedback.
Supported actions:
open_app Launch a desktop application
set_timer Start a countdown timer with audible alarm
get_time Return the current time
get_date Return today's date
get_weather (stub) Return weather info
create_reminder (stub) Save a reminder note
control_volume Adjust system volume (Linux/macOS)
search_web Open a web search in the default browser
calculate Evaluate a math expression safely
shutdown System shutdown (with confirmation)
"""
import logging
import os
import platform
import subprocess
import threading
import webbrowser
from datetime import datetime
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Action Registry
# ---------------------------------------------------------------------------
_REGISTRY: dict[str, callable] = {}
def register(name: str):
"""Decorator to register an action function by name."""
def decorator(func):
_REGISTRY[name] = func
return func
return decorator
def execute(action: str, params: dict | None = None) -> str:
"""
Execute a registered action and return a spoken summary.
Args:
action: The action name (e.g., "open_app").
params: Optional dict of parameters.
Returns:
A short text description of what was done (for TTS feedback).
"""
params = params or {}
func = _REGISTRY.get(action)
if not func:
logger.warning("Unknown action: %s", action)
return f"Sorry, I don't know how to {action.replace('_', ' ')}."
try:
result = func(**params)
logger.info("Action '%s' executed: %s", action, result)
return result
except Exception as exc:
logger.exception("Action '%s' failed", action)
return f"Something went wrong: {exc}"
# ---------------------------------------------------------------------------
# Action Implementations
# ---------------------------------------------------------------------------
@register("get_time")
def get_time(**_) -> str:
now = datetime.now().strftime("%-I:%M %p")
return f"It's currently {now}."
@register("get_date")
def get_date(**_) -> str:
today = datetime.now().strftime("%A, %B %d, %Y")
return f"Today is {today}."
@register("open_app")
def open_app(app_name: str = "", **_) -> str:
if not app_name:
return "What app would you like me to open?"
system = platform.system()
app_lower = app_name.lower().strip()
try:
if system == "Darwin": # macOS
subprocess.Popen(["open", "-a", app_name], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
elif system == "Windows":
subprocess.Popen(f"start {app_name}", shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
else: # Linux
# Try common app launchers
app_map = {
"chrome": "google-chrome",
"firefox": "firefox",
"terminal": "gnome-terminal",
"files": "nautilus",
"calculator": "gnome-calculator",
"settings": "gnome-control-center",
"browser": "xdg-open",
"vs code": "code",
"vscode": "code",
}
cmd = app_map.get(app_lower, app_lower)
subprocess.Popen(
[cmd], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
return f"Opening {app_name}."
except FileNotFoundError:
return f"Sorry, I couldn't find {app_name} on this system."
@register("set_timer")
def set_timer(seconds: int = 60, label: str = "Timer", **_) -> str:
"""
Start a background timer that rings after the given number of seconds.
Uses the terminal bell when the timer completes.
"""
try:
duration = int(seconds)
except (ValueError, TypeError):
return "I need a number of seconds for the timer."
def _timer_thread():
import time
logger.info("Timer '%s' started for %d seconds", label, duration)
time.sleep(duration)
# Terminal bell
print(f"\a")
logger.info("Timer '%s' finished!", label)
# Try to use TTS to announce (if available — soft dependency)
try:
import pygame
pygame.mixer.init()
# Generate a simple beep
import numpy as np
sample_rate = 22050
t = np.linspace(0, 0.5, int(sample_rate * 0.5), dtype=np.float32)
tone = np.sin(2 * np.pi * 880 * t) * 0.5
tone = (tone * 32767).astype(np.int16)
# Save and play
import soundfile as sf
beep_path = f"/tmp/echo_timer_{os.urandom(4).hex()}.wav"
sf.write(beep_path, tone, sample_rate)
pygame.mixer.music.load(beep_path)
pygame.mixer.music.play()
while pygame.mixer.music.get_busy():
pygame.time.wait(50)
pygame.mixer.quit()
os.unlink(beep_path)
except Exception:
pass # Fall back to terminal bell only
threading.Thread(target=_timer_thread, daemon=True, name=f"timer-{label}").start()
minutes, secs = divmod(duration, 60)
if minutes:
return f"{label} set for {minutes} minute{'s' if minutes != 1 else ''} and {secs} seconds."
return f"{label} set for {secs} seconds."
@register("get_weather")
def get_weather(location: str = "", **_) -> str:
"""Stub — can be expanded with a weather API integration."""
return (
"I don't have a weather service connected yet. "
"You can ask me again once the weather API is configured."
)
@register("create_reminder")
def create_reminder(text: str = "", **_) -> str:
"""Save a reminder to a local file."""
if not text:
return "What would you like me to remind you about?"
reminders_dir = Path.home() / ".echo" / "reminders"
reminders_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
reminder_file = reminders_dir / f"{timestamp}.txt"
reminder_file.write_text(f"[{datetime.now().isoformat()}] {text}\n")
return f"Reminder saved: {text}"
@register("control_volume")
def control_volume(level: int = 50, **_) -> str:
"""Adjust system volume (Linux/macOS only)."""
try:
vol = int(level)
vol = max(0, min(100, vol))
except (ValueError, TypeError):
return "Please specify a volume level between 0 and 100."
system = platform.system()
try:
if system == "Darwin":
subprocess.run(["osascript", "-e", f"set volume output volume {vol}"],
check=True, capture_output=True)
elif system == "Linux":
subprocess.run(
["pactl", "set-sink-volume", "@DEFAULT_SINK@", f"{vol}%"],
check=True, capture_output=True,
)
else:
return "Volume control isn't supported on Windows yet."
return f"Volume set to {vol}%."
except subprocess.CalledProcessError:
return "I couldn't adjust the volume."
@register("search_web")
def search_web(query: str = "", **_) -> str:
if not query:
return "What would you like to search for?"
url = f"https://www.google.com/search?q={query.replace(' ', '+')}"
webbrowser.open(url)
return f"Searching the web for: {query}"
@register("calculate")
def calculate(expression: str = "", **_) -> str:
if not expression:
return "What would you like me to calculate?"
# Whitelist only safe math operations
import ast
allowed = {
ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Pow,
ast.USub, ast.UAdd, ast.Constant, ast.Num,
}
try:
tree = ast.parse(expression.strip(), mode="eval")
for node in ast.walk(tree):
if type(node) not in allowed:
raise ValueError("Unsafe expression")
result = eval(compile(tree, "<calc>", "eval")) # noqa: S307
return f"The result is {result}"
except ZeroDivisionError:
return "You can't divide by zero."
except Exception:
return f"I couldn't calculate that expression."
@register("shutdown")
def shutdown(confirm: bool = False, **_) -> str:
if not confirm:
return "Are you sure? Please confirm the shutdown command."
system = platform.system()
try:
if system == "Darwin":
subprocess.run(["sudo", "shutdown", "-h", "now"], check=True)
elif system == "Windows":
subprocess.run(["shutdown", "/s", "/t", "5"], check=True)
else:
subprocess.run(["sudo", "shutdown", "-h", "now"], check=True)
return "Shutting down now. Goodbye!"
except Exception:
return "I don't have permission to shut down the system."

159
brain.py Normal file
View File

@ -0,0 +1,159 @@
"""
brain.py OpenRouter LLM Client (Streaming)
Responsibilities:
1. Send transcribed text to OpenRouter with a system prompt that
instructs the model to produce:
a) A concise verbal response ( 2 sentences for voice).
b) An optional JSON command block for local execution.
2. Stream tokens back so the TTS engine can start early (Phase 5).
3. Parse any JSON command block and return it alongside the spoken text.
Environment Variables:
OPENROUTER_API_KEY your OpenRouter API key
OPENROUTER_MODEL model identifier (default: qwen/qwen-3-235b-a22b)
OPENROUTER_BASE_URL (optional) custom base URL
Dependencies:
pip install openai
"""
import json
import logging
import os
from typing import AsyncIterator
from openai import AsyncOpenAI
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
DEFAULT_MODEL = "qwen/qwen-3-235b-a22b"
SYSTEM_PROMPT = """\
You are Echo, a concise, helpful voice assistant. Follow these rules strictly:
1. **Verbal response**: Reply in 2 short sentences so it sounds natural
when spoken aloud. Be direct and conversational.
2. **Local commands** (optional): If the user's request can be fulfilled by a
local OS action (e.g. opening an app, setting a timer, checking the time,
creating a reminder), include a single JSON block at the very end of your
response using this exact format:
```json
{"action": "<command_name>", "params": {"key": "value"}}
```
Supported actions: open_app, set_timer, get_time, get_date, get_weather,
create_reminder, control_volume, shutdown, search_web, calculate.
3. Do NOT include the JSON block in your spoken text. The spoken text is
everything BEFORE the JSON block.
4. If no local action is needed, just respond normally without any JSON.
5. Never use markdown formatting, bullet points, or headers in the spoken text.
"""
class Brain:
"""Async client for OpenRouter LLM with streaming support."""
def __init__(
self,
api_key: str | None = None,
model: str = DEFAULT_MODEL,
base_url: str = "https://openrouter.ai/api/v1",
):
self.model = model
self.client = AsyncOpenAI(
api_key=api_key or os.environ.get("OPENROUTER_API_KEY", ""),
base_url=base_url,
)
# Conversation history (rolling window)
self._history: list[dict[str, str]] = []
self._max_history = 20 # keep last 20 messages for context
async def think(
self,
user_text: str,
) -> AsyncIterator[dict]:
"""
Stream the LLM response token-by-token.
Yields:
dict with keys:
- "type": "token" | "command" | "done"
- "text": the token string (for "token" type)
- "command": parsed JSON dict (for "command" type)
"""
self._history.append({"role": "user", "content": user_text})
if len(self._history) > self._max_history:
self._history = self._history[-self._max_history:]
messages = [{"role": "system", "content": SYSTEM_PROMPT}] + self._history
logger.info("Sending to OpenRouter (%s): %s", self.model, user_text[:80])
full_response = ""
try:
stream = await self.client.chat.completions.create(
model=self.model,
messages=messages,
stream=True,
temperature=0.7,
max_tokens=300,
)
async for chunk in stream:
delta = chunk.choices[0].delta
if delta.content:
full_response += delta.content
yield {"type": "token", "text": delta.content}
except Exception:
logger.exception("OpenRouter request failed")
yield {"type": "token", "text": "Sorry, I had trouble thinking about that."}
# Parse any JSON command block from the full response
command = self._extract_command(full_response)
if command:
logger.info("Parsed command: %s", command)
yield {"type": "command", "command": command}
# Clean spoken text (strip JSON block and thinking tags)
spoken_text = self._clean_spoken_text(full_response)
self._history.append({"role": "assistant", "content": full_response})
yield {"type": "done", "text": spoken_text}
def _extract_command(self, text: str) -> dict | None:
"""Extract the JSON command block from the LLM response."""
try:
# Find JSON code block
if "```json" in text:
start = text.index("```json") + 7
end = text.index("```", start)
json_str = text[start:end].strip()
return json.loads(json_str)
# Try bare JSON at the end
for i in range(len(text) - 1, -1, -1):
if text[i] == "{":
candidate = text[i:].strip()
return json.loads(candidate)
except (json.JSONDecodeError, ValueError):
pass
return None
@staticmethod
def _clean_spoken_text(text: str) -> str:
"""Remove JSON blocks and Qwen thinking tags from spoken text."""
import re
# Remove Qwen3 <think/> blocks
cleaned = re.sub(r"<think[^>]*>.*?</think\s*>", "", text, flags=re.DOTALL)
# Remove JSON code blocks
cleaned = re.sub(r"```json.*?```", "", cleaned, flags=re.DOTALL)
# Remove any trailing bare JSON object
cleaned = re.sub(r"\{[^}]*\"action\"[^}]*\}", "", cleaned)
# Clean up whitespace
cleaned = cleaned.strip().strip(".")
return cleaned

283
main.py Normal file
View File

@ -0,0 +1,283 @@
"""
main.py Echo Voice Assistant Orchestrator
Ties together all modules:
1. WakeWordListener (stt.py) continuously listens for "echo"
2. Transcriber (stt.py) captures & transcribes voice commands
3. Brain (brain.py) sends text to OpenRouter, streams response
4. TTSEngine (tts.py) generates speech from text (Qwen3-TTS)
5. Actions (actions.py) executes local OS commands
Phase 5 Parallel Processing:
As soon as the first complete sentence is received from the Brain's
streamed response, TTS generation begins immediately before the
full LLM response has finished streaming.
Usage:
python main.py
"""
import asyncio
import logging
import os
import re
import signal
import sys
from pathlib import Path
from dotenv import load_dotenv
from stt import WakeWordListener, Transcriber
from brain import Brain
from tts import TTSEngine
from actions import execute as execute_action
# ---------------------------------------------------------------------------
# Logging setup
# ---------------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s%(name)-18s%(levelname)-7s%(message)s",
datefmt="%H:%M:%S",
)
logger = logging.getLogger("echo")
# ---------------------------------------------------------------------------
# Load environment
# ---------------------------------------------------------------------------
load_dotenv(Path(__file__).parent / ".env")
class EchoAssistant:
"""
Main orchestrator for the Echo voice assistant.
Lifecycle:
1. Start wake word listener (background thread).
2. On wake word detected transcribe command.
3. Stream LLM response start TTS on first sentence (parallel).
4. Execute any local commands from the LLM response.
"""
def __init__(self):
# --- STT ---
model_path = os.environ.get(
"VOSK_MODEL_PATH", "models/vosk-model-small-en-us"
)
wake_word = os.environ.get("WAKE_WORD", "echo")
self.transcriber = Transcriber(model_path=model_path)
self.wake_listener = WakeWordListener(
wake_word=wake_word,
on_detected=self._on_wake_word,
)
# --- Brain (LLM) ---
self.brain = Brain(
api_key=os.environ.get("OPENROUTER_API_KEY"),
model=os.environ.get("OPENROUTER_MODEL", "qwen/qwen-3-235b-a22b"),
)
# --- TTS ---
self.tts = TTSEngine(
model_name=os.environ.get("QWEN_TTS_MODEL", "Qwen/Qwen3-TTS-12Hz-1.7B-CustomVoice"),
voice=os.environ.get("QWEN_TTS_VOICE", "Ryan"),
instruction=os.environ.get(
"QWEN_TTS_INSTRUCT",
"Speak clearly with a warm, friendly tone. Be natural and conversational.",
),
)
# --- State ---
self._processing = False # guard against concurrent commands
self._shutdown_event = asyncio.Event()
logger.info("Echo assistant initialized (wake word: '%s')", wake_word)
# ------------------------------------------------------------------
# Wake word callback (runs in background thread)
# ------------------------------------------------------------------
def _on_wake_word(self, wake_word: str):
"""Called by WakeWordListener when the wake word is detected."""
if self._processing:
logger.info("Still processing previous command — ignoring wake word")
return
# Schedule the command processing in the async event loop
try:
loop = asyncio.get_running_loop()
loop.call_soon_threadsafe(loop.create_task, self._handle_command())
except RuntimeError:
logger.warning("No running event loop for wake word callback")
# ------------------------------------------------------------------
# Main command pipeline
# ------------------------------------------------------------------
async def _handle_command(self):
"""Full pipeline: transcribe → think → speak → act."""
if self._processing:
return
self._processing = True
try:
# Play a brief acknowledgment tone
logger.info("🔊 Wake word detected — listening...")
# Step 1: Transcribe
text = self.transcriber.listen_and_transcribe()
if not text:
logger.info("No transcription — returning to idle")
return
logger.info("📝 You said: '%s'", text)
# Step 2: Stream LLM response with early TTS (Phase 5)
await self._stream_and_speak(text)
except Exception:
logger.exception("Error in command pipeline")
finally:
self._processing = False
logger.info("Returning to idle...")
# ------------------------------------------------------------------
# Phase 5: Parallel Streaming + TTS
# ------------------------------------------------------------------
async def _stream_and_speak(self, user_text: str):
"""
Stream the LLM response and start TTS generation as soon as the
first complete sentence is available minimizing perceived latency.
"""
buffer = ""
first_sentence_spoken = False
remaining_text = ""
pending_command = None
tts_tasks: list[asyncio.Task] = []
async for event in self.brain.think(user_text):
if event["type"] == "token":
buffer += event["text"]
# Check if we have a complete sentence
if not first_sentence_spoken and self._has_complete_sentence(buffer):
# Split: first sentence goes to TTS immediately
sentences = self._split_first_sentence(buffer)
first_sentence = sentences[0]
remaining_text = sentences[1] if len(sentences) > 1 else ""
if first_sentence.strip():
logger.info("⚡ Early TTS trigger: '%s'", first_sentence[:60])
task = asyncio.create_task(
self.tts.speak(first_sentence.strip())
)
tts_tasks.append(task)
first_sentence_spoken = True
buffer = remaining_text
elif event["type"] == "command":
pending_command = event["command"]
elif event["type"] == "done":
# Any remaining text after the first sentence
final_text = buffer.strip()
if final_text and final_text != remaining_text:
final_text = event["text"]
# Remove the already-spoken first sentence
if first_sentence_spoken and remaining_text:
pass # remaining_text already has what we need
else:
remaining_text = final_text
# Step 3: Speak the remaining text after first sentence finishes
remaining_text = remaining_text.strip()
if remaining_text:
# Wait for first sentence TTS to finish
for task in tts_tasks:
await task
await self.tts.speak(remaining_text)
# Wait for all TTS tasks to complete
for task in tts_tasks:
if not task.done():
await task
# Step 4: Execute any local command
if pending_command:
action_name = pending_command.get("action", "")
params = pending_command.get("params", {})
logger.info("🔧 Executing action: %s %s", action_name, params)
result = execute_action(action_name, params)
if result:
await self.tts.speak(result)
# ------------------------------------------------------------------
# Text utilities
# ------------------------------------------------------------------
@staticmethod
def _has_complete_sentence(text: str) -> bool:
"""Check if the text buffer contains at least one complete sentence."""
# A sentence is considered complete if it ends with . ! ? or ...
return bool(re.search(r'[.!?]\s+|[.!?]$', text))
@staticmethod
def _split_first_sentence(text: str) -> list[str]:
"""Split text at the first sentence boundary."""
match = re.search(r'([.!?])\s+', text)
if match:
end = match.start() + 1
return [text[:end], text[end:].strip()]
# Check for ending punctuation without trailing space
match = re.search(r'[.!?]$', text.strip())
if match:
return [text.strip()]
return [text]
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
async def start(self):
"""Start the Echo assistant."""
logger.info("=" * 60)
logger.info(" ECHO VOICE ASSISTANT")
logger.info(" Say '%s' to activate", os.environ.get("WAKE_WORD", "echo").upper())
logger.info(" Press Ctrl+C to quit")
logger.info("=" * 60)
# Start wake word listener (runs in background thread)
self.wake_listener.start()
# Keep the async loop alive until shutdown
await self._shutdown_event.wait()
def shutdown(self):
"""Signal the assistant to stop."""
logger.info("Shutting down Echo...")
self.wake_listener.stop()
self._shutdown_event.set()
# ---------------------------------------------------------------------------
# Entry Point
# ---------------------------------------------------------------------------
def main():
assistant = EchoAssistant()
# Graceful shutdown on Ctrl+C
def _signal_handler(sig, frame):
assistant.shutdown()
signal.signal(signal.SIGINT, _signal_handler)
signal.signal(signal.SIGTERM, _signal_handler)
# Run the async event loop
try:
asyncio.run(assistant.start())
except KeyboardInterrupt:
pass
finally:
assistant.shutdown()
logger.info("Echo has shut down. Goodbye!")
if __name__ == "__main__":
main()

31
requirements.txt Normal file
View File

@ -0,0 +1,31 @@
# ===========================================================
# Echo Voice Assistant — Dependencies
# ===========================================================
# Install with: pip install -r requirements.txt
#
# Note: For GPU-accelerated TTS, install PyTorch with CUDA
# support first: https://pytorch.org/get-started/locally/
# ===========================================================
# --- Core ---
vosk>=0.3.45
pyaudio>=0.2.14
openwakeword>=0.5.0
# --- LLM ---
openai>=1.30.0
python-dotenv>=1.0.0
# --- TTS (Qwen3-TTS) ---
# Install from source or PyPI once available:
# pip install qwen-tts
torch>=2.1.0
soundfile>=0.12.1
transformers>=4.40.0
accelerate>=0.27.0
# --- Audio Playback ---
pygame>=2.5.0
# --- Utilities ---
numpy>=1.26.0

195
stt.py Normal file
View File

@ -0,0 +1,195 @@
"""
stt.py Speech-To-Text Module (Vosk + PyAudio + openWakeWord)
Responsibilities:
1. Continuously monitor the microphone for a wake word ("echo").
2. Once triggered, capture and transcribe the full spoken command.
3. Return the transcribed text to the orchestrator.
Environment Variables:
VOSK_MODEL_PATH path to a Vosk model directory (default: models/vosk-model-small-en-us)
WAKE_WORD wake phrase to listen for (default: "echo")
Dependencies:
pip install vosk pyaudio openwakeword
"""
import json
import logging
import queue
import threading
import time
from pathlib import Path
import openwakeword
import pyaudio
from vosk import KaldiRecognizer, Model, SetLogLevel
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
DEFAULT_VOSK_MODEL = "models/vosk-model-small-en-us"
DEFAULT_WAKE_WORD = "echo"
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = 1024
RECORD_SECONDS = 10 # max length of a voice command after wake word
SILENCE_LIMIT = 1.5 # seconds of silence before we stop recording
class WakeWordListener:
"""Background thread that listens for the wake word using openWakeWord."""
def __init__(self, wake_word: str = DEFAULT_WAKE_WORD, on_detected=None):
self.wake_word = wake_word.lower()
self.on_detected = on_detected # callback(wake_word)
self._running = False
self._thread: threading.Thread | None = None
self._audio_queue: queue.Queue = queue.Queue()
# ---- audio callback fed to PyAudio stream ----
def _audio_callback(self, in_data, frame_count, time_info, status):
self._audio_queue.put(in_data)
return (in_data, pyaudio.paContinue)
def start(self):
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._listen_loop, daemon=True)
self._thread.start()
logger.info("WakeWordListener started — listening for '%s'", self.wake_word)
def stop(self):
self._running = False
if self._thread:
self._thread.join(timeout=5)
logger.info("WakeWordListener stopped")
def _listen_loop(self):
"""Open PyAudio, feed frames to openWakeWord, fire callback on match."""
oww = openwakeword.Model(
wakeword_models=[self.wake_word],
inference_framework="onnx",
)
pa = pyaudio.PyAudio()
stream = pa.open(
format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK,
stream_callback=self._audio_callback,
)
stream.start_stream()
try:
while self._running:
try:
frame = self._audio_queue.get(timeout=0.5)
except queue.Empty:
continue
# openWakeWord expects 16 kHz mono int16 — matches our format
prediction = oww.process(frame)
for model_name, score in prediction.items():
if score >= 0.5: # threshold
logger.info("Wake word '%s' detected (score=%.2f)", model_name, score)
if self.on_detected:
self.on_detected(self.wake_word)
finally:
stream.stop_stream()
stream.close()
pa.terminate()
class Transcriber:
"""Captures microphone audio after wake word and transcribes via Vosk."""
def __init__(self, model_path: str = DEFAULT_VOSK_MODEL):
resolved = Path(model_path)
if not resolved.exists():
raise FileNotFoundError(
f"Vosk model not found at {resolved.resolve()}. "
"Download one from https://alphacephei.com/vosk/models"
)
SetLogLevel(-1) # suppress Vosk internal noise
self._model = Model(str(resolved))
self._recognizer = KaldiRecognizer(self._model, RATE)
def listen_and_transcribe(self) -> str | None:
"""
Open the mic, record until silence or timeout, and return the
best-effort transcription of the spoken command.
Returns:
Transcribed text (str) or None if nothing was understood.
"""
pa = pyaudio.PyAudio()
stream = pa.open(
format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK,
)
stream.start_stream()
logger.info("Recording voice command...")
all_data = b""
silence_start: float | None = None
started_speaking = False
try:
while True:
data = stream.read(CHUNK, exception_on_overflow=False)
all_data += data
# Quick RMS check for silence detection
rms = self._rms(data)
if rms > 300:
started_speaking = True
silence_start = None
elif started_speaking and silence_start is None:
silence_start = time.time()
# Stop on silence timeout or max duration
if silence_start and (time.time() - silence_start) > SILENCE_LIMIT:
logger.debug("Silence detected — ending recording")
break
if len(all_data) > RATE * RECORD_SECONDS * 2: # bytes
logger.debug("Max recording duration reached")
break
finally:
stream.stop_stream()
stream.close()
pa.terminate()
if not started_speaking:
logger.info("No speech detected after wake word")
return None
# Feed all collected audio to Vosk for final transcription
self._recognizer.Reset()
if self._recognizer.AcceptWaveform(all_data):
result = json.loads(self._recognizer.Result())
text = result.get("text", "").strip()
else:
partial = json.loads(self._recognizer.PartialResult())
text = partial.get("partial", "").strip()
logger.info("Transcription: '%s'", text)
return text if text else None
@staticmethod
def _rms(data: bytes) -> float:
"""Compute Root Mean Square of a byte buffer of int16 samples."""
import array
samples = array.array("h", data)
if not samples:
return 0.0
sum_sq = sum(s * s for s in samples)
return (sum_sq / len(samples)) ** 0.5

181
tts.py Normal file
View File

@ -0,0 +1,181 @@
"""
tts.py Text-To-Speech Module (Qwen3-TTS)
Responsibilities:
1. Accept text (full or partial sentence) and generate a .wav audio file
using the Qwen3-TTS model running locally.
2. Support voice selection (preset voices or custom voice cloning).
3. Support instruction-based style control (e.g., energy, tone).
4. Play the generated audio immediately.
Environment Variables:
QWEN_TTS_MODEL model name or local path (default: Qwen/Qwen3-TTS-12Hz-1.7B-CustomVoice)
QWEN_TTS_VOICE preset voice name or path to 3s .wav sample
QWEN_TTS_INSTRUCT default style instruction for speech generation
Dependencies:
pip install qwen-tts torch soundfile pygame
"""
import asyncio
import logging
import os
import tempfile
from pathlib import Path
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
DEFAULT_MODEL = "Qwen/Qwen3-TTS-12Hz-1.7B-CustomVoice"
DEFAULT_VOICE = "Ryan" # preset voice; alternatives: "Serena", "Diana", etc.
DEFAULT_INSTRUCTION = "Speak clearly with a warm, friendly tone. Be natural and conversational."
OUTPUT_DIR = Path("audio_output")
class TTSEngine:
"""
Wrapper around Qwen3-TTS for generating speech from text.
The engine lazily loads the model on first use to avoid slow startup.
"""
def __init__(
self,
model_name: str = DEFAULT_MODEL,
voice: str = DEFAULT_VOICE,
instruction: str = DEFAULT_INSTRUCTION,
output_dir: str | Path = OUTPUT_DIR,
):
self.model_name = model_name
self.voice = voice
self.instruction = instruction
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self._model = None
self._processor = None
self._lock = asyncio.Lock() # prevent concurrent generation
# ---- lazy model loading ----
def _ensure_loaded(self):
"""Load model and processor on first call (lazy init)."""
if self._model is not None:
return
logger.info("Loading Qwen3-TTS model '%s' (this may take a moment)...", self.model_name)
try:
from qwen_tts import QwenTTSProcessor, QwenTTSModel
self._processor = QwenTTSProcessor()
self._model = QwenTTSModel.from_pretrained(self.model_name)
logger.info("Qwen3-TTS model loaded successfully")
except ImportError:
raise ImportError(
"qwen-tts is not installed. Install it with:\n"
" pip install qwen-tts torch soundfile\n"
"Also ensure you have CUDA-capable GPU for low-latency inference."
)
# ---- generation ----
async def generate(self, text: str, instruction: str | None = None) -> Path | None:
"""
Generate speech audio from text and save as .wav.
Args:
text: The text to convert to speech.
instruction: Optional style instruction override.
Returns:
Path to the generated .wav file, or None on failure.
"""
if not text or not text.strip():
return None
async with self._lock:
return await asyncio.to_thread(
self._generate_sync, text.strip(), instruction or self.instruction
)
def _generate_sync(self, text: str, instruction: str) -> Path | None:
"""Synchronous generation (runs in thread pool)."""
self._ensure_loaded()
output_path = self.output_dir / f"echo_{os.urandom(4).hex()}.wav"
try:
# Build voice reference: preset name or custom .wav path
voice_ref = self.voice
if Path(self.voice).exists():
voice_ref = str(Path(self.voice).resolve())
# Generate audio
logger.info("Generating speech: '%s' (voice=%s)", text[:60], self.voice)
audio_array = self._model.generate(
processor=self._processor,
text=text,
voice=voice_ref,
instruction=instruction,
)
# Save to file
import soundfile as sf
sample_rate = self._processor.sampling_rate
sf.write(str(output_path), audio_array, sample_rate)
logger.info("Audio saved to %s (%.1fs)", output_path, len(audio_array) / sample_rate)
return output_path
except Exception:
logger.exception("TTS generation failed for: '%s'", text[:60])
return None
# ---- playback ----
async def speak(self, text: str, instruction: str | None = None) -> bool:
"""
Generate speech from text and play it immediately.
Returns:
True if playback succeeded, False otherwise.
"""
wav_path = await self.generate(text, instruction)
if not wav_path:
return False
return await self._play(wav_path)
async def speak_file(self, wav_path: Path) -> bool:
"""Play a previously generated .wav file."""
return await self._play(wav_path)
@staticmethod
async def _play(wav_path: Path) -> bool:
"""Play a .wav file using pygame.mixer (async-friendly)."""
try:
import pygame
pygame.mixer.init(frequency=22050, size=-16, channels=1, buffer=2048)
pygame.mixer.music.load(str(wav_path))
pygame.mixer.music.play()
# Wait for playback to finish
while pygame.mixer.music.get_busy():
await asyncio.sleep(0.05)
pygame.mixer.music.stop()
pygame.mixer.quit()
logger.info("Playback finished: %s", wav_path.name)
return True
except Exception:
logger.exception("Playback failed for %s", wav_path)
return False
def set_voice(self, voice: str):
"""Switch to a different voice preset or custom sample path."""
self.voice = voice
logger.info("Voice set to: %s", voice)
def set_instruction(self, instruction: str):
"""Update the default style instruction."""
self.instruction = instruction
logger.info("TTS instruction updated: %s", instruction)