desktop: services/websocket.service.ts — WebSocket client matching Rust WsMessage protocol

This commit is contained in:
Butterfly Dev 2026-04-07 03:26:04 +00:00
parent 2b0537395d
commit 710560d300

View File

@ -0,0 +1,175 @@
import { Injectable, OnDestroy } from '@angular/core';
import { BehaviorSubject, Observable, Subject, timer } from 'rxjs';
import { filter, takeUntil, tap } from 'rxjs/operators';
// ── Types matching the Rust WsMessage enum ─────────────────────────────────
export interface FrameBroadcast {
msg_type: 'frame_broadcast';
data: string;
content_type: string;
}
export interface AudioBroadcast {
msg_type: 'audio_broadcast';
data: string;
content_type: string;
}
export interface SessionUpdate {
msg_type: 'session_update';
session_id: string;
status: 'waiting' | 'active' | 'disconnected';
resolution: string | null;
}
export interface WsError {
msg_type: 'error';
message: string;
}
export interface WsAck {
msg_type: 'ack';
message: string;
}
export type ServerMessage = FrameBroadcast | AudioBroadcast | SessionUpdate | WsError | WsAck;
// Messages the viewer sends to the server.
export interface HudCommandMsg {
msg_type: 'hud_command';
session_id: string;
command: string;
params: Record<string, unknown>;
}
export interface ResizeMsg {
msg_type: 'resize';
session_id: string;
width: number;
height: number;
}
export type ClientMessage = HudCommandMsg | ResizeMsg;
// ── Service ─────────────────────────────────────────────────────────────────
@Injectable({ providedIn: 'root' })
export class WebSocketService implements OnDestroy {
private socket: WebSocket | null = null;
private destroy$ = new Subject<void>();
/** All incoming server messages. */
private messages$ = new Subject<ServerMessage>();
/** Connection state. */
private connected$ = new BehaviorSubject<boolean>(false);
/** Current session id we're subscribed to. */
private sessionId: string | null = null;
/** Heartbeat interval. */
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
/** Observable of all server messages, typed. */
readonly messages: Observable<ServerMessage> = this.messages$.asObservable();
/** Whether we are currently connected to the server. */
readonly connected: Observable<boolean> = this.connected$.asObservable();
/** Stream of display frames only. */
readonly displayFrames: Observable<FrameBroadcast> = this.messages$.pipe(
filter((m): m is FrameBroadcast => m.msg_type === 'frame_broadcast'),
);
/** Stream of audio chunks only. */
readonly audioChunks: Observable<AudioBroadcast> = this.messages$.pipe(
filter((m): m is AudioBroadcast => m.msg_type === 'audio_broadcast'),
);
/** Stream of session state updates. */
readonly sessionUpdates: Observable<SessionUpdate> = this.messages$.pipe(
filter((m): m is SessionUpdate => m.msg_type === 'session_update'),
);
ngOnDestroy(): void {
this.disconnect();
this.destroy$.next();
this.destroy$.complete();
}
/**
* Connect to a session as a viewer.
* @param sessionId The session to watch.
* @param serverUrl Base server URL (default: current origin).
*/
connect(sessionId: string, serverUrl: string = window.location.origin): void {
this.disconnect();
this.sessionId = sessionId;
const wsUrl = serverUrl.replace(/^http/, 'ws') + `/ws/${sessionId}?client_type=viewer`;
this.socket = new WebSocket(wsUrl);
this.socket.onopen = () => {
console.log('[WS] connected to session', sessionId);
this.connected$.next(true);
// Start heartbeat every 15 seconds.
this.heartbeatTimer = setInterval(() => this.sendRaw({ msg_type: 'heartbeat' }), 15000);
};
this.socket.onmessage = (event) => {
try {
const msg: ServerMessage = JSON.parse(event.data);
this.messages$.next(msg);
} catch (e) {
console.warn('[WS] failed to parse message', event.data, e);
}
};
this.socket.onclose = (event) => {
console.log('[WS] disconnected', event.code, event.reason);
this.connected$.next(false);
this.stopHeartbeat();
};
this.socket.onerror = (event) => {
console.error('[WS] error', event);
this.connected$.next(false);
};
}
/** Disconnect from the current session. */
disconnect(): void {
this.stopHeartbeat();
if (this.socket) {
this.socket.close();
this.socket = null;
}
this.connected$.next(false);
this.sessionId = null;
}
/** Send a HUD command to the agent (mouse, keyboard, etc.). */
sendHudCommand(command: string, params: Record<string, unknown> = {}): void {
if (!this.sessionId) return;
this.sendRaw({ msg_type: 'hud_command', session_id: this.sessionId, command, params });
}
/** Send a resize request to the agent. */
sendResize(width: number, height: number): void {
if (!this.sessionId) return;
this.sendRaw({ msg_type: 'resize', session_id: this.sessionId, width, height });
}
/** Raw JSON send. */
private sendRaw(msg: Record<string, unknown>): void {
if (this.socket?.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify(msg));
}
}
private stopHeartbeat(): void {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
}