diff --git a/agent/src/protocol.rs b/agent/src/protocol.rs index 0164440..ab20399 100644 --- a/agent/src/protocol.rs +++ b/agent/src/protocol.rs @@ -1,96 +1,88 @@ //! Message types for communication between the agent and the Butterfly server. //! -//! These mirror the server's `WsMessage` enum (serde-tagged with `msg_type`). -//! The agent sends **Agent → Server** messages and receives **Server → Agent** messages. +//! ## Wire Protocol +//! +//! Two WebSocket message types are used: +//! +//! - **Binary frames** (`Message::Binary`) — Raw video/audio data with a minimal header. +//! No JSON, no base64. This is the low-latency path. +//! - **Text frames** (`Message::Text`) — JSON-encoded control messages (HUD commands, +//! heartbeat, session info, etc.). +//! +//! ## Binary Frame Format +//! +//! ```text +//! Offset Size Field +//! 0 1 Frame type (see FrameType constants) +//! 1 4 Timestamp in ms since agent start (little-endian u32) +//! 5 4 Display width in pixels (little-endian u32) +//! 9 4 Display height in pixels (little-endian u32) +//! 13 ... Payload (raw encoded data — H.264 NALs or JPEG bytes) +//! ``` +//! +//! Total header overhead: 13 bytes (negligible vs. typical 10-50KB payload). use serde::{Deserialize, Serialize}; -// ── Agent → Server ──────────────────────────────────────────────────────────── +// ── Binary frame types ───────────────────────────────────────────────────────── -/// A single display frame, base64-encoded JPEG. -#[derive(Debug, Clone, Serialize)] -pub struct DisplayFrame { - pub session_id: String, - pub data: String, - pub timestamp: Option, +/// Frame type byte values for the binary protocol. +pub mod frame_type { + /// H.264 keyframe (contains SPS + PPS + IDR slice). + pub const H264_KEY: u8 = 0x01; + /// H.264 delta/P-frame (predicted frame). + pub const H264_DELTA: u8 = 0x02; + /// JPEG image (fallback encoder). + pub const JPEG: u8 = 0x03; + /// Audio chunk (future). 
+ pub const AUDIO: u8 = 0x04; } -/// An audio chunk (future: PCM/Opus encoded). -#[derive(Debug, Clone, Serialize)] -pub struct AudioFrame { - pub session_id: String, - pub data: String, - pub timestamp: Option, +/// Binary video frame header size in bytes. +pub const FRAME_HEADER_SIZE: usize = 13; + +/// Build a binary video frame from codec type, timestamp, dimensions, and payload. +/// +/// Returns a complete `Vec<u8>` ready to send as a WebSocket binary frame. +pub fn build_binary_frame( + frame_type: u8, + timestamp_ms: u32, + width: u32, + height: u32, + payload: &[u8], +) -> Vec<u8> { + let mut buf = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len()); + buf.push(frame_type); + buf.extend_from_slice(&timestamp_ms.to_le_bytes()); + buf.extend_from_slice(&width.to_le_bytes()); + buf.extend_from_slice(&height.to_le_bytes()); + buf.extend_from_slice(payload); + buf } -/// Agent announces its capabilities on connect. -#[derive(Debug, Clone, Serialize)] -pub struct AgentInfo { - pub session_id: String, - pub agent_id: String, - pub resolution: Option, - pub hostname: String, +/// Parse the header of a binary frame. +/// +/// Returns `(frame_type, timestamp_ms, width, height)`. +/// The caller can then read `data[FRAME_HEADER_SIZE..]` as the payload. +pub fn parse_frame_header(data: &[u8]) -> Option<(u8, u32, u32, u32)> { + if data.len() < FRAME_HEADER_SIZE { + return None; + } + Some(( + data[0], + u32::from_le_bytes(data[1..5].try_into().ok()?), + u32::from_le_bytes(data[5..9].try_into().ok()?), + u32::from_le_bytes(data[9..13].try_into().ok()?), + )) } -/// Keep-alive ping. -#[derive(Debug, Clone, Serialize)] -pub struct Heartbeat; +// ── JSON control messages (text frames) ──────────────────────────────────────── -// ── Server → Agent ──────────────────────────────────────────────────────────── - -/// A HUD command forwarded from a viewer (mouse, keyboard, etc.). 
-#[derive(Debug, Clone, Deserialize)] -pub struct ForwardHudCommand { - pub command: String, - pub params: serde_json::Value, -} - -/// A resize request forwarded from a viewer. -#[derive(Debug, Clone, Deserialize)] -pub struct ForwardResize { - pub width: u32, - pub height: u32, -} - -/// Server tells the agent to start/stop streaming. -#[derive(Debug, Clone, Deserialize)] -pub struct StreamControl { - pub action: String, -} - -/// Generic acknowledgment from server. -#[derive(Debug, Clone, Deserialize)] -pub struct Ack { - pub message: String, -} - -/// Error from server. -#[derive(Debug, Clone, Deserialize)] -pub struct ErrorMsg { - pub message: String, -} - -// ── Unified message envelope ────────────────────────────────────────────────── - -/// All messages use a `msg_type` discriminator for routing. +/// All JSON messages use a `msg_type` discriminator for routing. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "msg_type", rename_all = "snake_case")] -pub enum AgentWsMessage { +pub enum ControlMessage { // Outgoing (Agent → Server) - #[serde(rename = "display_frame")] - DisplayFrame { - session_id: String, - data: String, - #[serde(skip_serializing_if = "Option::is_none")] - timestamp: Option, - }, - #[serde(rename = "audio_frame")] - AudioFrame { - session_id: String, - data: String, - #[serde(skip_serializing_if = "Option::is_none")] - timestamp: Option, - }, #[serde(rename = "agent_info")] AgentInfo { session_id: String, @@ -99,11 +91,13 @@ pub enum AgentWsMessage { resolution: Option<String>, #[serde(skip_serializing_if = "Option::is_none")] hostname: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + encoder: Option<String>, }, #[serde(rename = "heartbeat")] Heartbeat, - // Incoming (Server → Agent) — included for deserialization + // Incoming (Server → Agent) #[serde(rename = "forward_hud_command")] ForwardHudCommand { command: String, @@ -122,41 +116,57 @@ pub enum AgentWsMessage { Error { message: String }, } -// ── Helpers 
─────────────────────────────────────────────────────────────────── +// ── JSON helpers ─────────────────────────────────────────────────────────────── -/// Build a display frame message. -pub fn display_frame_msg(session_id: &str, data: String) -> String { - let msg = AgentWsMessage::DisplayFrame { - session_id: session_id.to_string(), - data, - timestamp: None, - }; - serde_json::to_string(&msg).unwrap_or_default() -} - -/// Build an audio frame message. -pub fn audio_frame_msg(session_id: &str, data: String) -> String { - let msg = AgentWsMessage::AudioFrame { - session_id: session_id.to_string(), - data, - timestamp: None, - }; - serde_json::to_string(&msg).unwrap_or_default() -} - -/// Build an agent info message. -pub fn agent_info_msg(session_id: &str, agent_id: &str, resolution: Option<&str>, hostname: Option<&str>) -> String { - let msg = AgentWsMessage::AgentInfo { +/// Build an agent info JSON message. +pub fn agent_info_msg( + session_id: &str, + agent_id: &str, + resolution: Option<&str>, + hostname: Option<&str>, + encoder: Option<&str>, +) -> String { + let msg = ControlMessage::AgentInfo { session_id: session_id.to_string(), agent_id: agent_id.to_string(), resolution: resolution.map(String::from), hostname: hostname.map(String::from), + encoder: encoder.map(String::from), }; serde_json::to_string(&msg).unwrap_or_default() } -/// Build a heartbeat message. +/// Build a heartbeat JSON message. 
pub fn heartbeat_msg() -> String { - let msg = AgentWsMessage::Heartbeat; - serde_json::to_string(&msg).unwrap_or_default() + serde_json::to_string(&ControlMessage::Heartbeat).unwrap_or_default() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_binary_frame_roundtrip() { + let frame = build_binary_frame( + frame_type::H264_KEY, + 12345, + 1920, + 1080, + &[0xDE, 0xAD, 0xBE, 0xEF], + ); + assert_eq!(frame.len(), FRAME_HEADER_SIZE + 4); + assert_eq!(frame[0], frame_type::H264_KEY); + + let (ft, ts, w, h) = parse_frame_header(&frame).unwrap(); + assert_eq!(ft, frame_type::H264_KEY); + assert_eq!(ts, 12345); + assert_eq!(w, 1920); + assert_eq!(h, 1080); + assert_eq!(&frame[FRAME_HEADER_SIZE..], &[0xDE, 0xAD, 0xBE, 0xEF]); + } + + #[test] + fn test_parse_too_short() { + assert!(parse_frame_header(&[0x01, 0x00]).is_none()); + } }