From 26e568612ab86131f840e9683c50206c239805e9 Mon Sep 17 00:00:00 2001 From: Rob Colbert Date: Mon, 11 May 2026 20:27:24 -0400 Subject: [PATCH] ChatSession reconnect logic --- docs/socket-protocol.md | 100 +++++++ gadget-code/frontend/src/lib/socket.ts | 5 + .../frontend/src/pages/ChatSessionView.tsx | 92 ++++-- gadget-code/src/lib/code-session.ts | 129 ++++++++- gadget-code/src/lib/drone-session.ts | 266 ++++++++++++++++-- gadget-code/src/lib/message-queue.ts | 85 ++++++ gadget-code/src/lib/tab-lock.ts | 134 +++++++++ gadget-code/src/services/socket.ts | 13 + packages/api/src/messages/socket.ts | 1 + 9 files changed, 772 insertions(+), 53 deletions(-) create mode 100644 gadget-code/src/lib/message-queue.ts create mode 100644 gadget-code/src/lib/tab-lock.ts diff --git a/docs/socket-protocol.md b/docs/socket-protocol.md index e230c47..e7b6140 100644 --- a/docs/socket-protocol.md +++ b/docs/socket-protocol.md @@ -203,3 +203,103 @@ To add a new message: 4. Implement the sender (emit) in the Client (`ide` or `drone`) or Server (`CodeSession`/`DroneSession`). 5. Implement the handler in the corresponding class or frontend component. 6. Implement the forward-path routing if needed. + +--- + +## 8. Reconnection & Message Queuing + +### 8.1 Problem Statement + +When the browser refreshes during work order processing: +1. Old `CodeSession` disconnects, but `DroneSession` continues routing to it +2. Drone emits events but they go to a disconnected socket +3. New `CodeSession` connects but isn't linked to the active chat session +4. Messages are lost; IDE never receives streaming updates + +### 8.2 Solution Architecture + +**Three-phase approach:** + +1. **Redis Message Queue** (`src/lib/message-queue.ts`) + - Messages enqueued when routing fails (disconnected socket) + - FIFO ordering with RPUSH/LPOP + - 30-minute TTL (1800 seconds) + - Max 1000 messages (drop oldest) + - Aggregates adjacent thinking/response messages during drain + +2. **Redis Tab Lock** (`src/lib/tab-lock.ts`) + - Prevents concurrent tab access to same chat session + - 1-minute timeout (requires heartbeat renewal) + - Includes socket ID and user ID for validation + - Auto-cleanup of stale locks + +3. **Auto-Reconnection** (`CodeSession.checkAndReestablishActiveSession()`) + - On connect, checks for active processing turn in DB + - If found, attempts to acquire tab lock + - On success, re-establishes chat session index + - Drains queued messages from Redis + - Aggregates and delivers messages to client + +### 8.3 Message Queue Flow + +``` +Drone emits thinking() → DroneSession.onThinking() + ↓ +SocketService.getCodeSessionByChatSessionId() throws (disconnected) + ↓ +MessageQueue.enqueue(chatSessionId, { type: 'thinking', args: [...] }) + ↓ +[30 minutes later] Queue expires automatically + OR +[On reconnect] MessageQueue.drain() → aggregateMessages() → deliver +``` + +### 8.4 Tab Lock Flow + +``` +IDE connects → CodeSession.register() + ↓ +checkAndReestablishActiveSession() + ↓ +Find active chat session with processing turn + ↓ +TabLock.acquire(chatSessionId, userId, socketId) + ↓ +Success: Register chat session, drain queue, emit status + OR +Failure: Emit 'tabLockDenied' → IDE navigates away +``` + +### 8.5 Frontend Reconciliation + +The frontend handles reconnection gracefully: + +1. **Load history first** - Fetch chat session and turns from DB +2. **Connect socket** - Establish WebSocket connection +3. **Backend auto-reconnects** - If processing turn found, backend re-establishes +4. **Receive queued messages** - Aggregated messages delivered in order +5. **Handle duplicates** - Frontend merges with existing history + +### 8.6 Single Tab Enforcement + +Only one tab can control a chat session at a time: + +- First tab acquires Redis lock +- Subsequent tabs receive `tabLockDenied` event +- UI shows "Chat session open in another browser tab" +- User must navigate away or close the duplicate tab + +### 8.7 Status Indicators + +The status bar shows connection state: + +- **Connected** (green ●) - Socket connected, receiving messages +- **Connecting** (yellow ●) - Attempting to connect +- **Error** (red ●) - Connection failed +- **Disconnected** (gray ●) - No active connection + +Status messages inform the user: +- "Connecting..." - Initial connection +- "Reconnecting to active session..." - Auto-reconnect in progress +- "Reconnected" - Successfully reconnected +- "Chat session is open in another browser tab" - Tab lock denied diff --git a/gadget-code/frontend/src/lib/socket.ts b/gadget-code/frontend/src/lib/socket.ts index 20d1079..4684f18 100644 --- a/gadget-code/frontend/src/lib/socket.ts +++ b/gadget-code/frontend/src/lib/socket.ts @@ -117,6 +117,7 @@ export interface SocketEvents { }) => void; workspaceModeChanged: (mode: string) => void; sessionUpdated: (updates: Partial) => void; + tabLockDenied: (data: { message: string }) => void; connect: () => void; disconnect: (reason: string) => void; error: (error: Error) => void; @@ -261,6 +262,10 @@ class SocketClient { this.reconnectAttempts++; this.emit("error", error); }); + + this._socket.on("tabLockDenied", (data: { message: string }) => { + this.emit("tabLockDenied", data); + }); } disconnect(): void { diff --git a/gadget-code/frontend/src/pages/ChatSessionView.tsx b/gadget-code/frontend/src/pages/ChatSessionView.tsx index 90b67c8..f635aac 100644 --- a/gadget-code/frontend/src/pages/ChatSessionView.tsx +++ b/gadget-code/frontend/src/pages/ChatSessionView.tsx @@ -70,6 +70,9 @@ export default function ChatSessionView() { const [isEditingName, setIsEditingName] = useState(false); const [editName, setEditName] = useState(''); const [isUpdatingName, setIsUpdatingName] = useState(false); + const [connectionState, setConnectionState] = useState<'disconnected' | 'connecting' | 'connected' | 'error'>('disconnected'); + const [isOtherTab, setIsOtherTab] = useState(false); + const [reconnectAttempts, setReconnectAttempts] = useState(0); const messagesEndRef = useRef(null); const inputRef = useRef(null); @@ -125,32 +128,39 @@ export default function ChatSessionView() { }; }, [session, project]); - // Re-lock on socket reconnect to restore lock on a new CodeSession - const handleSocketReconnect = useCallback(async () => { - if (!sessionRef.current || !projectRef.current) return; - const droneJson = localStorage.getItem('dtp_drone_registration'); - if (!droneJson) return; - try { - const registration = JSON.parse(droneJson); - const success = await socketClient.requestSessionLock( - registration, - projectRef.current, - sessionRef.current, - ); - if (!success) { - console.warn('Failed to re-lock drone after socket reconnect'); - } - } catch (err) { - console.error('Failed to re-lock drone after socket reconnect', err); + // Handle socket reconnection and tab lock + const handleSocketConnect = useCallback(() => { + setConnectionState('connected'); + appContext?.setStatusMessage('Connected'); + + // If we had a processing turn, backend will auto-reconnect + const hasProcessingTurn = turns.some(t => t.status === 'processing'); + if (hasProcessingTurn) { + appContext?.setStatusMessage('Reconnecting to active session...'); } - }, []); + }, [turns, appContext]); - useEffect(() => { - socketClient.on('connect', handleSocketReconnect); - return () => { - socketClient.off('connect', handleSocketReconnect); - }; - }, [handleSocketReconnect]); + const handleTabLockDenied = useCallback((data: { message: string }) => { + setIsOtherTab(true); + setConnectionState('error'); + appContext?.setStatusMessage(data.message); + }, [appContext]); + + const handleReconnectAttempt = useCallback(() => { + setReconnectAttempts(prev => prev + 1); + appContext?.setStatusMessage(`Reconnecting... (${reconnectAttempts + 1})`); + }, [reconnectAttempts, appContext]); + + const handleReconnectFailed = useCallback(() => { + setConnectionState('error'); + appContext?.setStatusMessage('Reconnection failed'); + }, [appContext]); + + const handleReconnect = useCallback(() => { + setConnectionState('connected'); + setReconnectAttempts(0); + appContext?.setStatusMessage('Reconnected'); + }, [appContext]); // Release session lock on unmount only useEffect(() => { @@ -197,9 +207,14 @@ export default function ChatSessionView() { setSelectedProviderId(providerId || ''); setSelectedModelId(sessionData.selectedModel || ''); setSessionReasoningEffort(sessionData.reasoningEffort || 'off'); + + // Set connection state to connecting - will update on socket connect + setConnectionState('connecting'); + appContext?.setStatusMessage('Connecting...'); } } catch (err) { setError(err instanceof Error ? err.message : 'Failed to load session'); + setConnectionState('error'); } finally { setLoading(false); } @@ -223,6 +238,11 @@ export default function ChatSessionView() { socketClient.on('agent:tool-call', handleAgentToolCall); socketClient.on('agent:tool-result', handleAgentToolResult); socketClient.on('agent:complete', handleAgentComplete); + socketClient.on('connect', handleSocketConnect); + socketClient.on('tabLockDenied', handleTabLockDenied); + socketClient.on('reconnect_attempt', handleReconnectAttempt); + socketClient.on('reconnect_failed', handleReconnectFailed); + socketClient.on('reconnect', handleReconnect); }; const cleanupSocketListeners = () => { @@ -239,6 +259,11 @@ export default function ChatSessionView() { socketClient.off('agent:tool-call', handleAgentToolCall); socketClient.off('agent:tool-result', handleAgentToolResult); socketClient.off('agent:complete', handleAgentComplete); + socketClient.off('connect', handleSocketConnect); + socketClient.off('tabLockDenied', handleTabLockDenied); + socketClient.off('reconnect_attempt', handleReconnectAttempt); + socketClient.off('reconnect_failed', handleReconnectFailed); + socketClient.off('reconnect', handleReconnect); }; const scheduleUpdate = useCallback(() => { @@ -948,6 +973,25 @@ export default function ChatSessionView() { ); } + // Render "other tab" state + if (isOtherTab) { + return ( +
+
+

