server: state.rs — AppState with DashMap sessions/agents, FrameBuffer ring for late-joiners
This commit is contained in:
parent
fdde57c7e2
commit
ff476f12dc
153
server/src/state.rs
Normal file
153
server/src/state.rs
Normal file
@ -0,0 +1,153 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
|
use dashmap::DashMap;
|
||||||
|
use log::info;
|
||||||
|
|
||||||
|
use crate::models::{AgentConnection, Session, SessionStatus};
|
||||||
|
|
||||||
|
/// Global shared state accessible from every request / WS handler via `web::Data`.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct AppState {
|
||||||
|
/// All sessions keyed by session id.
|
||||||
|
pub sessions: DashMap<String, Session>,
|
||||||
|
/// Connected agents keyed by agent id.
|
||||||
|
pub agents: DashMap<String, AgentConnection>,
|
||||||
|
/// Server start time for uptime calculations.
|
||||||
|
pub started_at: Instant,
|
||||||
|
/// Configured max sessions.
|
||||||
|
pub max_sessions: usize,
|
||||||
|
/// Configured idle timeout in seconds.
|
||||||
|
pub idle_timeout_secs: u64,
|
||||||
|
/// Configured frame buffer size.
|
||||||
|
pub frame_buffer_size: usize,
|
||||||
|
/// Per-session frame buffers (session_id → ring of latest base64 frames).
|
||||||
|
pub frame_buffers: DashMap<String, Arc<FrameBuffer>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Simple circular buffer that keeps the *N* most recent display frames.
|
||||||
|
/// Viewers that connect mid-stream can pull the latest frame immediately
|
||||||
|
/// instead of waiting for the next one from the agent.
|
||||||
|
pub struct FrameBuffer {
|
||||||
|
frames: parking_lot::Mutex<Vec<String>>,
|
||||||
|
capacity: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FrameBuffer {
|
||||||
|
pub fn new(capacity: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
frames: parking_lot::Mutex::new(Vec::with_capacity(capacity)),
|
||||||
|
capacity,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Push a base64-encoded frame, evicting the oldest when full.
|
||||||
|
pub fn push(&self, frame: String) {
|
||||||
|
let mut buf = self.frames.lock();
|
||||||
|
if buf.len() >= self.capacity {
|
||||||
|
buf.remove(0);
|
||||||
|
}
|
||||||
|
buf.push(frame);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return the most recent frame, if any.
|
||||||
|
pub fn latest(&self) -> Option<String> {
|
||||||
|
let buf = self.frames.lock();
|
||||||
|
buf.last().cloned()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AppState {
|
||||||
|
pub fn new(max_sessions: usize, idle_timeout_secs: u64, frame_buffer_size: usize) -> Arc<Self> {
|
||||||
|
Arc::new(Self {
|
||||||
|
sessions: DashMap::new(),
|
||||||
|
agents: DashMap::new(),
|
||||||
|
started_at: Instant::now(),
|
||||||
|
max_sessions,
|
||||||
|
idle_timeout_secs,
|
||||||
|
frame_buffer_size,
|
||||||
|
frame_buffers: DashMap::new(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a brand-new session and return its id.
|
||||||
|
pub fn create_session(self: &Arc<Self>) -> Result<Session, &'static str> {
|
||||||
|
if self.sessions.len() >= self.max_sessions {
|
||||||
|
return Err("max sessions reached");
|
||||||
|
}
|
||||||
|
let id = uuid::Uuid::new_v4().to_string();
|
||||||
|
let session = Session::new(id.clone());
|
||||||
|
self.frame_buffers.insert(id.clone(), Arc::new(FrameBuffer::new(self.frame_buffer_size)));
|
||||||
|
self.sessions.insert(id.clone(), session.clone());
|
||||||
|
info!("session created: {}", id);
|
||||||
|
Ok(session)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Look up a session by id.
|
||||||
|
pub fn get_session(&self, id: &str) -> Option<Session> {
|
||||||
|
self.sessions.get(id).map(|r| r.value().clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove a session (and its frame buffer).
|
||||||
|
pub fn remove_session(&self, id: &str) -> bool {
|
||||||
|
let removed = self.sessions.remove(id).is_some();
|
||||||
|
self.frame_buffers.remove(id);
|
||||||
|
if removed {
|
||||||
|
info!("session removed: {}", id);
|
||||||
|
}
|
||||||
|
removed
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark a session as active once the agent starts streaming.
|
||||||
|
pub fn activate_session(&self, session_id: &str, resolution: Option<&str>) {
|
||||||
|
if let Some(mut s) = self.sessions.get_mut(session_id) {
|
||||||
|
s.status = SessionStatus::Active;
|
||||||
|
s.display_active = true;
|
||||||
|
s.resolution = resolution.map(String::from);
|
||||||
|
info!("session activated: {}", session_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register a newly connected agent.
|
||||||
|
pub fn register_agent(&self, agent: AgentConnection) {
|
||||||
|
info!("agent registered: {} for session {}", agent.agent_id, agent.session_id);
|
||||||
|
self.agents.insert(agent.agent_id.clone(), agent);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Unregister an agent and mark its session as disconnected.
|
||||||
|
pub fn unregister_agent(&self, agent_id: &str) {
|
||||||
|
if let Some((_, agent)) = self.agents.remove(agent_id) {
|
||||||
|
// Mark the session as disconnected if no other agents are connected for it.
|
||||||
|
let session_id = agent.session_id.clone();
|
||||||
|
let still_has_agent = self
|
||||||
|
.agents
|
||||||
|
.iter()
|
||||||
|
.any(|r| r.session_id == session_id && r.agent_id != agent_id);
|
||||||
|
if !still_has_agent {
|
||||||
|
if let Some(mut s) = self.sessions.get_mut(&session_id) {
|
||||||
|
s.status = SessionStatus::Disconnected;
|
||||||
|
s.display_active = false;
|
||||||
|
s.audio_active = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
info!("agent unregistered: {} (session {})", agent_id, session_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Push a display frame into the session's circular buffer.
|
||||||
|
pub fn push_frame(&self, session_id: &str, frame: String) {
|
||||||
|
if let Some(buf) = self.frame_buffers.get(session_id) {
|
||||||
|
buf.push(frame);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return counts for health-check.
|
||||||
|
pub fn stats(&self) -> (usize, usize) {
|
||||||
|
let active: usize = self
|
||||||
|
.sessions
|
||||||
|
.iter()
|
||||||
|
.filter(|r| r.status == SessionStatus::Active)
|
||||||
|
.count();
|
||||||
|
(self.sessions.len(), self.agents.len())
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user