diff --git a/desktop/src/app/services/websocket.service.ts b/desktop/src/app/services/websocket.service.ts new file mode 100644 index 0000000..92288d8 --- /dev/null +++ b/desktop/src/app/services/websocket.service.ts @@ -0,0 +1,175 @@ +import { Injectable, OnDestroy } from '@angular/core'; +import { BehaviorSubject, Observable, Subject, timer } from 'rxjs'; +import { filter, takeUntil, tap } from 'rxjs/operators'; + +// ── Types matching the Rust WsMessage enum ───────────────────────────────── + +export interface FrameBroadcast { + msg_type: 'frame_broadcast'; + data: string; + content_type: string; +} + +export interface AudioBroadcast { + msg_type: 'audio_broadcast'; + data: string; + content_type: string; +} + +export interface SessionUpdate { + msg_type: 'session_update'; + session_id: string; + status: 'waiting' | 'active' | 'disconnected'; + resolution: string | null; +} + +export interface WsError { + msg_type: 'error'; + message: string; +} + +export interface WsAck { + msg_type: 'ack'; + message: string; +} + +export type ServerMessage = FrameBroadcast | AudioBroadcast | SessionUpdate | WsError | WsAck; + +// Messages the viewer sends to the server. + +export interface HudCommandMsg { + msg_type: 'hud_command'; + session_id: string; + command: string; + params: Record; +} + +export interface ResizeMsg { + msg_type: 'resize'; + session_id: string; + width: number; + height: number; +} + +export type ClientMessage = HudCommandMsg | ResizeMsg; + +// ── Service ───────────────────────────────────────────────────────────────── + +@Injectable({ providedIn: 'root' }) +export class WebSocketService implements OnDestroy { + private socket: WebSocket | null = null; + private destroy$ = new Subject(); + + /** All incoming server messages. */ + private messages$ = new Subject(); + /** Connection state. */ + private connected$ = new BehaviorSubject(false); + /** Current session id we're subscribed to. */ + private sessionId: string | null = null; + /** Heartbeat interval. */ + private heartbeatTimer: ReturnType | null = null; + + /** Observable of all server messages, typed. */ + readonly messages: Observable = this.messages$.asObservable(); + + /** Whether we are currently connected to the server. */ + readonly connected: Observable = this.connected$.asObservable(); + + /** Stream of display frames only. */ + readonly displayFrames: Observable = this.messages$.pipe( + filter((m): m is FrameBroadcast => m.msg_type === 'frame_broadcast'), + ); + + /** Stream of audio chunks only. */ + readonly audioChunks: Observable = this.messages$.pipe( + filter((m): m is AudioBroadcast => m.msg_type === 'audio_broadcast'), + ); + + /** Stream of session state updates. */ + readonly sessionUpdates: Observable = this.messages$.pipe( + filter((m): m is SessionUpdate => m.msg_type === 'session_update'), + ); + + ngOnDestroy(): void { + this.disconnect(); + this.destroy$.next(); + this.destroy$.complete(); + } + + /** + * Connect to a session as a viewer. + * @param sessionId The session to watch. + * @param serverUrl Base server URL (default: current origin). + */ + connect(sessionId: string, serverUrl: string = window.location.origin): void { + this.disconnect(); + this.sessionId = sessionId; + + const wsUrl = serverUrl.replace(/^http/, 'ws') + `/ws/${sessionId}?client_type=viewer`; + this.socket = new WebSocket(wsUrl); + + this.socket.onopen = () => { + console.log('[WS] connected to session', sessionId); + this.connected$.next(true); + // Start heartbeat every 15 seconds. + this.heartbeatTimer = setInterval(() => this.sendRaw({ msg_type: 'heartbeat' }), 15000); + }; + + this.socket.onmessage = (event) => { + try { + const msg: ServerMessage = JSON.parse(event.data); + this.messages$.next(msg); + } catch (e) { + console.warn('[WS] failed to parse message', event.data, e); + } + }; + + this.socket.onclose = (event) => { + console.log('[WS] disconnected', event.code, event.reason); + this.connected$.next(false); + this.stopHeartbeat(); + }; + + this.socket.onerror = (event) => { + console.error('[WS] error', event); + this.connected$.next(false); + }; + } + + /** Disconnect from the current session. */ + disconnect(): void { + this.stopHeartbeat(); + if (this.socket) { + this.socket.close(); + this.socket = null; + } + this.connected$.next(false); + this.sessionId = null; + } + + /** Send a HUD command to the agent (mouse, keyboard, etc.). */ + sendHudCommand(command: string, params: Record = {}): void { + if (!this.sessionId) return; + this.sendRaw({ msg_type: 'hud_command', session_id: this.sessionId, command, params }); + } + + /** Send a resize request to the agent. */ + sendResize(width: number, height: number): void { + if (!this.sessionId) return; + this.sendRaw({ msg_type: 'resize', session_id: this.sessionId, width, height }); + } + + /** Raw JSON send. */ + private sendRaw(msg: Record): void { + if (this.socket?.readyState === WebSocket.OPEN) { + this.socket.send(JSON.stringify(msg)); + } + } + + private stopHeartbeat(): void { + if (this.heartbeatTimer) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = null; + } + } +}