From 31a862b75b9724773967c9eb85472c79ebe10feb Mon Sep 17 00:00:00 2001 From: Butterfly Dev Date: Tue, 7 Apr 2026 04:59:38 +0000 Subject: [PATCH] =?UTF-8?q?server:=20state.rs=20=E2=80=94=20binary=20Frame?= =?UTF-8?q?Buffer,=20WsOutMessage=20enum=20(Binary|Text),=20broadcast=5Fbi?= =?UTF-8?q?nary=5Fframe=20zero-copy=20path?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/src/state.rs | 108 +++++++++++++++++++++++--------------------- 1 file changed, 56 insertions(+), 52 deletions(-) diff --git a/server/src/state.rs b/server/src/state.rs index 57d2d8b..aa7f4b6 100644 --- a/server/src/state.rs +++ b/server/src/state.rs @@ -1,50 +1,55 @@ use std::sync::Arc; use std::time::Instant; +use actix_ws::Message; use dashmap::DashMap; use log::{info, warn}; use tokio::sync::mpsc; use crate::models::{AgentConnection, Session, SessionStatus}; +/// Maximum binary frame size (10 MB — enough for a 4K JPEG or H.264 keyframe). +pub const MAX_BINARY_FRAME_SIZE: usize = 10 * 1024 * 1024; + /// A registered viewer connection with its message channel. #[derive(Debug)] pub struct ViewerEntry { pub viewer_id: String, - /// Channel to send outgoing messages to this viewer. - /// Wrapped in parking_lot::Mutex so it satisfies DashMap's `Send + Sync` bound. - pub sender: parking_lot::Mutex>, + /// Channel to send outgoing WebSocket messages to this viewer. + /// Supports both text (JSON) and binary (video) messages via the `WsOutMessage` enum. + pub sender: parking_lot::Mutex>, +} + +/// Outgoing message type for viewer WebSocket channels. +/// Binary = raw video frame. Text = JSON control message. +#[derive(Debug, Clone)] +pub enum WsOutMessage { + /// A binary video frame (H.264 NALs or JPEG — complete with 13-byte header). + Binary(Vec), + /// A JSON text message (session update, error, etc.). + Text(String), } /// Global shared state accessible from every request / WS handler via `web::Data`. #[derive(Debug)] pub struct AppState { - /// All sessions keyed by session id. pub sessions: DashMap, - /// Connected agents keyed by agent id. pub agents: DashMap, - /// Server start time for uptime calculations. pub started_at: Instant, - /// Configured max sessions. pub max_sessions: usize, - /// Configured idle timeout in seconds. pub idle_timeout_secs: u64, - /// Configured frame buffer size. pub frame_buffer_size: usize, - /// Per-session frame buffers (session_id → ring of latest base64 frames). + /// Per-session binary frame buffers (session_id → ring of latest raw binary frames). pub frame_buffers: DashMap>, - /// Per-session viewer registries (session_id → list of connected viewers). pub viewers: DashMap>, - /// Per-session agent message channels (session_id → mpsc sender to agent WS task). - pub agent_channels: DashMap>>, + pub agent_channels: DashMap>>, } -/// Simple circular buffer that keeps the *N* most recent display frames. -/// Viewers that connect mid-stream can pull the latest frame immediately -/// instead of waiting for the next one from the agent. +/// Circular buffer of raw binary video frames (with 13-byte header). +/// Viewers that connect mid-stream can pull the latest frame immediately. #[derive(Debug)] pub struct FrameBuffer { - frames: parking_lot::Mutex>, + frames: parking_lot::Mutex>>, capacity: usize, } @@ -56,8 +61,8 @@ impl FrameBuffer { } } - /// Push a base64-encoded frame, evicting the oldest when full. - pub fn push(&self, frame: String) { + /// Push a binary frame (complete with header), evicting the oldest when full. + pub fn push(&self, frame: Vec) { let mut buf = self.frames.lock(); if buf.len() >= self.capacity { buf.remove(0); @@ -66,7 +71,7 @@ impl FrameBuffer { } /// Return the most recent frame, if any. - pub fn latest(&self) -> Option { + pub fn latest(&self) -> Option> { let buf = self.frames.lock(); buf.last().cloned() } @@ -87,7 +92,6 @@ impl AppState { }) } - /// Create a brand-new session and return its id. pub fn create_session(self: &Arc) -> Result { if self.sessions.len() >= self.max_sessions { return Err("max sessions reached"); @@ -100,25 +104,20 @@ impl AppState { Ok(session) } - /// Look up a session by id. pub fn get_session(&self, id: &str) -> Option { self.sessions.get(id).map(|r| r.value().clone()) } - /// Remove a session (and its frame buffer, viewers, agent channel). pub fn remove_session(&self, id: &str) -> bool { - // Notify all viewers that the session is gone. if let Some(viewers) = self.viewers.get(id) { for viewer in viewers.iter() { - let msg = serde_json::json!({"msg_type": "error", "message": "session deleted"}) - .to_string(); - let _ = viewer.sender.lock().send(msg).await; + let msg = serde_json::json!({"msg_type": "error", "message": "session deleted"}).to_string(); + let _ = viewer.sender.lock().send(WsOutMessage::Text(msg)).await; } } self.viewers.remove(id); - // Close the agent channel so its writer task exits. if let Some((_, ch)) = self.agent_channels.remove(id) { - drop(ch); // Drop the Mutex → Sender is dropped → Receiver gets None. + drop(ch); } let removed = self.sessions.remove(id).is_some(); self.frame_buffers.remove(id); @@ -128,7 +127,6 @@ impl AppState { removed } - /// Mark a session as active once the agent starts streaming. pub fn activate_session(&self, session_id: &str, resolution: Option<&str>) { if let Some(mut s) = self.sessions.get_mut(session_id) { s.status = SessionStatus::Active; @@ -138,13 +136,11 @@ impl AppState { } } - /// Register a newly connected agent. pub fn register_agent(&self, agent: AgentConnection) { info!("agent registered: {} for session {}", agent.agent_id, agent.session_id); self.agents.insert(agent.agent_id.clone(), agent); } - /// Unregister an agent and mark its session as disconnected. pub fn unregister_agent(&self, agent_id: &str) { if let Some((_, agent)) = self.agents.remove(agent_id) { let session_id = agent.session_id.clone(); @@ -163,22 +159,21 @@ impl AppState { } } - /// Push a display frame into the session's circular buffer. - pub fn push_frame(&self, session_id: &str, frame: String) { + /// Push a raw binary frame into the session's circular buffer. + pub fn push_frame(&self, session_id: &str, frame: Vec) { if let Some(buf) = self.frame_buffers.get(session_id) { buf.push(frame); } } - /// Get the latest frame from the session's buffer (for new viewer catch-up). - pub fn get_latest_frame(&self, session_id: &str) -> Option { + /// Get the latest binary frame (for new viewer catch-up). + pub fn get_latest_frame(&self, session_id: &str) -> Option> { self.frame_buffers.get(session_id).and_then(|buf| buf.latest()) } // ── Viewer channel management ─────────────────────────────────────────── - /// Register a new viewer for a session. - pub fn register_viewer(&self, session_id: &str, viewer_id: &str, sender: mpsc::Sender) { + pub fn register_viewer(&self, session_id: &str, viewer_id: &str, sender: mpsc::Sender) { let entry = ViewerEntry { viewer_id: viewer_id.to_string(), sender: parking_lot::Mutex::new(sender), @@ -191,7 +186,6 @@ impl AppState { info!("viewer registered: {} for session {}", viewer_id, session_id); } - /// Unregister a viewer (called on disconnect). pub fn unregister_viewer(&self, session_id: &str, viewer_id: &str) { if let Some(mut viewers) = self.viewers.get_mut(session_id) { viewers.retain(|v| v.viewer_id != viewer_id); @@ -205,14 +199,12 @@ impl AppState { // ── Agent channel management ──────────────────────────────────────────── - /// Register the agent's message channel for a session. - pub fn register_agent_channel(&self, session_id: &str, sender: mpsc::Sender) { + pub fn register_agent_channel(&self, session_id: &str, sender: mpsc::Sender) { self.agent_channels .insert(session_id.to_string(), parking_lot::Mutex::new(sender)); info!("agent channel registered for session {}", session_id); } - /// Unregister the agent's message channel. pub fn unregister_agent_channel(&self, session_id: &str) { if let Some((_, ch)) = self.agent_channels.remove(session_id) { drop(ch); @@ -222,25 +214,38 @@ impl AppState { // ── Broadcast / Forward ──────────────────────────────────────────────── - /// Broadcast a JSON message to all viewers connected to a session. - pub async fn broadcast_to_viewers(&self, session_id: &str, json_msg: &str) { + /// Broadcast a binary video frame to all viewers for a session. + /// This is the hot path — frames pass through as raw bytes with zero encoding overhead. + pub async fn broadcast_binary_frame(&self, session_id: &str, data: Vec) { if let Some(viewers) = self.viewers.get(session_id) { for viewer in viewers.iter() { - if let Err(e) = viewer.sender.lock().send(json_msg.to_string()).await { - warn!("[ws] failed to send to viewer {}: {}", viewer.viewer_id, e); + // Clone the data for each viewer (necessary since we can't share Vec). + // TODO: Use Arc> for true zero-copy when multiple viewers exist. + if let Err(e) = viewer.sender.lock().send(WsOutMessage::Binary(data.clone())).await { + warn!("[ws] binary send failed to viewer {}: {}", viewer.viewer_id, e); } } } } - /// Send a JSON message to the agent connected to a session. - /// Returns false if no agent channel exists. - pub async fn send_to_agent(&self, session_id: &str, json_msg: &str) -> bool { + /// Broadcast a JSON text message to all viewers for a session. + pub async fn broadcast_text(&self, session_id: &str, text: String) { + if let Some(viewers) = self.viewers.get(session_id) { + for viewer in viewers.iter() { + if let Err(e) = viewer.sender.lock().send(WsOutMessage::Text(text.clone())).await { + warn!("[ws] text send failed to viewer {}: {}", viewer.viewer_id, e); + } + } + } + } + + /// Send a message to the agent for a session. + pub async fn send_to_agent(&self, session_id: &str, msg: WsOutMessage) -> bool { if let Some(ch) = self.agent_channels.get(session_id) { - match ch.lock().send(json_msg.to_string()).await { + match ch.lock().send(msg).await { Ok(()) => true, Err(e) => { - warn!("[ws] failed to send to agent in session {}: {}", session_id, e); + warn!("[ws] send to agent failed (session {}): {}", session_id, e); false } } @@ -249,7 +254,6 @@ impl AppState { } } - /// Return counts for health-check: (active_sessions, connected_agents, connected_viewers). pub fn stats(&self) -> (usize, usize, usize) { let active: usize = self .sessions