+ This chat session is open in another browser tab. +

+ +
+
+ ); + } + return (
{/* Toast notification */} diff --git a/gadget-code/src/lib/code-session.ts b/gadget-code/src/lib/code-session.ts index a2974a7..eb96d06 100644 --- a/gadget-code/src/lib/code-session.ts +++ b/gadget-code/src/lib/code-session.ts @@ -22,8 +22,11 @@ import { } from "@gadget/api"; import ChatSession from "../models/chat-session.ts"; +import DroneRegistration from "../models/drone-registration.ts"; +import { ChatTurn } from "../models/chat-turn.ts"; import { ChatSessionService, SocketService } from "../services/index.ts"; +import TabLock from "./tab-lock.js"; export class CodeSession extends SocketSession { protected type: SocketSessionType = SocketSessionType.Code; @@ -33,6 +36,10 @@ export class CodeSession extends SocketSession { protected selectedDrone: IDroneRegistration | undefined; protected currentTurnId: GadgetId | undefined; protected workspaceMode: WorkspaceMode = WorkspaceMode.Idle; + + private chatSessionId: GadgetId | undefined; + private isReconnecting = false; + private tabLockAcquired = false; constructor(socket: GadgetSocket, user: IUser) { super(socket, user); @@ -41,6 +48,7 @@ export class CodeSession extends SocketSession { register() { super.register(); + this.socket.on("disconnect", this.onDisconnect.bind(this)); this.socket.on("requestSessionLock", this.onRequestSessionLock.bind(this)); this.socket.on( "requestWorkspaceMode", @@ -49,6 +57,17 @@ export class CodeSession extends SocketSession { this.socket.on("submitPrompt", this.onSubmitPrompt.bind(this)); this.socket.on("releaseSessionLock", this.onReleaseSessionLock.bind(this)); this.socket.on("sessionHeartbeat", this.onSessionHeartbeat.bind(this)); + + // Check for active session on connect + this.checkAndReestablishActiveSession(); + } + + private async onDisconnect(): Promise { + // Release tab lock on disconnect + if (this.chatSessionId && this.tabLockAcquired) { + await TabLock.release(this.chatSessionId, this.socket.id); + this.tabLockAcquired = false; + } } get hasLock(): boolean { @@ -67,6 +86,94 @@ export class CodeSession extends SocketSession { return this.project; } + private async checkAndReestablishActiveSession(): Promise { + if (this.isReconnecting) return; + this.isReconnecting = true; + + try { + // Get user's most recent chat session + const recentSessions = await ChatSession.find({ user: this.user._id }) + .sort({ createdAt: -1 }) + .limit(5); + + for (const session of recentSessions) { + // Check if this session has a processing turn + const latestTurn = await ChatTurn.findOne({ session: session._id }) + .sort({ createdAt: -1 }); + + if (latestTurn && latestTurn.status === ChatTurnStatus.Processing) { + // Found active session - attempt to reestablish connection + this.chatSessionId = session._id; + + // Get the drone that was processing this turn + const droneReg = await DroneRegistration.findOne({ + chatSessionId: session._id, + }).populate('user'); + + if (droneReg && droneReg.user) { + await this.autoRelock(droneReg, session); + } + break; + } + } + } catch (error) { + this.log.error("failed to check for active session", { error }); + } finally { + this.isReconnecting = false; + } + } + + private async autoRelock( + registration: IDroneRegistration, + session: IChatSession + ): Promise { + try { + // Try to acquire tab lock + const lockResult = await TabLock.acquire( + session._id, + this.user._id, + this.socket.id, + ); + + if (!lockResult.success) { + this.log.warn("tab lock denied - session open in another tab", { + chatSessionId: session._id, + lockedBy: lockResult.info?.socketId, + }); + this.socket.emit("tabLockDenied", { + message: "Chat session is open in another browser tab", + }); + return; + } + + this.tabLockAcquired = true; + + const droneSession = SocketService.getDroneSession(registration); + + // Re-establish the chat session index + SocketService.registerChatSession(session._id, this); + droneSession.setChatSessionId(session._id); + + // Update CodeSession state + this.chatSession = session; + this.selectedDrone = registration; + + // Drain any queued messages + await droneSession.drainMessageQueue(); + + this.log.info("auto-reestablished session connection", { + chatSessionId: session._id, + droneId: registration._id, + }); + + // Emit status to client + this.socket.emit("status", "Reconnected to active session"); + } catch (error) { + this.log.error("failed to auto-relock session", { error }); + this.socket.emit("status", "Failed to reconnect to session"); + } + } + /** * Sets the selected drone for this code session. */ @@ -104,13 +211,24 @@ export class CodeSession extends SocketSession { registration, project, chatSession, - (success: boolean, chatSessionId: string): void => { + async (success: boolean, chatSessionId: string): Promise => { if (success) { this.selectedDrone = registration; this.chatSession = chatSession; this.project = project; + this.chatSessionId = chatSession._id; + SocketService.registerChatSession(chatSession._id, this); droneSession.setChatSessionId(chatSession._id); + + // Acquire tab lock + const lockResult = await TabLock.acquire( + chatSession._id, + this.user._id, + this.socket.id, + ); + + this.tabLockAcquired = lockResult.success; } cb(success, chatSessionId); }, @@ -300,13 +418,20 @@ export class CodeSession extends SocketSession { registration, project, chatSession, - (success: boolean) => { + async (success: boolean) => { if (success) { SocketService.unregisterChatSession(chatSession._id); droneSession.chatSessionId = undefined; this.selectedDrone = undefined; this.chatSession = undefined; this.project = undefined; + this.chatSessionId = undefined; + + // Release tab lock + if (this.tabLockAcquired) { + await TabLock.release(chatSession._id, this.socket.id); + this.tabLockAcquired = false; + } } cb(success); }, diff --git a/gadget-code/src/lib/drone-session.ts b/gadget-code/src/lib/drone-session.ts index 1db9a79..bf90285 100644 --- a/gadget-code/src/lib/drone-session.ts +++ b/gadget-code/src/lib/drone-session.ts @@ -20,6 +20,7 @@ import { } from "./socket-session.js"; import { SocketService } from "../services/index.js"; import { ChatTurn } from "../models/chat-turn.js"; +import MessageQueue, { type QueuedMessage } from "./message-queue.js"; interface IStreamingBuffer { currentMode: 'thinking' | 'responding' | null; @@ -35,6 +36,7 @@ export class DroneSession extends SocketSession { currentTurnId: GadgetId | undefined; workspaceMode: WorkspaceMode = WorkspaceMode.Idle; private streamingBuffers: Map = new Map(); + private isDrainingQueue = false; constructor(socket: GadgetSocket, registration: IDroneRegistration) { super(socket, registration.user as IUser); @@ -80,32 +82,45 @@ export class DroneSession extends SocketSession { metadata?: unknown, ): Promise { if (!this.chatSessionId) { + this.log.warn("log event received but no chat session is active"); return; } + try { const codeSession = SocketService.getCodeSessionByChatSessionId( this.chatSessionId, ); codeSession.onLog(timestamp, component, level, message, metadata); } catch (error) { - this.log.error("failed to route log message", { error }); + // Routing failed - queue to Redis + await MessageQueue.enqueue(this.chatSessionId, { + type: 'log', + args: [timestamp, component, level, message, metadata], + timestamp: Date.now(), + }); + this.log.debug("queued log message", { chatSessionId: this.chatSessionId }); } } async onStatus(message: string): Promise { if (!this.chatSessionId) { - this.log.warn( - "drone status event received but no chat session is active", - ); + this.log.warn("status event received but no chat session is active"); return; } + try { const codeSession = SocketService.getCodeSessionByChatSessionId( this.chatSessionId, ); codeSession.socket.emit("status", message); } catch (error) { - this.log.error("failed to route status message", { error }); + // Routing failed - queue to Redis + await MessageQueue.enqueue(this.chatSessionId, { + type: 'status', + args: [message], + timestamp: Date.now(), + }); + this.log.debug("queued status message", { chatSessionId: this.chatSessionId }); } } @@ -141,7 +156,13 @@ export class DroneSession extends SocketSession { buffer.thinkingContent += content; } } catch (error) { - this.log.error("failed to route thinking event", { error }); + // Routing failed - queue to Redis + await MessageQueue.enqueue(this.chatSessionId, { + type: 'thinking', + args: [content], + timestamp: Date.now(), + }); + this.log.debug("queued thinking message", { chatSessionId: this.chatSessionId }); } } @@ -177,7 +198,13 @@ export class DroneSession extends SocketSession { buffer.respondingContent += content; } } catch (error) { - this.log.error("failed to route response event", { error }); + // Routing failed - queue to Redis + await MessageQueue.enqueue(this.chatSessionId, { + type: 'response', + args: [content], + timestamp: Date.now(), + }); + this.log.debug("queued response message", { chatSessionId: this.chatSessionId }); } } @@ -230,7 +257,13 @@ export class DroneSession extends SocketSession { } } } catch (error) { - this.log.error("failed to route toolCall event", { error }); + // Routing failed - queue to Redis + await MessageQueue.enqueue(this.chatSessionId, { + type: 'toolCall', + args: [callId, name, params, response], + timestamp: Date.now(), + }); + this.log.debug("queued toolCall message", { chatSessionId: this.chatSessionId }); } } @@ -244,9 +277,7 @@ export class DroneSession extends SocketSession { message?: string, ): Promise { if (!this.chatSessionId) { - this.log.warn( - "workOrderComplete event received but no chat session is active", - ); + this.log.warn("workOrderComplete event received but no chat session is active"); return; } @@ -271,7 +302,13 @@ export class DroneSession extends SocketSession { this.currentTurnId = undefined; } catch (error) { - this.log.error("failed to process workOrderComplete event", { error }); + // Routing failed - queue to Redis + await MessageQueue.enqueue(this.chatSessionId, { + type: 'workOrderComplete', + args: [turnId, success, message], + timestamp: Date.now(), + }); + this.log.debug("queued workOrderComplete message", { chatSessionId: this.chatSessionId }); } } @@ -345,66 +382,110 @@ export class DroneSession extends SocketSession { */ async onWorkspaceModeChanged(mode: WorkspaceMode): Promise { if (!this.chatSessionId) { + this.log.warn("workspaceModeChanged event received but no chat session is active"); return; } this.workspaceMode = mode; this.log.info("workspace mode changed", { mode }); - const codeSession = SocketService.getCodeSessionByChatSessionId( - this.chatSessionId, - ); - - codeSession.onWorkspaceModeChanged(mode); + try { + const codeSession = SocketService.getCodeSessionByChatSessionId( + this.chatSessionId, + ); + codeSession.onWorkspaceModeChanged(mode); + } catch (error) { + this.log.error("failed to route workspaceModeChanged event", { error }); + } } async onAgentThinking(data: { agentId: string; thinking: string }): Promise { - if (!this.chatSessionId) return; + if (!this.chatSessionId) { + this.log.warn("agent:thinking event received but no chat session is active"); + return; + } try { const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId); codeSession.socket.emit("agent:thinking", data); } catch (error) { - this.log.error("failed to route agent:thinking", { error }); + await MessageQueue.enqueue(this.chatSessionId, { + type: 'agent:thinking', + args: [data], + timestamp: Date.now(), + }); + this.log.debug("queued agent:thinking message", { chatSessionId: this.chatSessionId }); } } async onAgentResponse(data: { agentId: string; chunk: string }): Promise { - if (!this.chatSessionId) return; + if (!this.chatSessionId) { + this.log.warn("agent:response event received but no chat session is active"); + return; + } try { const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId); codeSession.socket.emit("agent:response", data); } catch (error) { - this.log.error("failed to route agent:response", { error }); + await MessageQueue.enqueue(this.chatSessionId, { + type: 'agent:response', + args: [data], + timestamp: Date.now(), + }); + this.log.debug("queued agent:response message", { chatSessionId: this.chatSessionId }); } } async onAgentToolCall(data: { agentId: string; tool: string; args: unknown }): Promise { - if (!this.chatSessionId) return; + if (!this.chatSessionId) { + this.log.warn("agent:tool-call event received but no chat session is active"); + return; + } try { const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId); codeSession.socket.emit("agent:tool-call", data); } catch (error) { - this.log.error("failed to route agent:tool-call", { error }); + await MessageQueue.enqueue(this.chatSessionId, { + type: 'agent:tool-call', + args: [data], + timestamp: Date.now(), + }); + this.log.debug("queued agent:tool-call message", { chatSessionId: this.chatSessionId }); } } async onAgentToolResult(data: { agentId: string; tool: string; result: unknown }): Promise { - if (!this.chatSessionId) return; + if (!this.chatSessionId) { + this.log.warn("agent:tool-result event received but no chat session is active"); + return; + } try { const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId); codeSession.socket.emit("agent:tool-result", data); } catch (error) { - this.log.error("failed to route agent:tool-result", { error }); + await MessageQueue.enqueue(this.chatSessionId, { + type: 'agent:tool-result', + args: [data], + timestamp: Date.now(), + }); + this.log.debug("queued agent:tool-result message", { chatSessionId: this.chatSessionId }); } } async onAgentComplete(data: { agentId: string; response?: string; subagent?: Record; stats?: Record }): Promise { - if (!this.chatSessionId) return; + if (!this.chatSessionId) { + this.log.warn("agent:complete event received but no chat session is active"); + return; + } try { const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId); codeSession.socket.emit("agent:complete", data); } catch (error) { - this.log.error("failed to route agent:complete to frontend", { error }); + await MessageQueue.enqueue(this.chatSessionId, { + type: 'agent:complete', + args: [data], + timestamp: Date.now(), + }); + this.log.debug("queued agent:complete message", { chatSessionId: this.chatSessionId }); } // Update the persisted tool call with the final response and subagent data @@ -524,4 +605,135 @@ export class DroneSession extends SocketSession { cb(success); }); } + + /** + * Drains queued messages from Redis and delivers to reconnected CodeSession. + * Aggregates adjacent same-type streaming messages to reduce message count. + */ + async drainMessageQueue(): Promise { + if (!this.chatSessionId || this.isDrainingQueue) return; + this.isDrainingQueue = true; + + try { + const messages = await MessageQueue.drain(this.chatSessionId); + + if (messages.length === 0) return; + + this.log.info("draining message queue", { count: messages.length }); + + const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId); + + // Aggregate adjacent same-type streaming messages + const aggregated = this.aggregateMessages(messages); + + for (const msg of aggregated) { + try { + switch (msg.type) { + case 'thinking': + codeSession.onThinking(msg.args[0] as string); + break; + case 'response': + codeSession.onResponse(msg.args[0] as string); + break; + case 'toolCall': + codeSession.onToolCall( + msg.args[0] as string, + msg.args[1] as string, + msg.args[2] as string, + msg.args[3] as string, + ); + break; + case 'workOrderComplete': + codeSession.onWorkOrderComplete( + msg.args[0] as string, + msg.args[1] as boolean, + msg.args[2] as string, + ); + break; + case 'log': + codeSession.onLog( + msg.args[0] as Date, + msg.args[1] as GadgetComponent, + msg.args[2] as GadgetLogLevel, + msg.args[3] as string, + msg.args[4] as unknown, + ); + break; + case 'status': + codeSession.socket.emit("status", msg.args[0] as string); + break; + case 'agent:thinking': + codeSession.socket.emit("agent:thinking", msg.args[0] as { agentId: string; thinking: string }); + break; + case 'agent:response': + codeSession.socket.emit("agent:response", msg.args[0] as { agentId: string; chunk: string }); + break; + case 'agent:tool-call': + codeSession.socket.emit("agent:tool-call", msg.args[0] as { agentId: string; tool: string; args: unknown }); + break; + case 'agent:tool-result': + codeSession.socket.emit("agent:tool-result", msg.args[0] as { agentId: string; tool: string; result: unknown }); + break; + case 'agent:complete': + codeSession.socket.emit("agent:complete", msg.args[0] as { agentId: string; response?: string; subagent?: Record; stats?: Record }); + break; + default: + this.log.warn("unknown queued message type", { type: (msg as any).type }); + } + // Small delay to avoid flooding + await new Promise(resolve => setTimeout(resolve, 5)); + } catch (error) { + this.log.error("failed to deliver queued message", { + type: msg.type, + error, + }); + } + } + } catch (error) { + this.log.error("failed to drain message queue", { error }); + } finally { + this.isDrainingQueue = false; + } + } + + /** + * Aggregates adjacent same-type streaming messages (thinking/response). + * Preserves order and only aggregates during drain (not real-time). + */ + private aggregateMessages(messages: QueuedMessage[]): QueuedMessage[] { + const aggregated: QueuedMessage[] = []; + let currentAggregate: QueuedMessage | null = null; + + for (const msg of messages) { + // Only aggregate thinking and response messages + if (msg.type === 'thinking' || msg.type === 'response') { + if (currentAggregate && currentAggregate.type === msg.type) { + // Continue aggregating same type + const currentContent = (currentAggregate.args[0] as string) || ''; + const newContent = (msg.args[0] as string) || ''; + currentAggregate.args = [currentContent + newContent]; + } else { + // Type changed - push current aggregate and start new + if (currentAggregate) { + aggregated.push(currentAggregate); + } + currentAggregate = { ...msg }; + } + } else { + // Non-aggregatable message type + if (currentAggregate) { + aggregated.push(currentAggregate); + currentAggregate = null; + } + aggregated.push(msg); + } + } + + // Push final aggregate if exists + if (currentAggregate) { + aggregated.push(currentAggregate); + } + + return aggregated; + } } diff --git a/gadget-code/src/lib/message-queue.ts b/gadget-code/src/lib/message-queue.ts new file mode 100644 index 0000000..258adb0 --- /dev/null +++ b/gadget-code/src/lib/message-queue.ts @@ -0,0 +1,85 @@ +// src/lib/message-queue.ts +// Copyright (C) 2026 Robert Colbert +// All Rights Reserved + +import redis from './redis.js'; +import { GadgetLog } from '@gadget/api'; + +const log = new GadgetLog({ name: 'MessageQueue', slug: 'message-queue' }); + +export interface QueuedMessage { + type: 'thinking' | 'response' | 'toolCall' | 'workOrderComplete' | + 'log' | 'status' | 'agent:thinking' | 'agent:response' | + 'agent:tool-call' | 'agent:tool-result' | 'agent:complete'; + args: unknown[]; + timestamp: number; +} + +const QUEUE_KEY = (chatSessionId: string) => `gadget:messages:${chatSessionId}`; +const TTL_SECONDS = 1800; // 30 minutes +const MAX_QUEUE_SIZE = 1000; + +class MessageQueueService { + /** + * Enqueues a message to Redis for later delivery. + * Uses RPUSH for FIFO ordering (messages delivered in order). + * Trims queue to MAX_QUEUE_SIZE messages (drop oldest if exceeded). + */ + async enqueue(chatSessionId: string, message: QueuedMessage): Promise { + const key = QUEUE_KEY(chatSessionId); + const serialized = JSON.stringify(message); + + try { + // RPUSH for FIFO ordering + await redis.rpush(key, serialized); + + // Trim to MAX_QUEUE_SIZE messages (drop oldest from left) + await redis.ltrim(key, -MAX_QUEUE_SIZE, -1); + + // Set/refresh TTL + await redis.expire(key, TTL_SECONDS); + } catch (error) { + log.error('failed to enqueue message', { chatSessionId, error }); + throw error; // Drone will stop on error + } + } + + /** + * Drains all queued messages in FIFO order and deletes the queue. + * Returns messages for delivery to reconnected client. + */ + async drain(chatSessionId: string): Promise { + const key = QUEUE_KEY(chatSessionId); + + try { + // Get all messages + const messages = await redis.lrange(key, 0, -1); + + if (messages.length > 0) { + // Delete the queue after draining + await redis.del(key); + + return messages.map(msg => JSON.parse(msg) as QueuedMessage); + } + + return []; + } catch (error) { + log.error('failed to drain message queue', { chatSessionId, error }); + throw error; + } + } + + /** + * Cleans up a message queue (e.g., on session close). + */ + async cleanup(chatSessionId: string): Promise { + const key = QUEUE_KEY(chatSessionId); + try { + await redis.del(key); + } catch (error) { + log.error('failed to cleanup message queue', { chatSessionId, error }); + } + } +} + +export default new MessageQueueService(); diff --git a/gadget-code/src/lib/tab-lock.ts b/gadget-code/src/lib/tab-lock.ts new file mode 100644 index 0000000..347ddc6 --- /dev/null +++ b/gadget-code/src/lib/tab-lock.ts @@ -0,0 +1,134 @@ +// src/lib/tab-lock.ts +// Copyright (C) 2026 Robert Colbert +// All Rights Reserved + +import redis from './redis.js'; +import { GadgetLog } from '@gadget/api'; + +const log = new GadgetLog({ name: 'TabLock', slug: 'tab-lock' }); + +const LOCK_KEY = (chatSessionId: string) => `gadget:lock:${chatSessionId}`; +const LOCK_TIMEOUT_MS = 60000; // 1 minute - must be renewed by heartbeat + +export interface TabLockInfo { + socketId: string; + userId: string; + acquiredAt: number; +} + +class TabLockService { + /** + * Attempts to acquire a tab lock for a chat session. + * Returns success=true if lock acquired, false if already locked by another tab. + */ + async acquire( + chatSessionId: string, + userId: string, + socketId: string + ): Promise<{ success: boolean; info?: TabLockInfo }> { + const key = LOCK_KEY(chatSessionId); + const now = Date.now(); + + try { + // Try to set lock with NX (only if not exists) + const acquired = await redis.set( + key, + JSON.stringify({ socketId, userId, acquiredAt: now }), + 'EX', + LOCK_TIMEOUT_MS / 1000, + 'NX' + ); + + if (acquired === 'OK') { + log.info('tab lock acquired', { chatSessionId, userId, socketId }); + return { success: true }; + } + + // Lock exists - check if it's ours or stale + const existing = await redis.get(key); + if (!existing) { + // Lock disappeared between check and set - retry + return this.acquire(chatSessionId, userId, socketId); + } + + const info = JSON.parse(existing) as TabLockInfo; + + // Check if lock is stale (expired but not cleaned up) + if (now - info.acquiredAt > LOCK_TIMEOUT_MS) { + log.info('stale tab lock detected, reclaiming', { chatSessionId, info }); + await this.release(chatSessionId); + return this.acquire(chatSessionId, userId, socketId); + } + + // Lock is held by another tab + log.info('tab lock denied - already locked', { + chatSessionId, + lockedBy: info.socketId, + ourSocket: socketId + }); + + return { success: false, info }; + } catch (error) { + log.error('failed to acquire tab lock', { chatSessionId, error }); + throw error; + } + } + + /** + * Releases a tab lock. + */ + async release(chatSessionId: string, socketId?: string): Promise { + const key = LOCK_KEY(chatSessionId); + + try { + if (socketId) { + // Only release if we hold the lock + const existing = await redis.get(key); + if (existing) { + const info = JSON.parse(existing) as TabLockInfo; + if (info.socketId === socketId) { + await redis.del(key); + log.info('tab lock released', { chatSessionId, socketId }); + } + } + } else { + await redis.del(key); + } + } catch (error) { + log.error('failed to release tab lock', { chatSessionId, error }); + } + } + + /** + * Refreshes the TTL on an existing lock (heartbeat). + */ + async refresh(chatSessionId: string): Promise { + const key = LOCK_KEY(chatSessionId); + + try { + const refreshed = await redis.expire(key, LOCK_TIMEOUT_MS / 1000); + return refreshed !== 0; + } catch (error) { + log.error('failed to refresh tab lock', { chatSessionId, error }); + return false; + } + } + + /** + * Gets current lock info without modifying. + */ + async getInfo(chatSessionId: string): Promise { + const key = LOCK_KEY(chatSessionId); + + try { + const existing = await redis.get(key); + if (!existing) return null; + return JSON.parse(existing) as TabLockInfo; + } catch (error) { + log.error('failed to get tab lock info', { chatSessionId, error }); + return null; + } + } +} + +export default new TabLockService(); diff --git a/gadget-code/src/services/socket.ts b/gadget-code/src/services/socket.ts index 8f10e22..13ef357 100644 --- a/gadget-code/src/services/socket.ts +++ b/gadget-code/src/services/socket.ts @@ -292,6 +292,19 @@ class SocketService extends DtpService { error.statusCode = 404; throw error; } + + // CRITICAL: Check if socket is still connected + if (!session.socket.connected) { + this.log.warn("code session socket disconnected, clearing stale reference", { + chatSessionId, + socketId: session.socket.id, + }); + this.chatSessionIndex.delete(chatSessionId); + const error = new Error("code session socket disconnected"); + error.statusCode = 404; + throw error; + } + return session; } diff --git a/packages/api/src/messages/socket.ts b/packages/api/src/messages/socket.ts index e17d315..aef7ff9 100644 --- a/packages/api/src/messages/socket.ts +++ b/packages/api/src/messages/socket.ts @@ -127,6 +127,7 @@ export interface ServerToClientEvents { workOrderComplete: WorkOrderCompleteMessage; workspaceModeChanged: WorkspaceModeChangedMessage; sessionUpdated: SessionUpdatedMessage; + tabLockDenied: (data: { message: string }) => void; "agent:thinking": AgentThinkingMessage; "agent:response": AgentResponseMessage; "agent:tool-call": AgentToolCallMessage;