""" stt.py — Speech-To-Text Module (Vosk + PyAudio + openWakeWord) Responsibilities: 1. Continuously monitor the microphone for a wake word ("echo"). 2. Once triggered, capture and transcribe the full spoken command. 3. Return the transcribed text to the orchestrator. Environment Variables: VOSK_MODEL_PATH — path to a Vosk model directory (default: models/vosk-model-small-en-us) WAKE_WORD — wake phrase to listen for (default: "echo") Dependencies: pip install vosk pyaudio openwakeword """ import json import logging import queue import threading import time from pathlib import Path import openwakeword import pyaudio from vosk import KaldiRecognizer, Model, SetLogLevel logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- DEFAULT_VOSK_MODEL = "models/vosk-model-small-en-us" DEFAULT_WAKE_WORD = "echo" FORMAT = pyaudio.paInt16 CHANNELS = 1 RATE = 16000 CHUNK = 1024 RECORD_SECONDS = 10 # max length of a voice command after wake word SILENCE_LIMIT = 1.5 # seconds of silence before we stop recording class WakeWordListener: """Background thread that listens for the wake word using openWakeWord.""" def __init__(self, wake_word: str = DEFAULT_WAKE_WORD, on_detected=None): self.wake_word = wake_word.lower() self.on_detected = on_detected # callback(wake_word) self._running = False self._thread: threading.Thread | None = None self._audio_queue: queue.Queue = queue.Queue() # ---- audio callback fed to PyAudio stream ---- def _audio_callback(self, in_data, frame_count, time_info, status): self._audio_queue.put(in_data) return (in_data, pyaudio.paContinue) def start(self): if self._running: return self._running = True self._thread = threading.Thread(target=self._listen_loop, daemon=True) self._thread.start() logger.info("WakeWordListener started — listening for '%s'", self.wake_word) def stop(self): self._running = False if self._thread: self._thread.join(timeout=5) logger.info("WakeWordListener stopped") def _listen_loop(self): """Open PyAudio, feed frames to openWakeWord, fire callback on match.""" oww = openwakeword.Model( wakeword_models=[self.wake_word], inference_framework="onnx", ) pa = pyaudio.PyAudio() stream = pa.open( format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK, stream_callback=self._audio_callback, ) stream.start_stream() try: while self._running: try: frame = self._audio_queue.get(timeout=0.5) except queue.Empty: continue # openWakeWord expects 16 kHz mono int16 — matches our format prediction = oww.process(frame) for model_name, score in prediction.items(): if score >= 0.5: # threshold logger.info("Wake word '%s' detected (score=%.2f)", model_name, score) if self.on_detected: self.on_detected(self.wake_word) finally: stream.stop_stream() stream.close() pa.terminate() class Transcriber: """Captures microphone audio after wake word and transcribes via Vosk.""" def __init__(self, model_path: str = DEFAULT_VOSK_MODEL): resolved = Path(model_path) if not resolved.exists(): raise FileNotFoundError( f"Vosk model not found at {resolved.resolve()}. " "Download one from https://alphacephei.com/vosk/models" ) SetLogLevel(-1) # suppress Vosk internal noise self._model = Model(str(resolved)) self._recognizer = KaldiRecognizer(self._model, RATE) def listen_and_transcribe(self) -> str | None: """ Open the mic, record until silence or timeout, and return the best-effort transcription of the spoken command. Returns: Transcribed text (str) or None if nothing was understood. """ pa = pyaudio.PyAudio() stream = pa.open( format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK, ) stream.start_stream() logger.info("Recording voice command...") all_data = b"" silence_start: float | None = None started_speaking = False try: while True: data = stream.read(CHUNK, exception_on_overflow=False) all_data += data # Quick RMS check for silence detection rms = self._rms(data) if rms > 300: started_speaking = True silence_start = None elif started_speaking and silence_start is None: silence_start = time.time() # Stop on silence timeout or max duration if silence_start and (time.time() - silence_start) > SILENCE_LIMIT: logger.debug("Silence detected — ending recording") break if len(all_data) > RATE * RECORD_SECONDS * 2: # bytes logger.debug("Max recording duration reached") break finally: stream.stop_stream() stream.close() pa.terminate() if not started_speaking: logger.info("No speech detected after wake word") return None # Feed all collected audio to Vosk for final transcription self._recognizer.Reset() if self._recognizer.AcceptWaveform(all_data): result = json.loads(self._recognizer.Result()) text = result.get("text", "").strip() else: partial = json.loads(self._recognizer.PartialResult()) text = partial.get("partial", "").strip() logger.info("Transcription: '%s'", text) return text if text else None @staticmethod def _rms(data: bytes) -> float: """Compute Root Mean Square of a byte buffer of int16 samples.""" import array samples = array.array("h", data) if not samples: return 0.0 sum_sq = sum(s * s for s in samples) return (sum_sq / len(samples)) ** 0.5