// src/lib/drone-session.ts // Copyright (C) 2026 Robert Colbert // All Rights Reserved import { GadgetComponent, GadgetLogLevel, IUser, IDroneRegistration, ChatTurnStatus, GadgetId, WorkspaceMode, IChatToolCall, IChatTurnBlock, } from "@gadget/api"; import { GadgetSocket, SocketSession, SocketSessionType, } from "./socket-session.js"; import { SocketService } from "../services/index.js"; import { ChatTurn } from "../models/chat-turn.js"; interface IStreamingBuffer { currentMode: 'thinking' | 'responding' | null; thinkingContent: string; respondingContent: string; lastBlockCreatedAt?: Date; } export class DroneSession extends SocketSession { protected type: SocketSessionType = SocketSessionType.Drone; registration: IDroneRegistration; chatSessionId: GadgetId | undefined; currentTurnId: GadgetId | undefined; workspaceMode: WorkspaceMode = WorkspaceMode.Idle; private streamingBuffers: Map = new Map(); constructor(socket: GadgetSocket, registration: IDroneRegistration) { super(socket, registration.user as IUser); this.registration = registration; } register() { super.register(); this.socket.on("status", this.onStatus.bind(this)); this.socket.on( "workspaceModeChanged", this.onWorkspaceModeChanged.bind(this), ); this.socket.on("thinking", this.onThinking.bind(this)); this.socket.on("response", this.onResponse.bind(this)); this.socket.on("toolCall", this.onToolCall.bind(this)); this.socket.on("workOrderComplete", this.onWorkOrderComplete.bind(this)); this.socket.on( "requestCrashRecovery", this.onRequestCrashRecovery.bind(this), ); this.socket.on("requestTermination", this.onRequestTermination.bind(this)); this.socket.on("log", this.onLog.bind(this)); this.socket.on("agent:thinking", this.onAgentThinking.bind(this)); this.socket.on("agent:response", this.onAgentResponse.bind(this)); this.socket.on("agent:tool-call", this.onAgentToolCall.bind(this)); this.socket.on("agent:tool-result", this.onAgentToolResult.bind(this)); this.socket.on("agent:complete", this.onAgentComplete.bind(this)); } async onLog( timestamp: Date, component: GadgetComponent, level: GadgetLogLevel, message: string, metadata?: unknown, ): Promise { if (!this.chatSessionId) { return; } try { const codeSession = SocketService.getCodeSessionByChatSessionId( this.chatSessionId, ); codeSession.onLog(timestamp, component, level, message, metadata); } catch (error) { this.log.error("failed to route log message", { error }); } } async onStatus(message: string): Promise { if (!this.chatSessionId) { this.log.warn( "drone status event received but no chat session is active", ); return; } try { const codeSession = SocketService.getCodeSessionByChatSessionId( this.chatSessionId, ); codeSession.socket.emit("status", message); } catch (error) { this.log.error("failed to route status message", { error }); } } /** * Called when the drone emits thinking content from the agent. * Aggregates thinking tokens in memory and persists at mode changes. */ async onThinking(content: string): Promise { if (!this.chatSessionId) { this.log.warn("thinking event received but no chat session is active"); return; } try { const codeSession = SocketService.getCodeSessionByChatSessionId( this.chatSessionId, ); codeSession.onThinking(content); if (this.currentTurnId) { const buffer = this.getOrCreateBuffer(this.currentTurnId); // Check for mode transition if (buffer.currentMode !== 'thinking') { // Flush previous mode if exists await this.flushBuffer(this.currentTurnId); buffer.currentMode = 'thinking'; buffer.thinkingContent = ''; buffer.lastBlockCreatedAt = new Date(); } // Aggregate content buffer.thinkingContent += content; } } catch (error) { this.log.error("failed to route thinking event", { error }); } } /** * Called when the drone emits response content from the agent. * Aggregates response tokens in memory and persists at mode changes. */ async onResponse(content: string): Promise { if (!this.chatSessionId) { this.log.warn("response event received but no chat session is active"); return; } try { const codeSession = SocketService.getCodeSessionByChatSessionId( this.chatSessionId, ); codeSession.onResponse(content); if (this.currentTurnId) { const buffer = this.getOrCreateBuffer(this.currentTurnId); // Check for mode transition if (buffer.currentMode !== 'responding') { // Flush previous mode if exists await this.flushBuffer(this.currentTurnId); buffer.currentMode = 'responding'; buffer.respondingContent = ''; buffer.lastBlockCreatedAt = new Date(); } // Aggregate content buffer.respondingContent += content; } } catch (error) { this.log.error("failed to route response event", { error }); } } /** * Called when the drone emits a tool call event from the agent. * Flushes current buffer and adds tool block immediately. */ async onToolCall( callId: string, name: string, params: string, response: string, ): Promise { if (!this.chatSessionId) { this.log.warn("toolCall event received but no chat session is active"); return; } try { const codeSession = SocketService.getCodeSessionByChatSessionId( this.chatSessionId, ); codeSession.onToolCall(callId, name, params, response); if (this.currentTurnId) { // Flush current buffer before adding tool block await this.flushBuffer(this.currentTurnId); // Add tool block immediately const turn = await ChatTurn.findById(this.currentTurnId); if (turn) { turn.blocks.push({ mode: 'tool', createdAt: new Date(), content: { callId, name, parameters: params, response, }, }); turn.toolCalls.push({ callId, name, parameters: params, response, }); turn.stats.toolCallCount = turn.toolCalls.length; await turn.save(); } } } catch (error) { this.log.error("failed to route toolCall event", { error }); } } /** * Called when the drone completes a work order. * Flushes any remaining buffered content before finalizing. */ async onWorkOrderComplete( turnId: string, success: boolean, message?: string, ): Promise { if (!this.chatSessionId) { this.log.warn( "workOrderComplete event received but no chat session is active", ); return; } try { // Flush any remaining buffered content await this.flushBuffer(turnId); this.streamingBuffers.delete(turnId); const turn = await ChatTurn.findById(turnId); if (turn) { turn.status = success ? ChatTurnStatus.Finished : ChatTurnStatus.Error; if (!success && message) { turn.errorMessage = message; } await turn.save(); } const codeSession = SocketService.getCodeSessionByChatSessionId( this.chatSessionId, ); codeSession.onWorkOrderComplete(turnId, success, message); this.currentTurnId = undefined; } catch (error) { this.log.error("failed to process workOrderComplete event", { error }); } } /** * Sets the active chat session ID for this drone session. */ setChatSessionId(chatSessionId: GadgetId): void { this.chatSessionId = chatSessionId; // Clear buffer for this turn if exists this.streamingBuffers.clear(); } /** * Gets or creates a streaming buffer for a turn. */ private getOrCreateBuffer(turnId: string): IStreamingBuffer { if (!this.streamingBuffers.has(turnId)) { this.streamingBuffers.set(turnId, { currentMode: null, thinkingContent: '', respondingContent: '', }); } return this.streamingBuffers.get(turnId)!; } /** * Flushes the current buffer to the database. */ private async flushBuffer(turnId: string): Promise { const buffer = this.streamingBuffers.get(turnId); if (!buffer) return; const turn = await ChatTurn.findById(turnId); if (!turn) return; // Flush thinking content if (buffer.currentMode === 'thinking' && buffer.thinkingContent) { turn.blocks.push({ mode: 'thinking', createdAt: buffer.lastBlockCreatedAt || new Date(), content: buffer.thinkingContent, }); buffer.thinkingContent = ''; } // Flush responding content if (buffer.currentMode === 'responding' && buffer.respondingContent) { turn.blocks.push({ mode: 'responding', createdAt: buffer.lastBlockCreatedAt || new Date(), content: buffer.respondingContent, }); buffer.respondingContent = ''; } if (turn.blocks.length > 0) { await turn.save(); } } /** * Sets the current turn ID being processed by this drone. */ setCurrentTurnId(turnId: GadgetId): void { this.currentTurnId = turnId; } /** * Called when the drone emits a workspace mode change. */ async onWorkspaceModeChanged(mode: WorkspaceMode): Promise { if (!this.chatSessionId) { return; } this.workspaceMode = mode; this.log.info("workspace mode changed", { mode }); const codeSession = SocketService.getCodeSessionByChatSessionId( this.chatSessionId, ); codeSession.onWorkspaceModeChanged(mode); } async onAgentThinking(data: { agentId: string; thinking: string }): Promise { if (!this.chatSessionId) return; try { const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId); codeSession.socket.emit("agent:thinking", data); } catch (error) { this.log.error("failed to route agent:thinking", { error }); } } async onAgentResponse(data: { agentId: string; chunk: string }): Promise { if (!this.chatSessionId) return; try { const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId); codeSession.socket.emit("agent:response", data); } catch (error) { this.log.error("failed to route agent:response", { error }); } } async onAgentToolCall(data: { agentId: string; tool: string; args: unknown }): Promise { if (!this.chatSessionId) return; try { const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId); codeSession.socket.emit("agent:tool-call", data); } catch (error) { this.log.error("failed to route agent:tool-call", { error }); } } async onAgentToolResult(data: { agentId: string; tool: string; result: unknown }): Promise { if (!this.chatSessionId) return; try { const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId); codeSession.socket.emit("agent:tool-result", data); } catch (error) { this.log.error("failed to route agent:tool-result", { error }); } } async onAgentComplete(data: { agentId: string; response?: string; subagent?: Record; stats?: Record }): Promise { if (!this.chatSessionId) return; try { const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId); codeSession.socket.emit("agent:complete", data); } catch (error) { this.log.error("failed to route agent:complete to frontend", { error }); } // Update the persisted tool call with the final response and subagent data if (this.currentTurnId && data.agentId) { try { const turn = await ChatTurn.findById(this.currentTurnId); if (turn) { const toolCall = turn.toolCalls.find((tc: IChatToolCall) => tc.callId === data.agentId); if (toolCall) { if (data.response) toolCall.response = data.response; if (data.subagent) (toolCall as any).subagent = data.subagent; } const block = turn.blocks.find((b: IChatTurnBlock) => b.mode === 'tool' && (b.content as IChatToolCall).callId === data.agentId); if (block && block.mode === 'tool') { if (data.response) (block.content as IChatToolCall).response = data.response; if (data.subagent) (block.content as any).subagent = data.subagent; } await turn.save(); } } catch (error) { this.log.error("failed to update subagent tool call in DB", { error }); } } } /** * Called when the drone requests crash recovery for an incomplete work order. */ async onRequestCrashRecovery(data: { workspaceId: string; turnId: string; chatSessionId: string; }): Promise { this.log.info("crash recovery request received", { workspaceId: data.workspaceId, turnId: data.turnId, }); try { const turn = await ChatTurn.findById(data.turnId); if (!turn) { this.log.warn("crash recovery: turn not found", { turnId: data.turnId, }); this.socket.emit("crashRecoveryResponse", { turnId: data.turnId, action: "discard", }); return; } if (turn.status === ChatTurnStatus.Finished) { this.log.info("crash recovery: turn already finished", { turnId: data.turnId, }); this.socket.emit("crashRecoveryResponse", { turnId: data.turnId, action: "discard", }); return; } // Turn is still processing - mark for retry turn.status = ChatTurnStatus.Error; turn.errorMessage = "Drone crashed during processing - retrying"; await turn.save(); this.socket.emit("crashRecoveryResponse", { turnId: data.turnId, action: "retry", retryDelay: 5000, }); this.log.info("crash recovery: scheduled retry", { turnId: data.turnId, }); // Schedule retry (will route to same workspaceId) setTimeout(() => { this.retryWorkOrder(turn); }, 5000); } catch (error) { const err = error as Error; this.log.error("crash recovery failed", { error: err.message, }); this.socket.emit("crashRecoveryResponse", { turnId: data.turnId, action: "discard", }); } } /** * Retries a work order after crash recovery. */ private async retryWorkOrder(turn: any): Promise { // TODO: Re-emit processWorkOrder to this drone this.log.info("work order retry not yet implemented", { turnId: turn._id, }); } /** * Called when the platform requests termination of this drone. * Forwards the termination request to the drone socket with logging. * @param cb Callback to invoke with termination result */ async onRequestTermination(cb: (success: boolean) => void): Promise { this.log.info("requestTermination received, forwarding to drone", { registrationId: this.registration._id, }); this.socket.emit("requestTermination", (success: boolean) => { this.log.info("requestTermination forwarded to drone", { success }); cb(success); }); } }