From 84f559c1a70d2b13a7ea4d3da3a12578b83c6502 Mon Sep 17 00:00:00 2001 From: Butterfly Dev Date: Tue, 7 Apr 2026 05:36:17 +0000 Subject: [PATCH] =?UTF-8?q?agent:=20main.rs=20=E2=80=94=20subcommand=20CLI?= =?UTF-8?q?=20dispatch,=20shutdown=20signal=20(Ctrl+C=20+=20Windows=20SCM)?= =?UTF-8?q?,=20graceful=20WebSocket=20close?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/src/main.rs | 354 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 310 insertions(+), 44 deletions(-) diff --git a/agent/src/main.rs b/agent/src/main.rs index ac9429c..8f9f102 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -4,19 +4,37 @@ //! it to a Butterfly server via WebSocket binary frames. Simultaneously receives //! HUD commands (mouse/keyboard) from remote viewers and executes them locally. //! -//! Wire protocol: -//! - Binary WebSocket frames = raw video (H.264 NALs or JPEG) with 13-byte header -//! - Text WebSocket frames = JSON control messages (HUD, heartbeat, etc.) +//! ## Wire Protocol //! -//! Usage: -//! butterfly-agent --server ws://192.168.1.100:8080 --encoder h264 -//! butterfly-agent --server ws://192.168.1.100:8080 --encoder jpeg --fps 30 +//! - **Binary WebSocket frames** — Raw video (H.264 NALs or JPEG) with 13-byte header. +//! This is the low-latency path — no JSON, no base64 encoding overhead. +//! - **Text WebSocket frames** — JSON control messages (HUD commands, heartbeat, etc.). +//! +//! ## Usage +//! +//! ```text +//! butterfly-agent run --server ws://192.168.1.100:8080 --encoder h264 +//! butterfly-agent run --server ws://192.168.1.100:8080 --encoder jpeg --fps 30 +//! butterfly-agent service install --server ws://192.168.1.100:8080 --encoder h264 +//! butterfly-agent service status +//! ``` +//! +//! ## Service Mode +//! +//! When installed as a system service (via `service install`), the binary detects +//! service mode and connects to the platform's service manager: +//! - **Linux (systemd)**: Runs as a systemd unit. Stdout/stderr go to journald. +//! - **Windows (SCM)**: Connects to the Windows Service Control Manager via +//! `service_dispatcher`. Handles STOP/SHUTDOWN control events for graceful +//! termination. mod capture; +mod cli; mod config; mod encoder; mod input; mod protocol; +mod service; use anyhow::{Context, Result}; use encoder::{EncodedFrame, EncoderType}; @@ -27,49 +45,116 @@ use tokio::sync::mpsc; use tokio_tungstenite::tungstenite::Message; use capture::ScreenCapture; -use config::AgentConfig; +use config::RunOptions; use input::InputHandler; -/// Event sent from the capture thread to the main loop. -enum CaptureEvent { - /// A binary video frame ready to send (complete with header). - BinaryFrame(Vec), - /// The capture thread hit a fatal error. - Error(String), -} +// ── Windows Service Support ──────────────────────────────────────────────────── -#[tokio::main] -async fn main() -> Result<()> { +/// Global storage for the run options when the Windows SCM launches the agent. +/// The SCM calls our service main function, which needs access to the parsed CLI +/// options. Since `service_dispatcher::start()` blocks and doesn't pass custom +/// data to the callback, we use a global OnceLock set before entering the dispatcher. +#[cfg(windows)] +use std::sync::OnceLock; + +#[cfg(windows)] +static SERVICE_OPTS: OnceLock = OnceLock::new(); + +/// Generate the FFI-compatible Windows service entry point. +/// This macro creates an `extern "system"` function that the SCM calls. +#[cfg(windows)] +windows_service::define_windows_service!(ffi_service_main, windows_service_main); + +// ── Entry Point ──────────────────────────────────────────────────────────────── + +fn main() -> Result<()> { env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("info")).init(); - let config = AgentConfig::parse_args(); + let cli = cli::Cli::parse(); + + match cli.command { + cli::Command::Run(opts) => { + // On Windows, check if we should enter service mode. + #[cfg(windows)] + if opts.windows_service { + SERVICE_OPTS.set(opts).expect("SERVICE_OPTS already set (should only be set once)"); + return start_windows_service(); + } + + // Normal foreground mode. + run_foreground(opts) + } + cli::Command::Service(action) => { + service::handle_service_action(action) + } + } +} + +/// Run the agent in foreground mode with a tokio runtime and Ctrl+C handler. +fn run_foreground(opts: RunOptions) -> Result<()> { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name("butterfly-agent") + .build() + .context("failed to create tokio runtime")?; + + rt.block_on(async { + // Create a shutdown channel. The sender is moved into the Ctrl+C handler. + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + + // Spawn a task that listens for Ctrl+C and signals shutdown. + tokio::spawn(async move { + match tokio::signal::ctrl_c().await { + Ok(()) => { + info!("Ctrl+C received, shutting down..."); + let _ = shutdown_tx.send(true); + } + Err(e) => { + error!("Ctrl+C handler failed: {}", e); + } + } + }); + + run_agent(opts, shutdown_rx).await + }) +} + +/// Run the agent's main loop with auto-reconnect and shutdown signal support. +/// +/// This function initializes the capture, encoder, and input handler, then enters +/// a reconnect loop. Each iteration connects to the server and streams video until +/// the connection drops or a shutdown signal is received. +async fn run_agent( + opts: RunOptions, + mut shutdown_rx: tokio::sync::watch::Receiver, +) -> Result<()> { let agent_id = uuid::Uuid::new_v4().to_string(); info!("🦋 Butterfly Agent v{}", env!("CARGO_PKG_VERSION")); info!(" agent id: {}", agent_id); - info!(" server: {}", config.server); - info!(" encoder: {}", config.encoder); - info!(" fps: {}", config.fps); - info!(" quality: {}", config.quality); + info!(" server: {}", opts.server); + info!(" encoder: {}", opts.encoder); + info!(" fps: {}", opts.fps); + info!(" quality: {}", opts.quality); // Parse encoder type. - let encoder_type: EncoderType = config.encoder.parse() + let encoder_type: EncoderType = opts.encoder.parse() .map_err(|e| anyhow::anyhow!("{}", e))?; // Determine session ID. - let session_id = match &config.session_id { + let session_id = match &opts.session_id { Some(id) => { info!(" session: {} (provided)", id); id.clone() } None => { info!(" session: creating new session via REST..."); - create_session_via_rest(&config).await? + create_session_via_rest(&opts).await? } }; // Initialize screen capture (raw BGRA output). - let screen_capture = ScreenCapture::new(config.display) + let screen_capture = ScreenCapture::new(opts.display) .context("failed to initialize screen capture")?; let resolution = screen_capture.resolution(); info!(" display: {}x{}", screen_capture.width(), screen_capture.height()); @@ -79,7 +164,7 @@ async fn main() -> Result<()> { encoder_type, screen_capture.width(), screen_capture.height(), - config.quality, + opts.quality, ).context("failed to create encoder")?; info!(" encoder: {:?} ready", encoder_type); @@ -92,8 +177,14 @@ async fn main() -> Result<()> { // Run the main connection loop with auto-reconnect. let mut reconnect_count = 0u32; loop { + // Check for shutdown before attempting a new connection. + if *shutdown_rx.borrow() { + info!("shutdown requested — not reconnecting"); + break; + } + match run_session( - &config, + &opts, &agent_id, &session_id, &resolution, @@ -102,6 +193,7 @@ async fn main() -> Result<()> { &encoder_type, &mut video_encoder, &mut input_handler, + &mut shutdown_rx, ).await { Ok(()) => { info!("session ended cleanly"); @@ -109,14 +201,30 @@ async fn main() -> Result<()> { } Err(e) => { error!("session error: {}", e); - if config.reconnect_delay_secs == 0 { break; } - if config.max_reconnect > 0 && reconnect_count >= config.max_reconnect { - error!("max reconnect attempts ({}) reached", config.max_reconnect); + if *shutdown_rx.borrow() { + break; + } + if opts.reconnect_delay_secs == 0 { + break; + } + if opts.max_reconnect > 0 && reconnect_count >= opts.max_reconnect { + error!("max reconnect attempts ({}) reached", opts.max_reconnect); break; } reconnect_count += 1; - info!("reconnecting in {}s (attempt {})...", config.reconnect_delay_secs, reconnect_count); - tokio::time::sleep(config.reconnect_delay()).await; + info!( + "reconnecting in {}s (attempt {})...", + opts.reconnect_delay_secs, reconnect_count + ); + + // Wait for reconnect delay, but also respond to shutdown. + tokio::select! { + _ = tokio::time::sleep(opts.reconnect_delay()) => {} + _ = shutdown_rx.changed() => { + info!("shutdown requested during reconnect wait"); + break; + } + } } } } @@ -128,7 +236,7 @@ async fn main() -> Result<()> { /// Run a single session: connect, stream video, handle input. #[allow(clippy::too_many_arguments)] async fn run_session( - config: &AgentConfig, + opts: &RunOptions, agent_id: &str, session_id: &str, resolution: &str, @@ -137,8 +245,9 @@ async fn run_session( encoder_type: &EncoderType, video_encoder: &mut Box, input_handler: &mut InputHandler, + shutdown_rx: &mut tokio::sync::watch::Receiver, ) -> Result<()> { - let ws_url = config.ws_url(session_id); + let ws_url = opts.ws_url(session_id); info!("connecting to {}", ws_url); let (ws_stream, _) = tokio_tungstenite::connect_async(&ws_url) @@ -172,8 +281,8 @@ async fn run_session( // Spawn capture + encode loop (blocking thread). let cap_session_id = session_id.to_string(); let cap_encoder_type = *encoder_type; - let cap_quality = config.quality; - let cap_frame_interval = config.frame_interval(); + let cap_quality = opts.quality; + let cap_frame_interval = opts.frame_interval(); let cap_width = capture_width; let cap_height = capture_height; let capture_handle = tokio::task::spawn_blocking(move || { @@ -190,7 +299,7 @@ async fn run_session( // Spawn heartbeat. let hb_session_id = session_id.to_string(); - let hb_interval = config.heartbeat_interval(); + let hb_interval = opts.heartbeat_interval(); let (hb_tx, mut hb_rx) = mpsc::channel::<()>(1); let heartbeat_handle = tokio::spawn(async move { let mut interval = tokio::time::interval(hb_interval); @@ -200,8 +309,7 @@ async fn run_session( } }); - // Main select loop. - let mut start_time = std::time::Instant::now(); + // Main select loop — multiplexes capture, WebSocket, heartbeat, and shutdown. loop { tokio::select! { // ── Encoded frame from capture thread ────────────────────── @@ -227,7 +335,6 @@ async fn run_session( ws_msg = ws_read.next() => { match ws_msg { Some(Ok(Message::Text(text))) => { - // JSON control message (HUD command, heartbeat ack, etc.). handle_server_text(&text, input_handler)?; } Some(Ok(Message::Ping(data))) => { @@ -257,10 +364,19 @@ async fn run_session( break; } } + + // ── Shutdown signal (from Ctrl+C or Windows service stop) ─ + _ = shutdown_rx.changed() => { + info!("shutdown signal received — closing WebSocket"); + // Send a close frame before exiting. + let _ = ws_write.send(Message::Close(None)).await; + break; + } } } - // Cleanup. + // Cleanup: drop the capture sender to signal the capture thread to stop, + // then wait for both spawned tasks to finish. let _ = hb_tx.send(()).await; drop(capture_tx); let _ = capture_handle.await; @@ -317,7 +433,10 @@ fn capture_encode_loop( width: usize, height: usize, ) { - info!("capture+encode loop started for session {} (encoder: {:?})", session_id, encoder_type); + info!( + "capture+encode loop started for session {} (encoder: {:?})", + session_id, encoder_type + ); // Create a local capturer and encoder for this thread. let mut capturer = match ScreenCapture::new(0) { @@ -419,8 +538,8 @@ fn get_hostname() -> String { } /// Create a new session via REST API. -async fn create_session_via_rest(config: &AgentConfig) -> Result { - let url = format!("{}/sessions", config.api_base()); +async fn create_session_via_rest(opts: &RunOptions) -> Result { + let url = format!("{}/sessions", opts.api_base()); let client = reqwest::Client::new(); let response = client.post(&url).json(&serde_json::json!({})).send().await @@ -443,3 +562,150 @@ async fn create_session_via_rest(config: &AgentConfig) -> Result { _ => anyhow::bail!("unexpected response"), } } + +// ── Windows Service Runtime ──────────────────────────────────────────────────── + +/// Start the Windows service dispatcher. +/// +/// This function blocks and enters the Windows Service Control Manager event loop. +/// The SCM will call `ffi_service_main` (generated by `define_windows_service!`) +/// which delegates to `windows_service_main`. +#[cfg(windows)] +fn start_windows_service() -> Result<()> { + use windows_service::service_dispatcher; + + info!("entering Windows service mode — connecting to SCM..."); + service_dispatcher::start("butterfly-agent", ffi_service_main) + .context("failed to connect to Windows Service Control Manager")?; + + // service_dispatcher::start() blocks until the service is stopped. + // It should never return Ok — it returns Err if the SCM connection fails. + Ok(()) +} + +/// Windows service main function — called by the SCM via the dispatcher. +/// +/// This function: +/// 1. Registers a service control handler (handles STOP/SHUTDOWN) +/// 2. Reports SERVICE_RUNNING state +/// 3. Creates a tokio runtime and runs the agent +/// 4. On shutdown signal, reports SERVICE_STOPPED +#[cfg(windows)] +fn windows_service_main(_args: Vec) { + use windows_service::service::ServiceControl; + use windows_service::service::ServiceControlAccept; + use windows_service::service::ServiceState; + use windows_service::service::ServiceStatus; + use windows_service::service::ServiceType; + use windows_service::service_control_handler::ServiceControlHandlerResult; + use windows_service::service_control_handler::register; + + info!("Windows service main started"); + + // Retrieve the run options stored by main() before the dispatcher was called. + let opts = match SERVICE_OPTS.get() { + Some(o) => o.clone(), + None => { + error!("SERVICE_OPTS not set — cannot start service"); + return; + } + }; + + // Create a shutdown channel for the service control handler. + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + + // Define the service control handler callback. + // This is called by the SCM when it sends a control event (STOP, SHUTDOWN, etc.). + let event_handler = move |control_event| -> ServiceControlHandlerResult { + match control_event { + ServiceControl::Stop | ServiceControl::Shutdown => { + info!("Windows SCM sent {:?} — shutting down service", control_event); + let _ = shutdown_tx.send(true); + ServiceControlHandlerResult::NoError + } + ServiceControl::Interrogate => { + ServiceControlHandlerResult::NoError + } + ServiceControl::Pause => { + ServiceControlHandlerResult::NotImplemented + } + ServiceControl::Continue => { + ServiceControlHandlerResult::NotImplemented + } + ServiceControl::ParamChange => { + ServiceControlHandlerResult::NoError + } + _ => { + ServiceControlHandlerResult::NotImplemented + } + } + }; + + // Register the service control handler with the SCM. + let status_handle = match register("butterfly-agent", event_handler) { + Ok(h) => h, + Err(e) => { + error!("failed to register service control handler: {}", e); + return; + } + }; + + // Report SERVICE_RUNNING to the SCM. + if let Err(e) = status_handle.set_service_status(ServiceStatus { + service_type: ServiceType::OWN_PROCESS, + current_state: ServiceState::Running, + controls_accepted: ServiceControlAccept::STOP | ServiceControlAccept::SHUTDOWN, + ..Default::default() + }) { + error!("failed to set service status to RUNNING: {}", e); + return; + } + + info!("service reported as RUNNING — starting agent"); + + // Create a tokio runtime and run the agent. + // We use a new runtime here because we're outside of any existing tokio context + // (the SCM calls this function from its own thread). + let rt = match tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name("butterfly-agent-svc") + .build() + { + Ok(r) => r, + Err(e) => { + error!("failed to create tokio runtime: {}", e); + let _ = status_handle.set_service_status(ServiceStatus { + service_type: ServiceType::OWN_PROCESS, + current_state: ServiceState::Stopped, + ..Default::default() + }); + return; + } + }; + + let result = rt.block_on(run_agent(opts, shutdown_rx)); + + if let Err(e) = result { + error!("agent exited with error: {}", e); + } + + // Report SERVICE_STOPPED to the SCM. + info!("reporting SERVICE_STOPPED to SCM"); + if let Err(e) = status_handle.set_service_status(ServiceStatus { + service_type: ServiceType::OWN_PROCESS, + current_state: ServiceState::Stopped, + ..Default::default() + }) { + error!("failed to set service status to STOPPED: {}", e); + } +} + +// ── Internal Types ───────────────────────────────────────────────────────────── + +/// Event sent from the capture thread to the main loop. +enum CaptureEvent { + /// A binary video frame ready to send (complete with header). + BinaryFrame(Vec), + /// The capture thread hit a fatal error. + Error(String), +}