agent: main.rs — subcommand CLI dispatch, shutdown signal (Ctrl+C + Windows SCM), graceful WebSocket close

This commit is contained in:
Butterfly Dev 2026-04-07 05:36:17 +00:00
parent d0e8bf5569
commit 84f559c1a7

View File

@ -4,19 +4,37 @@
//! it to a Butterfly server via WebSocket binary frames. Simultaneously receives
//! HUD commands (mouse/keyboard) from remote viewers and executes them locally.
//!
//! Wire protocol:
//! - Binary WebSocket frames = raw video (H.264 NALs or JPEG) with 13-byte header
//! - Text WebSocket frames = JSON control messages (HUD, heartbeat, etc.)
//! ## Wire Protocol
//!
//! Usage:
//! butterfly-agent --server ws://192.168.1.100:8080 --encoder h264
//! butterfly-agent --server ws://192.168.1.100:8080 --encoder jpeg --fps 30
//! - **Binary WebSocket frames** — Raw video (H.264 NALs or JPEG) with 13-byte header.
//! This is the low-latency path — no JSON, no base64 encoding overhead.
//! - **Text WebSocket frames** — JSON control messages (HUD commands, heartbeat, etc.).
//!
//! ## Usage
//!
//! ```text
//! butterfly-agent run --server ws://192.168.1.100:8080 --encoder h264
//! butterfly-agent run --server ws://192.168.1.100:8080 --encoder jpeg --fps 30
//! butterfly-agent service install --server ws://192.168.1.100:8080 --encoder h264
//! butterfly-agent service status
//! ```
//!
//! ## Service Mode
//!
//! When installed as a system service (via `service install`), the binary detects
//! service mode and connects to the platform's service manager:
//! - **Linux (systemd)**: Runs as a systemd unit. Stdout/stderr go to journald.
//! - **Windows (SCM)**: Connects to the Windows Service Control Manager via
//! `service_dispatcher`. Handles STOP/SHUTDOWN control events for graceful
//! termination.
mod capture;
mod cli;
mod config;
mod encoder;
mod input;
mod protocol;
mod service;
use anyhow::{Context, Result};
use encoder::{EncodedFrame, EncoderType};
@ -27,49 +45,116 @@ use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Message;
use capture::ScreenCapture;
use config::AgentConfig;
use config::RunOptions;
use input::InputHandler;
/// Event sent from the capture thread to the main loop.
enum CaptureEvent {
/// A binary video frame ready to send (complete with header).
BinaryFrame(Vec<u8>),
/// The capture thread hit a fatal error.
Error(String),
}
// ── Windows Service Support ────────────────────────────────────────────────────
#[tokio::main]
async fn main() -> Result<()> {
/// Global storage for the run options when the Windows SCM launches the agent.
/// The SCM calls our service main function, which needs access to the parsed CLI
/// options. Since `service_dispatcher::start()` blocks and doesn't pass custom
/// data to the callback, we use a global OnceLock set before entering the dispatcher.
#[cfg(windows)]
use std::sync::OnceLock;
#[cfg(windows)]
static SERVICE_OPTS: OnceLock<RunOptions> = OnceLock::new();
/// Generate the FFI-compatible Windows service entry point.
/// This macro creates an `extern "system"` function that the SCM calls.
#[cfg(windows)]
windows_service::define_windows_service!(ffi_service_main, windows_service_main);
// ── Entry Point ────────────────────────────────────────────────────────────────
fn main() -> Result<()> {
env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("info")).init();
let config = AgentConfig::parse_args();
let cli = cli::Cli::parse();
match cli.command {
cli::Command::Run(opts) => {
// On Windows, check if we should enter service mode.
#[cfg(windows)]
if opts.windows_service {
SERVICE_OPTS.set(opts).expect("SERVICE_OPTS already set (should only be set once)");
return start_windows_service();
}
// Normal foreground mode.
run_foreground(opts)
}
cli::Command::Service(action) => {
service::handle_service_action(action)
}
}
}
/// Run the agent in foreground mode with a tokio runtime and Ctrl+C handler.
fn run_foreground(opts: RunOptions) -> Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("butterfly-agent")
.build()
.context("failed to create tokio runtime")?;
rt.block_on(async {
// Create a shutdown channel. The sender is moved into the Ctrl+C handler.
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
// Spawn a task that listens for Ctrl+C and signals shutdown.
tokio::spawn(async move {
match tokio::signal::ctrl_c().await {
Ok(()) => {
info!("Ctrl+C received, shutting down...");
let _ = shutdown_tx.send(true);
}
Err(e) => {
error!("Ctrl+C handler failed: {}", e);
}
}
});
run_agent(opts, shutdown_rx).await
})
}
/// Run the agent's main loop with auto-reconnect and shutdown signal support.
///
/// This function initializes the capture, encoder, and input handler, then enters
/// a reconnect loop. Each iteration connects to the server and streams video until
/// the connection drops or a shutdown signal is received.
async fn run_agent(
opts: RunOptions,
mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
) -> Result<()> {
let agent_id = uuid::Uuid::new_v4().to_string();
info!("🦋 Butterfly Agent v{}", env!("CARGO_PKG_VERSION"));
info!(" agent id: {}", agent_id);
info!(" server: {}", config.server);
info!(" encoder: {}", config.encoder);
info!(" fps: {}", config.fps);
info!(" quality: {}", config.quality);
info!(" server: {}", opts.server);
info!(" encoder: {}", opts.encoder);
info!(" fps: {}", opts.fps);
info!(" quality: {}", opts.quality);
// Parse encoder type.
let encoder_type: EncoderType = config.encoder.parse()
let encoder_type: EncoderType = opts.encoder.parse()
.map_err(|e| anyhow::anyhow!("{}", e))?;
// Determine session ID.
let session_id = match &config.session_id {
let session_id = match &opts.session_id {
Some(id) => {
info!(" session: {} (provided)", id);
id.clone()
}
None => {
info!(" session: creating new session via REST...");
create_session_via_rest(&config).await?
create_session_via_rest(&opts).await?
}
};
// Initialize screen capture (raw BGRA output).
let screen_capture = ScreenCapture::new(config.display)
let screen_capture = ScreenCapture::new(opts.display)
.context("failed to initialize screen capture")?;
let resolution = screen_capture.resolution();
info!(" display: {}x{}", screen_capture.width(), screen_capture.height());
@ -79,7 +164,7 @@ async fn main() -> Result<()> {
encoder_type,
screen_capture.width(),
screen_capture.height(),
config.quality,
opts.quality,
).context("failed to create encoder")?;
info!(" encoder: {:?} ready", encoder_type);
@ -92,8 +177,14 @@ async fn main() -> Result<()> {
// Run the main connection loop with auto-reconnect.
let mut reconnect_count = 0u32;
loop {
// Check for shutdown before attempting a new connection.
if *shutdown_rx.borrow() {
info!("shutdown requested — not reconnecting");
break;
}
match run_session(
&config,
&opts,
&agent_id,
&session_id,
&resolution,
@ -102,6 +193,7 @@ async fn main() -> Result<()> {
&encoder_type,
&mut video_encoder,
&mut input_handler,
&mut shutdown_rx,
).await {
Ok(()) => {
info!("session ended cleanly");
@ -109,14 +201,30 @@ async fn main() -> Result<()> {
}
Err(e) => {
error!("session error: {}", e);
if config.reconnect_delay_secs == 0 { break; }
if config.max_reconnect > 0 && reconnect_count >= config.max_reconnect {
error!("max reconnect attempts ({}) reached", config.max_reconnect);
if *shutdown_rx.borrow() {
break;
}
if opts.reconnect_delay_secs == 0 {
break;
}
if opts.max_reconnect > 0 && reconnect_count >= opts.max_reconnect {
error!("max reconnect attempts ({}) reached", opts.max_reconnect);
break;
}
reconnect_count += 1;
info!("reconnecting in {}s (attempt {})...", config.reconnect_delay_secs, reconnect_count);
tokio::time::sleep(config.reconnect_delay()).await;
info!(
"reconnecting in {}s (attempt {})...",
opts.reconnect_delay_secs, reconnect_count
);
// Wait for reconnect delay, but also respond to shutdown.
tokio::select! {
_ = tokio::time::sleep(opts.reconnect_delay()) => {}
_ = shutdown_rx.changed() => {
info!("shutdown requested during reconnect wait");
break;
}
}
}
}
}
@ -128,7 +236,7 @@ async fn main() -> Result<()> {
/// Run a single session: connect, stream video, handle input.
#[allow(clippy::too_many_arguments)]
async fn run_session(
config: &AgentConfig,
opts: &RunOptions,
agent_id: &str,
session_id: &str,
resolution: &str,
@ -137,8 +245,9 @@ async fn run_session(
encoder_type: &EncoderType,
video_encoder: &mut Box<dyn encoder::VideoEncoder>,
input_handler: &mut InputHandler,
shutdown_rx: &mut tokio::sync::watch::Receiver<bool>,
) -> Result<()> {
let ws_url = config.ws_url(session_id);
let ws_url = opts.ws_url(session_id);
info!("connecting to {}", ws_url);
let (ws_stream, _) = tokio_tungstenite::connect_async(&ws_url)
@ -172,8 +281,8 @@ async fn run_session(
// Spawn capture + encode loop (blocking thread).
let cap_session_id = session_id.to_string();
let cap_encoder_type = *encoder_type;
let cap_quality = config.quality;
let cap_frame_interval = config.frame_interval();
let cap_quality = opts.quality;
let cap_frame_interval = opts.frame_interval();
let cap_width = capture_width;
let cap_height = capture_height;
let capture_handle = tokio::task::spawn_blocking(move || {
@ -190,7 +299,7 @@ async fn run_session(
// Spawn heartbeat.
let hb_session_id = session_id.to_string();
let hb_interval = config.heartbeat_interval();
let hb_interval = opts.heartbeat_interval();
let (hb_tx, mut hb_rx) = mpsc::channel::<()>(1);
let heartbeat_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(hb_interval);
@ -200,8 +309,7 @@ async fn run_session(
}
});
// Main select loop.
let mut start_time = std::time::Instant::now();
// Main select loop — multiplexes capture, WebSocket, heartbeat, and shutdown.
loop {
tokio::select! {
// ── Encoded frame from capture thread ──────────────────────
@ -227,7 +335,6 @@ async fn run_session(
ws_msg = ws_read.next() => {
match ws_msg {
Some(Ok(Message::Text(text))) => {
// JSON control message (HUD command, heartbeat ack, etc.).
handle_server_text(&text, input_handler)?;
}
Some(Ok(Message::Ping(data))) => {
@ -257,10 +364,19 @@ async fn run_session(
break;
}
}
// ── Shutdown signal (from Ctrl+C or Windows service stop) ─
_ = shutdown_rx.changed() => {
info!("shutdown signal received — closing WebSocket");
// Send a close frame before exiting.
let _ = ws_write.send(Message::Close(None)).await;
break;
}
}
}
// Cleanup.
// Cleanup: drop the capture sender to signal the capture thread to stop,
// then wait for both spawned tasks to finish.
let _ = hb_tx.send(()).await;
drop(capture_tx);
let _ = capture_handle.await;
@ -317,7 +433,10 @@ fn capture_encode_loop(
width: usize,
height: usize,
) {
info!("capture+encode loop started for session {} (encoder: {:?})", session_id, encoder_type);
info!(
"capture+encode loop started for session {} (encoder: {:?})",
session_id, encoder_type
);
// Create a local capturer and encoder for this thread.
let mut capturer = match ScreenCapture::new(0) {
@ -419,8 +538,8 @@ fn get_hostname() -> String {
}
/// Create a new session via REST API.
async fn create_session_via_rest(config: &AgentConfig) -> Result<String> {
let url = format!("{}/sessions", config.api_base());
async fn create_session_via_rest(opts: &RunOptions) -> Result<String> {
let url = format!("{}/sessions", opts.api_base());
let client = reqwest::Client::new();
let response = client.post(&url).json(&serde_json::json!({})).send().await
@ -443,3 +562,150 @@ async fn create_session_via_rest(config: &AgentConfig) -> Result<String> {
_ => anyhow::bail!("unexpected response"),
}
}
// ── Windows Service Runtime ────────────────────────────────────────────────────
/// Start the Windows service dispatcher.
///
/// This function blocks and enters the Windows Service Control Manager event loop.
/// The SCM will call `ffi_service_main` (generated by `define_windows_service!`)
/// which delegates to `windows_service_main`.
#[cfg(windows)]
fn start_windows_service() -> Result<()> {
use windows_service::service_dispatcher;
info!("entering Windows service mode — connecting to SCM...");
service_dispatcher::start("butterfly-agent", ffi_service_main)
.context("failed to connect to Windows Service Control Manager")?;
// service_dispatcher::start() blocks until the service is stopped.
// It should never return Ok — it returns Err if the SCM connection fails.
Ok(())
}
/// Windows service main function — called by the SCM via the dispatcher.
///
/// This function:
/// 1. Registers a service control handler (handles STOP/SHUTDOWN)
/// 2. Reports SERVICE_RUNNING state
/// 3. Creates a tokio runtime and runs the agent
/// 4. On shutdown signal, reports SERVICE_STOPPED
#[cfg(windows)]
fn windows_service_main(_args: Vec<std::ffi::OsString>) {
use windows_service::service::ServiceControl;
use windows_service::service::ServiceControlAccept;
use windows_service::service::ServiceState;
use windows_service::service::ServiceStatus;
use windows_service::service::ServiceType;
use windows_service::service_control_handler::ServiceControlHandlerResult;
use windows_service::service_control_handler::register;
info!("Windows service main started");
// Retrieve the run options stored by main() before the dispatcher was called.
let opts = match SERVICE_OPTS.get() {
Some(o) => o.clone(),
None => {
error!("SERVICE_OPTS not set — cannot start service");
return;
}
};
// Create a shutdown channel for the service control handler.
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
// Define the service control handler callback.
// This is called by the SCM when it sends a control event (STOP, SHUTDOWN, etc.).
let event_handler = move |control_event| -> ServiceControlHandlerResult {
match control_event {
ServiceControl::Stop | ServiceControl::Shutdown => {
info!("Windows SCM sent {:?} — shutting down service", control_event);
let _ = shutdown_tx.send(true);
ServiceControlHandlerResult::NoError
}
ServiceControl::Interrogate => {
ServiceControlHandlerResult::NoError
}
ServiceControl::Pause => {
ServiceControlHandlerResult::NotImplemented
}
ServiceControl::Continue => {
ServiceControlHandlerResult::NotImplemented
}
ServiceControl::ParamChange => {
ServiceControlHandlerResult::NoError
}
_ => {
ServiceControlHandlerResult::NotImplemented
}
}
};
// Register the service control handler with the SCM.
let status_handle = match register("butterfly-agent", event_handler) {
Ok(h) => h,
Err(e) => {
error!("failed to register service control handler: {}", e);
return;
}
};
// Report SERVICE_RUNNING to the SCM.
if let Err(e) = status_handle.set_service_status(ServiceStatus {
service_type: ServiceType::OWN_PROCESS,
current_state: ServiceState::Running,
controls_accepted: ServiceControlAccept::STOP | ServiceControlAccept::SHUTDOWN,
..Default::default()
}) {
error!("failed to set service status to RUNNING: {}", e);
return;
}
info!("service reported as RUNNING — starting agent");
// Create a tokio runtime and run the agent.
// We use a new runtime here because we're outside of any existing tokio context
// (the SCM calls this function from its own thread).
let rt = match tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("butterfly-agent-svc")
.build()
{
Ok(r) => r,
Err(e) => {
error!("failed to create tokio runtime: {}", e);
let _ = status_handle.set_service_status(ServiceStatus {
service_type: ServiceType::OWN_PROCESS,
current_state: ServiceState::Stopped,
..Default::default()
});
return;
}
};
let result = rt.block_on(run_agent(opts, shutdown_rx));
if let Err(e) = result {
error!("agent exited with error: {}", e);
}
// Report SERVICE_STOPPED to the SCM.
info!("reporting SERVICE_STOPPED to SCM");
if let Err(e) = status_handle.set_service_status(ServiceStatus {
service_type: ServiceType::OWN_PROCESS,
current_state: ServiceState::Stopped,
..Default::default()
}) {
error!("failed to set service status to STOPPED: {}", e);
}
}
// ── Internal Types ─────────────────────────────────────────────────────────────
/// Event sent from the capture thread to the main loop.
enum CaptureEvent {
/// A binary video frame ready to send (complete with header).
BinaryFrame(Vec<u8>),
/// The capture thread hit a fatal error.
Error(String),
}