From ff476f12dc28d0eabff58f24217f6b75022a974f Mon Sep 17 00:00:00 2001 From: Butterfly Dev Date: Tue, 7 Apr 2026 03:08:36 +0000 Subject: [PATCH] =?UTF-8?q?server:=20state.rs=20=E2=80=94=20AppState=20wit?= =?UTF-8?q?h=20DashMap=20sessions/agents,=20FrameBuffer=20ring=20for=20lat?= =?UTF-8?q?e-joiners?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/src/state.rs | 153 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 server/src/state.rs diff --git a/server/src/state.rs b/server/src/state.rs new file mode 100644 index 0000000..1574990 --- /dev/null +++ b/server/src/state.rs @@ -0,0 +1,153 @@ +use std::sync::Arc; +use std::time::Instant; + +use dashmap::DashMap; +use log::info; + +use crate::models::{AgentConnection, Session, SessionStatus}; + +/// Global shared state accessible from every request / WS handler via `web::Data`. +#[derive(Debug)] +pub struct AppState { + /// All sessions keyed by session id. + pub sessions: DashMap, + /// Connected agents keyed by agent id. + pub agents: DashMap, + /// Server start time for uptime calculations. + pub started_at: Instant, + /// Configured max sessions. + pub max_sessions: usize, + /// Configured idle timeout in seconds. + pub idle_timeout_secs: u64, + /// Configured frame buffer size. + pub frame_buffer_size: usize, + /// Per-session frame buffers (session_id → ring of latest base64 frames). + pub frame_buffers: DashMap>, +} + +/// Simple circular buffer that keeps the *N* most recent display frames. +/// Viewers that connect mid-stream can pull the latest frame immediately +/// instead of waiting for the next one from the agent. +pub struct FrameBuffer { + frames: parking_lot::Mutex>, + capacity: usize, +} + +impl FrameBuffer { + pub fn new(capacity: usize) -> Self { + Self { + frames: parking_lot::Mutex::new(Vec::with_capacity(capacity)), + capacity, + } + } + + /// Push a base64-encoded frame, evicting the oldest when full. + pub fn push(&self, frame: String) { + let mut buf = self.frames.lock(); + if buf.len() >= self.capacity { + buf.remove(0); + } + buf.push(frame); + } + + /// Return the most recent frame, if any. + pub fn latest(&self) -> Option { + let buf = self.frames.lock(); + buf.last().cloned() + } +} + +impl AppState { + pub fn new(max_sessions: usize, idle_timeout_secs: u64, frame_buffer_size: usize) -> Arc { + Arc::new(Self { + sessions: DashMap::new(), + agents: DashMap::new(), + started_at: Instant::now(), + max_sessions, + idle_timeout_secs, + frame_buffer_size, + frame_buffers: DashMap::new(), + }) + } + + /// Create a brand-new session and return its id. + pub fn create_session(self: &Arc) -> Result { + if self.sessions.len() >= self.max_sessions { + return Err("max sessions reached"); + } + let id = uuid::Uuid::new_v4().to_string(); + let session = Session::new(id.clone()); + self.frame_buffers.insert(id.clone(), Arc::new(FrameBuffer::new(self.frame_buffer_size))); + self.sessions.insert(id.clone(), session.clone()); + info!("session created: {}", id); + Ok(session) + } + + /// Look up a session by id. + pub fn get_session(&self, id: &str) -> Option { + self.sessions.get(id).map(|r| r.value().clone()) + } + + /// Remove a session (and its frame buffer). + pub fn remove_session(&self, id: &str) -> bool { + let removed = self.sessions.remove(id).is_some(); + self.frame_buffers.remove(id); + if removed { + info!("session removed: {}", id); + } + removed + } + + /// Mark a session as active once the agent starts streaming. + pub fn activate_session(&self, session_id: &str, resolution: Option<&str>) { + if let Some(mut s) = self.sessions.get_mut(session_id) { + s.status = SessionStatus::Active; + s.display_active = true; + s.resolution = resolution.map(String::from); + info!("session activated: {}", session_id); + } + } + + /// Register a newly connected agent. + pub fn register_agent(&self, agent: AgentConnection) { + info!("agent registered: {} for session {}", agent.agent_id, agent.session_id); + self.agents.insert(agent.agent_id.clone(), agent); + } + + /// Unregister an agent and mark its session as disconnected. + pub fn unregister_agent(&self, agent_id: &str) { + if let Some((_, agent)) = self.agents.remove(agent_id) { + // Mark the session as disconnected if no other agents are connected for it. + let session_id = agent.session_id.clone(); + let still_has_agent = self + .agents + .iter() + .any(|r| r.session_id == session_id && r.agent_id != agent_id); + if !still_has_agent { + if let Some(mut s) = self.sessions.get_mut(&session_id) { + s.status = SessionStatus::Disconnected; + s.display_active = false; + s.audio_active = false; + } + } + info!("agent unregistered: {} (session {})", agent_id, session_id); + } + } + + /// Push a display frame into the session's circular buffer. + pub fn push_frame(&self, session_id: &str, frame: String) { + if let Some(buf) = self.frame_buffers.get(session_id) { + buf.push(frame); + } + } + + /// Return counts for health-check. + pub fn stats(&self) -> (usize, usize) { + let active: usize = self + .sessions + .iter() + .filter(|r| r.status == SessionStatus::Active) + .count(); + (self.sessions.len(), self.agents.len()) + } +}