agent: protocol.rs — binary video frame format (13-byte header), JSON control messages separate

This commit is contained in:
Butterfly Dev 2026-04-07 04:55:16 +00:00
parent 1468097c1b
commit a97ebed88b

View File

@ -1,96 +1,88 @@
//! Message types for communication between the agent and the Butterfly server. //! Message types for communication between the agent and the Butterfly server.
//! //!
//! These mirror the server's `WsMessage` enum (serde-tagged with `msg_type`). //! ## Wire Protocol
//! The agent sends **Agent → Server** messages and receives **Server → Agent** messages. //!
//! Two WebSocket message types are used:
//!
//! - **Binary frames** (`Message::Binary`) — Raw video/audio data with a minimal header.
//! No JSON, no base64. This is the low-latency path.
//! - **Text frames** (`Message::Text`) — JSON-encoded control messages (HUD commands,
//! heartbeat, session info, etc.).
//!
//! ## Binary Frame Format
//!
//! ```text
//! Offset Size Field
//! 0 1 Frame type (see FrameType constants)
//! 1 4 Timestamp in ms since agent start (little-endian u32)
//! 5 4 Display width in pixels (little-endian u32)
//! 9 4 Display height in pixels (little-endian u32)
//! 13 ... Payload (raw encoded data — H.264 NALs or JPEG bytes)
//! ```
//!
//! Total header overhead: 13 bytes (negligible vs. typical 10-50KB payload).
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
// ── Agent → Server ──────────────────────────────────────────────────────────── // ── Binary frame types ─────────────────────────────────────────────────────────
/// A single display frame, base64-encoded JPEG. /// Frame type byte values for the binary protocol.
#[derive(Debug, Clone, Serialize)] pub mod frame_type {
pub struct DisplayFrame { /// H.264 keyframe (contains SPS + PPS + IDR slice).
pub session_id: String, pub const H264_KEY: u8 = 0x01;
pub data: String, /// H.264 delta/P-frame (predicted frame).
pub timestamp: Option<f64>, pub const H264_DELTA: u8 = 0x02;
/// JPEG image (fallback encoder).
pub const JPEG: u8 = 0x03;
/// Audio chunk (future).
pub const AUDIO: u8 = 0x04;
} }
/// An audio chunk (future: PCM/Opus encoded). /// Binary video frame header size in bytes.
#[derive(Debug, Clone, Serialize)] pub const FRAME_HEADER_SIZE: usize = 13;
pub struct AudioFrame {
pub session_id: String, /// Build a binary video frame from codec type, timestamp, dimensions, and payload.
pub data: String, ///
pub timestamp: Option<f64>, /// Returns a complete `Vec<u8>` ready to send as a WebSocket binary frame.
pub fn build_binary_frame(
frame_type: u8,
timestamp_ms: u32,
width: u32,
height: u32,
payload: &[u8],
) -> Vec<u8> {
let mut buf = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len());
buf.push(frame_type);
buf.extend_from_slice(&timestamp_ms.to_le_bytes());
buf.extend_from_slice(&width.to_le_bytes());
buf.extend_from_slice(&height.to_le_bytes());
buf.extend_from_slice(payload);
buf
} }
/// Agent announces its capabilities on connect. /// Parse the header of a binary frame.
#[derive(Debug, Clone, Serialize)] ///
pub struct AgentInfo { /// Returns `(frame_type, timestamp_ms, width, height)`.
pub session_id: String, /// The caller can then read `data[FRAME_HEADER_SIZE..]` as the payload.
pub agent_id: String, pub fn parse_frame_header(data: &[u8]) -> Option<(u8, u32, u32, u32)> {
pub resolution: Option<String>, if data.len() < FRAME_HEADER_SIZE {
pub hostname: String, return None;
}
Some((
data[0],
u32::from_le_bytes(data[1..5].try_into().ok()?),
u32::from_le_bytes(data[5..9].try_into().ok()?),
u32::from_le_bytes(data[9..13].try_into().ok()?),
))
} }
/// Keep-alive ping. // ── JSON control messages (text frames) ────────────────────────────────────────
#[derive(Debug, Clone, Serialize)]
pub struct Heartbeat;
// ── Server → Agent ──────────────────────────────────────────────────────────── /// All JSON messages use a `msg_type` discriminator for routing.
/// A HUD command forwarded from a viewer (mouse, keyboard, etc.).
#[derive(Debug, Clone, Deserialize)]
pub struct ForwardHudCommand {
pub command: String,
pub params: serde_json::Value,
}
/// A resize request forwarded from a viewer.
#[derive(Debug, Clone, Deserialize)]
pub struct ForwardResize {
pub width: u32,
pub height: u32,
}
/// Server tells the agent to start/stop streaming.
#[derive(Debug, Clone, Deserialize)]
pub struct StreamControl {
pub action: String,
}
/// Generic acknowledgment from server.
#[derive(Debug, Clone, Deserialize)]
pub struct Ack {
pub message: String,
}
/// Error from server.
#[derive(Debug, Clone, Deserialize)]
pub struct ErrorMsg {
pub message: String,
}
// ── Unified message envelope ──────────────────────────────────────────────────
/// All messages use a `msg_type` discriminator for routing.
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "msg_type", rename_all = "snake_case")] #[serde(tag = "msg_type", rename_all = "snake_case")]
pub enum AgentWsMessage { pub enum ControlMessage {
// Outgoing (Agent → Server) // Outgoing (Agent → Server)
#[serde(rename = "display_frame")]
DisplayFrame {
session_id: String,
data: String,
#[serde(skip_serializing_if = "Option::is_none")]
timestamp: Option<f64>,
},
#[serde(rename = "audio_frame")]
AudioFrame {
session_id: String,
data: String,
#[serde(skip_serializing_if = "Option::is_none")]
timestamp: Option<f64>,
},
#[serde(rename = "agent_info")] #[serde(rename = "agent_info")]
AgentInfo { AgentInfo {
session_id: String, session_id: String,
@ -99,11 +91,13 @@ pub enum AgentWsMessage {
resolution: Option<String>, resolution: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
hostname: Option<String>, hostname: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
encoder: Option<String>,
}, },
#[serde(rename = "heartbeat")] #[serde(rename = "heartbeat")]
Heartbeat, Heartbeat,
// Incoming (Server → Agent) — included for deserialization // Incoming (Server → Agent)
#[serde(rename = "forward_hud_command")] #[serde(rename = "forward_hud_command")]
ForwardHudCommand { ForwardHudCommand {
command: String, command: String,
@ -122,41 +116,57 @@ pub enum AgentWsMessage {
Error { message: String }, Error { message: String },
} }
// ── Helpers ─────────────────────────────────────────────────────────────────── // ── JSON helpers ───────────────────────────────────────────────────────────────
/// Build a display frame message. /// Build an agent info JSON message.
pub fn display_frame_msg(session_id: &str, data: String) -> String { pub fn agent_info_msg(
let msg = AgentWsMessage::DisplayFrame { session_id: &str,
session_id: session_id.to_string(), agent_id: &str,
data, resolution: Option<&str>,
timestamp: None, hostname: Option<&str>,
}; encoder: Option<&str>,
serde_json::to_string(&msg).unwrap_or_default() ) -> String {
} let msg = ControlMessage::AgentInfo {
/// Build an audio frame message.
pub fn audio_frame_msg(session_id: &str, data: String) -> String {
let msg = AgentWsMessage::AudioFrame {
session_id: session_id.to_string(),
data,
timestamp: None,
};
serde_json::to_string(&msg).unwrap_or_default()
}
/// Build an agent info message.
pub fn agent_info_msg(session_id: &str, agent_id: &str, resolution: Option<&str>, hostname: Option<&str>) -> String {
let msg = AgentWsMessage::AgentInfo {
session_id: session_id.to_string(), session_id: session_id.to_string(),
agent_id: agent_id.to_string(), agent_id: agent_id.to_string(),
resolution: resolution.map(String::from), resolution: resolution.map(String::from),
hostname: hostname.map(String::from), hostname: hostname.map(String::from),
encoder: encoder.map(String::from),
}; };
serde_json::to_string(&msg).unwrap_or_default() serde_json::to_string(&msg).unwrap_or_default()
} }
/// Build a heartbeat message. /// Build a heartbeat JSON message.
pub fn heartbeat_msg() -> String { pub fn heartbeat_msg() -> String {
let msg = AgentWsMessage::Heartbeat; serde_json::to_string(&ControlMessage::Heartbeat).unwrap_or_default()
serde_json::to_string(&msg).unwrap_or_default() }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_binary_frame_roundtrip() {
let frame = build_binary_frame(
frame_type::H264_KEY,
12345,
1920,
1080,
&[0xDE, 0xAD, 0xBE, 0xEF],
);
assert_eq!(frame.len(), FRAME_HEADER_SIZE + 4);
assert_eq!(frame[0], frame_type::H264_KEY);
let (ft, ts, w, h) = parse_frame_header(&frame).unwrap();
assert_eq!(ft, frame_type::H264_KEY);
assert_eq!(ts, 12345);
assert_eq!(w, 1920);
assert_eq!(h, 1080);
assert_eq!(&frame[FRAME_HEADER_SIZE..], &[0xDE, 0xAD, 0xBE, 0xEF]);
}
#[test]
fn test_parse_too_short() {
assert!(parse_frame_header(&[0x01, 0x00]).is_none());
}
} }