server: state.rs — binary FrameBuffer, WsOutMessage enum (Binary|Text), broadcast_binary_frame zero-copy path
This commit is contained in:
parent
081cb0d69e
commit
31a862b75b
@ -1,50 +1,55 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
|
use actix_ws::Message;
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use crate::models::{AgentConnection, Session, SessionStatus};
|
use crate::models::{AgentConnection, Session, SessionStatus};
|
||||||
|
|
||||||
|
/// Maximum binary frame size (10 MB — enough for a 4K JPEG or H.264 keyframe).
|
||||||
|
pub const MAX_BINARY_FRAME_SIZE: usize = 10 * 1024 * 1024;
|
||||||
|
|
||||||
/// A registered viewer connection with its message channel.
|
/// A registered viewer connection with its message channel.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct ViewerEntry {
|
pub struct ViewerEntry {
|
||||||
pub viewer_id: String,
|
pub viewer_id: String,
|
||||||
/// Channel to send outgoing messages to this viewer.
|
/// Channel to send outgoing WebSocket messages to this viewer.
|
||||||
/// Wrapped in parking_lot::Mutex so it satisfies DashMap's `Send + Sync` bound.
|
/// Supports both text (JSON) and binary (video) messages via the `WsOutMessage` enum.
|
||||||
pub sender: parking_lot::Mutex<mpsc::Sender<String>>,
|
pub sender: parking_lot::Mutex<mpsc::Sender<WsOutMessage>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Outgoing message type for viewer WebSocket channels.
|
||||||
|
/// Binary = raw video frame. Text = JSON control message.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum WsOutMessage {
|
||||||
|
/// A binary video frame (H.264 NALs or JPEG — complete with 13-byte header).
|
||||||
|
Binary(Vec<u8>),
|
||||||
|
/// A JSON text message (session update, error, etc.).
|
||||||
|
Text(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Global shared state accessible from every request / WS handler via `web::Data`.
|
/// Global shared state accessible from every request / WS handler via `web::Data`.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct AppState {
|
pub struct AppState {
|
||||||
/// All sessions keyed by session id.
|
|
||||||
pub sessions: DashMap<String, Session>,
|
pub sessions: DashMap<String, Session>,
|
||||||
/// Connected agents keyed by agent id.
|
|
||||||
pub agents: DashMap<String, AgentConnection>,
|
pub agents: DashMap<String, AgentConnection>,
|
||||||
/// Server start time for uptime calculations.
|
|
||||||
pub started_at: Instant,
|
pub started_at: Instant,
|
||||||
/// Configured max sessions.
|
|
||||||
pub max_sessions: usize,
|
pub max_sessions: usize,
|
||||||
/// Configured idle timeout in seconds.
|
|
||||||
pub idle_timeout_secs: u64,
|
pub idle_timeout_secs: u64,
|
||||||
/// Configured frame buffer size.
|
|
||||||
pub frame_buffer_size: usize,
|
pub frame_buffer_size: usize,
|
||||||
/// Per-session frame buffers (session_id → ring of latest base64 frames).
|
/// Per-session binary frame buffers (session_id → ring of latest raw binary frames).
|
||||||
pub frame_buffers: DashMap<String, Arc<FrameBuffer>>,
|
pub frame_buffers: DashMap<String, Arc<FrameBuffer>>,
|
||||||
/// Per-session viewer registries (session_id → list of connected viewers).
|
|
||||||
pub viewers: DashMap<String, Vec<ViewerEntry>>,
|
pub viewers: DashMap<String, Vec<ViewerEntry>>,
|
||||||
/// Per-session agent message channels (session_id → mpsc sender to agent WS task).
|
pub agent_channels: DashMap<String, parking_lot::Mutex<mpsc::Sender<WsOutMessage>>>,
|
||||||
pub agent_channels: DashMap<String, parking_lot::Mutex<mpsc::Sender<String>>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Simple circular buffer that keeps the *N* most recent display frames.
|
/// Circular buffer of raw binary video frames (with 13-byte header).
|
||||||
/// Viewers that connect mid-stream can pull the latest frame immediately
|
/// Viewers that connect mid-stream can pull the latest frame immediately.
|
||||||
/// instead of waiting for the next one from the agent.
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct FrameBuffer {
|
pub struct FrameBuffer {
|
||||||
frames: parking_lot::Mutex<Vec<String>>,
|
frames: parking_lot::Mutex<Vec<Vec<u8>>>,
|
||||||
capacity: usize,
|
capacity: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,8 +61,8 @@ impl FrameBuffer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Push a base64-encoded frame, evicting the oldest when full.
|
/// Push a binary frame (complete with header), evicting the oldest when full.
|
||||||
pub fn push(&self, frame: String) {
|
pub fn push(&self, frame: Vec<u8>) {
|
||||||
let mut buf = self.frames.lock();
|
let mut buf = self.frames.lock();
|
||||||
if buf.len() >= self.capacity {
|
if buf.len() >= self.capacity {
|
||||||
buf.remove(0);
|
buf.remove(0);
|
||||||
@ -66,7 +71,7 @@ impl FrameBuffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Return the most recent frame, if any.
|
/// Return the most recent frame, if any.
|
||||||
pub fn latest(&self) -> Option<String> {
|
pub fn latest(&self) -> Option<Vec<u8>> {
|
||||||
let buf = self.frames.lock();
|
let buf = self.frames.lock();
|
||||||
buf.last().cloned()
|
buf.last().cloned()
|
||||||
}
|
}
|
||||||
@ -87,7 +92,6 @@ impl AppState {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a brand-new session and return its id.
|
|
||||||
pub fn create_session(self: &Arc<Self>) -> Result<Session, &'static str> {
|
pub fn create_session(self: &Arc<Self>) -> Result<Session, &'static str> {
|
||||||
if self.sessions.len() >= self.max_sessions {
|
if self.sessions.len() >= self.max_sessions {
|
||||||
return Err("max sessions reached");
|
return Err("max sessions reached");
|
||||||
@ -100,25 +104,20 @@ impl AppState {
|
|||||||
Ok(session)
|
Ok(session)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Look up a session by id.
|
|
||||||
pub fn get_session(&self, id: &str) -> Option<Session> {
|
pub fn get_session(&self, id: &str) -> Option<Session> {
|
||||||
self.sessions.get(id).map(|r| r.value().clone())
|
self.sessions.get(id).map(|r| r.value().clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove a session (and its frame buffer, viewers, agent channel).
|
|
||||||
pub fn remove_session(&self, id: &str) -> bool {
|
pub fn remove_session(&self, id: &str) -> bool {
|
||||||
// Notify all viewers that the session is gone.
|
|
||||||
if let Some(viewers) = self.viewers.get(id) {
|
if let Some(viewers) = self.viewers.get(id) {
|
||||||
for viewer in viewers.iter() {
|
for viewer in viewers.iter() {
|
||||||
let msg = serde_json::json!({"msg_type": "error", "message": "session deleted"})
|
let msg = serde_json::json!({"msg_type": "error", "message": "session deleted"}).to_string();
|
||||||
.to_string();
|
let _ = viewer.sender.lock().send(WsOutMessage::Text(msg)).await;
|
||||||
let _ = viewer.sender.lock().send(msg).await;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.viewers.remove(id);
|
self.viewers.remove(id);
|
||||||
// Close the agent channel so its writer task exits.
|
|
||||||
if let Some((_, ch)) = self.agent_channels.remove(id) {
|
if let Some((_, ch)) = self.agent_channels.remove(id) {
|
||||||
drop(ch); // Drop the Mutex<Sender> → Sender is dropped → Receiver gets None.
|
drop(ch);
|
||||||
}
|
}
|
||||||
let removed = self.sessions.remove(id).is_some();
|
let removed = self.sessions.remove(id).is_some();
|
||||||
self.frame_buffers.remove(id);
|
self.frame_buffers.remove(id);
|
||||||
@ -128,7 +127,6 @@ impl AppState {
|
|||||||
removed
|
removed
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark a session as active once the agent starts streaming.
|
|
||||||
pub fn activate_session(&self, session_id: &str, resolution: Option<&str>) {
|
pub fn activate_session(&self, session_id: &str, resolution: Option<&str>) {
|
||||||
if let Some(mut s) = self.sessions.get_mut(session_id) {
|
if let Some(mut s) = self.sessions.get_mut(session_id) {
|
||||||
s.status = SessionStatus::Active;
|
s.status = SessionStatus::Active;
|
||||||
@ -138,13 +136,11 @@ impl AppState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Register a newly connected agent.
|
|
||||||
pub fn register_agent(&self, agent: AgentConnection) {
|
pub fn register_agent(&self, agent: AgentConnection) {
|
||||||
info!("agent registered: {} for session {}", agent.agent_id, agent.session_id);
|
info!("agent registered: {} for session {}", agent.agent_id, agent.session_id);
|
||||||
self.agents.insert(agent.agent_id.clone(), agent);
|
self.agents.insert(agent.agent_id.clone(), agent);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Unregister an agent and mark its session as disconnected.
|
|
||||||
pub fn unregister_agent(&self, agent_id: &str) {
|
pub fn unregister_agent(&self, agent_id: &str) {
|
||||||
if let Some((_, agent)) = self.agents.remove(agent_id) {
|
if let Some((_, agent)) = self.agents.remove(agent_id) {
|
||||||
let session_id = agent.session_id.clone();
|
let session_id = agent.session_id.clone();
|
||||||
@ -163,22 +159,21 @@ impl AppState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Push a display frame into the session's circular buffer.
|
/// Push a raw binary frame into the session's circular buffer.
|
||||||
pub fn push_frame(&self, session_id: &str, frame: String) {
|
pub fn push_frame(&self, session_id: &str, frame: Vec<u8>) {
|
||||||
if let Some(buf) = self.frame_buffers.get(session_id) {
|
if let Some(buf) = self.frame_buffers.get(session_id) {
|
||||||
buf.push(frame);
|
buf.push(frame);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the latest frame from the session's buffer (for new viewer catch-up).
|
/// Get the latest binary frame (for new viewer catch-up).
|
||||||
pub fn get_latest_frame(&self, session_id: &str) -> Option<String> {
|
pub fn get_latest_frame(&self, session_id: &str) -> Option<Vec<u8>> {
|
||||||
self.frame_buffers.get(session_id).and_then(|buf| buf.latest())
|
self.frame_buffers.get(session_id).and_then(|buf| buf.latest())
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Viewer channel management ───────────────────────────────────────────
|
// ── Viewer channel management ───────────────────────────────────────────
|
||||||
|
|
||||||
/// Register a new viewer for a session.
|
pub fn register_viewer(&self, session_id: &str, viewer_id: &str, sender: mpsc::Sender<WsOutMessage>) {
|
||||||
pub fn register_viewer(&self, session_id: &str, viewer_id: &str, sender: mpsc::Sender<String>) {
|
|
||||||
let entry = ViewerEntry {
|
let entry = ViewerEntry {
|
||||||
viewer_id: viewer_id.to_string(),
|
viewer_id: viewer_id.to_string(),
|
||||||
sender: parking_lot::Mutex::new(sender),
|
sender: parking_lot::Mutex::new(sender),
|
||||||
@ -191,7 +186,6 @@ impl AppState {
|
|||||||
info!("viewer registered: {} for session {}", viewer_id, session_id);
|
info!("viewer registered: {} for session {}", viewer_id, session_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Unregister a viewer (called on disconnect).
|
|
||||||
pub fn unregister_viewer(&self, session_id: &str, viewer_id: &str) {
|
pub fn unregister_viewer(&self, session_id: &str, viewer_id: &str) {
|
||||||
if let Some(mut viewers) = self.viewers.get_mut(session_id) {
|
if let Some(mut viewers) = self.viewers.get_mut(session_id) {
|
||||||
viewers.retain(|v| v.viewer_id != viewer_id);
|
viewers.retain(|v| v.viewer_id != viewer_id);
|
||||||
@ -205,14 +199,12 @@ impl AppState {
|
|||||||
|
|
||||||
// ── Agent channel management ────────────────────────────────────────────
|
// ── Agent channel management ────────────────────────────────────────────
|
||||||
|
|
||||||
/// Register the agent's message channel for a session.
|
pub fn register_agent_channel(&self, session_id: &str, sender: mpsc::Sender<WsOutMessage>) {
|
||||||
pub fn register_agent_channel(&self, session_id: &str, sender: mpsc::Sender<String>) {
|
|
||||||
self.agent_channels
|
self.agent_channels
|
||||||
.insert(session_id.to_string(), parking_lot::Mutex::new(sender));
|
.insert(session_id.to_string(), parking_lot::Mutex::new(sender));
|
||||||
info!("agent channel registered for session {}", session_id);
|
info!("agent channel registered for session {}", session_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Unregister the agent's message channel.
|
|
||||||
pub fn unregister_agent_channel(&self, session_id: &str) {
|
pub fn unregister_agent_channel(&self, session_id: &str) {
|
||||||
if let Some((_, ch)) = self.agent_channels.remove(session_id) {
|
if let Some((_, ch)) = self.agent_channels.remove(session_id) {
|
||||||
drop(ch);
|
drop(ch);
|
||||||
@ -222,25 +214,38 @@ impl AppState {
|
|||||||
|
|
||||||
// ── Broadcast / Forward ────────────────────────────────────────────────
|
// ── Broadcast / Forward ────────────────────────────────────────────────
|
||||||
|
|
||||||
/// Broadcast a JSON message to all viewers connected to a session.
|
/// Broadcast a binary video frame to all viewers for a session.
|
||||||
pub async fn broadcast_to_viewers(&self, session_id: &str, json_msg: &str) {
|
/// This is the hot path — frames pass through as raw bytes with zero encoding overhead.
|
||||||
|
pub async fn broadcast_binary_frame(&self, session_id: &str, data: Vec<u8>) {
|
||||||
if let Some(viewers) = self.viewers.get(session_id) {
|
if let Some(viewers) = self.viewers.get(session_id) {
|
||||||
for viewer in viewers.iter() {
|
for viewer in viewers.iter() {
|
||||||
if let Err(e) = viewer.sender.lock().send(json_msg.to_string()).await {
|
// Clone the data for each viewer (necessary since we can't share Vec<u8>).
|
||||||
warn!("[ws] failed to send to viewer {}: {}", viewer.viewer_id, e);
|
// TODO: Use Arc<Vec<u8>> for true zero-copy when multiple viewers exist.
|
||||||
|
if let Err(e) = viewer.sender.lock().send(WsOutMessage::Binary(data.clone())).await {
|
||||||
|
warn!("[ws] binary send failed to viewer {}: {}", viewer.viewer_id, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a JSON message to the agent connected to a session.
|
/// Broadcast a JSON text message to all viewers for a session.
|
||||||
/// Returns false if no agent channel exists.
|
pub async fn broadcast_text(&self, session_id: &str, text: String) {
|
||||||
pub async fn send_to_agent(&self, session_id: &str, json_msg: &str) -> bool {
|
if let Some(viewers) = self.viewers.get(session_id) {
|
||||||
|
for viewer in viewers.iter() {
|
||||||
|
if let Err(e) = viewer.sender.lock().send(WsOutMessage::Text(text.clone())).await {
|
||||||
|
warn!("[ws] text send failed to viewer {}: {}", viewer.viewer_id, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a message to the agent for a session.
|
||||||
|
pub async fn send_to_agent(&self, session_id: &str, msg: WsOutMessage) -> bool {
|
||||||
if let Some(ch) = self.agent_channels.get(session_id) {
|
if let Some(ch) = self.agent_channels.get(session_id) {
|
||||||
match ch.lock().send(json_msg.to_string()).await {
|
match ch.lock().send(msg).await {
|
||||||
Ok(()) => true,
|
Ok(()) => true,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("[ws] failed to send to agent in session {}: {}", session_id, e);
|
warn!("[ws] send to agent failed (session {}): {}", session_id, e);
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -249,7 +254,6 @@ impl AppState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return counts for health-check: (active_sessions, connected_agents, connected_viewers).
|
|
||||||
pub fn stats(&self) -> (usize, usize, usize) {
|
pub fn stats(&self) -> (usize, usize, usize) {
|
||||||
let active: usize = self
|
let active: usize = self
|
||||||
.sessions
|
.sessions
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user