ChatSession reconnect logic

This commit is contained in:
Rob Colbert 2026-05-11 20:27:24 -04:00
parent 01c84ba5a0
commit 26e568612a
9 changed files with 772 additions and 53 deletions

View File

@ -203,3 +203,103 @@ To add a new message:
4. Implement the sender (emit) in the Client (`ide` or `drone`) or Server (`CodeSession`/`DroneSession`).
5. Implement the handler in the corresponding class or frontend component.
6. Implement the forward-path routing if needed.
---
## 8. Reconnection & Message Queuing
### 8.1 Problem Statement
When the browser refreshes during work order processing:
1. Old `CodeSession` disconnects, but `DroneSession` continues routing to it
2. Drone emits events but they go to a disconnected socket
3. New `CodeSession` connects but isn't linked to the active chat session
4. Messages are lost; IDE never receives streaming updates
### 8.2 Solution Architecture
**Three-phase approach:**
1. **Redis Message Queue** (`src/lib/message-queue.ts`)
- Messages enqueued when routing fails (disconnected socket)
- FIFO ordering with RPUSH/LPOP
- 30-minute TTL (1800 seconds)
- Max 1000 messages (drop oldest)
- Aggregates adjacent thinking/response messages during drain
2. **Redis Tab Lock** (`src/lib/tab-lock.ts`)
- Prevents concurrent tab access to same chat session
- 1-minute timeout (requires heartbeat renewal)
- Includes socket ID and user ID for validation
- Auto-cleanup of stale locks
3. **Auto-Reconnection** (`CodeSession.checkAndReestablishActiveSession()`)
- On connect, checks for active processing turn in DB
- If found, attempts to acquire tab lock
- On success, re-establishes chat session index
- Drains queued messages from Redis
- Aggregates and delivers messages to client
### 8.3 Message Queue Flow
```
Drone emits thinking() → DroneSession.onThinking()
SocketService.getCodeSessionByChatSessionId() throws (disconnected)
MessageQueue.enqueue(chatSessionId, { type: 'thinking', args: [...] })
[30 minutes later] Queue expires automatically
OR
[On reconnect] MessageQueue.drain() → aggregateMessages() → deliver
```
### 8.4 Tab Lock Flow
```
IDE connects → CodeSession.register()
checkAndReestablishActiveSession()
Find active chat session with processing turn
TabLock.acquire(chatSessionId, userId, socketId)
Success: Register chat session, drain queue, emit status
OR
Failure: Emit 'tabLockDenied' → IDE navigates away
```
### 8.5 Frontend Reconciliation
The frontend handles reconnection gracefully:
1. **Load history first** - Fetch chat session and turns from DB
2. **Connect socket** - Establish WebSocket connection
3. **Backend auto-reconnects** - If processing turn found, backend re-establishes
4. **Receive queued messages** - Aggregated messages delivered in order
5. **Handle duplicates** - Frontend merges with existing history
### 8.6 Single Tab Enforcement
Only one tab can control a chat session at a time:
- First tab acquires Redis lock
- Subsequent tabs receive `tabLockDenied` event
- UI shows "Chat session open in another browser tab"
- User must navigate away or close the duplicate tab
### 8.7 Status Indicators
The status bar shows connection state:
- **Connected** (green ●) - Socket connected, receiving messages
- **Connecting** (yellow ●) - Attempting to connect
- **Error** (red ●) - Connection failed
- **Disconnected** (gray ●) - No active connection
Status messages inform the user:
- "Connecting..." - Initial connection
- "Reconnecting to active session..." - Auto-reconnect in progress
- "Reconnected" - Successfully reconnected
- "Chat session is open in another browser tab" - Tab lock denied

View File

@ -117,6 +117,7 @@ export interface SocketEvents {
}) => void;
workspaceModeChanged: (mode: string) => void;
sessionUpdated: (updates: Partial<ChatSession>) => void;
tabLockDenied: (data: { message: string }) => void;
connect: () => void;
disconnect: (reason: string) => void;
error: (error: Error) => void;
@ -261,6 +262,10 @@ class SocketClient {
this.reconnectAttempts++;
this.emit("error", error);
});
this._socket.on("tabLockDenied", (data: { message: string }) => {
this.emit("tabLockDenied", data);
});
}
disconnect(): void {

View File

@ -70,6 +70,9 @@ export default function ChatSessionView() {
const [isEditingName, setIsEditingName] = useState(false);
const [editName, setEditName] = useState('');
const [isUpdatingName, setIsUpdatingName] = useState(false);
const [connectionState, setConnectionState] = useState<'disconnected' | 'connecting' | 'connected' | 'error'>('disconnected');
const [isOtherTab, setIsOtherTab] = useState(false);
const [reconnectAttempts, setReconnectAttempts] = useState(0);
const messagesEndRef = useRef<HTMLDivElement>(null);
const inputRef = useRef<HTMLTextAreaElement>(null);
@ -125,32 +128,39 @@ export default function ChatSessionView() {
};
}, [session, project]);
// Re-lock on socket reconnect to restore lock on a new CodeSession
const handleSocketReconnect = useCallback(async () => {
if (!sessionRef.current || !projectRef.current) return;
const droneJson = localStorage.getItem('dtp_drone_registration');
if (!droneJson) return;
try {
const registration = JSON.parse(droneJson);
const success = await socketClient.requestSessionLock(
registration,
projectRef.current,
sessionRef.current,
);
if (!success) {
console.warn('Failed to re-lock drone after socket reconnect');
}
} catch (err) {
console.error('Failed to re-lock drone after socket reconnect', err);
// Handle socket reconnection and tab lock
const handleSocketConnect = useCallback(() => {
setConnectionState('connected');
appContext?.setStatusMessage('Connected');
// If we had a processing turn, backend will auto-reconnect
const hasProcessingTurn = turns.some(t => t.status === 'processing');
if (hasProcessingTurn) {
appContext?.setStatusMessage('Reconnecting to active session...');
}
}, []);
}, [turns, appContext]);
useEffect(() => {
socketClient.on('connect', handleSocketReconnect);
return () => {
socketClient.off('connect', handleSocketReconnect);
};
}, [handleSocketReconnect]);
const handleTabLockDenied = useCallback((data: { message: string }) => {
setIsOtherTab(true);
setConnectionState('error');
appContext?.setStatusMessage(data.message);
}, [appContext]);
const handleReconnectAttempt = useCallback(() => {
setReconnectAttempts(prev => prev + 1);
appContext?.setStatusMessage(`Reconnecting... (${reconnectAttempts + 1})`);
}, [reconnectAttempts, appContext]);
const handleReconnectFailed = useCallback(() => {
setConnectionState('error');
appContext?.setStatusMessage('Reconnection failed');
}, [appContext]);
const handleReconnect = useCallback(() => {
setConnectionState('connected');
setReconnectAttempts(0);
appContext?.setStatusMessage('Reconnected');
}, [appContext]);
// Release session lock on unmount only
useEffect(() => {
@ -197,9 +207,14 @@ export default function ChatSessionView() {
setSelectedProviderId(providerId || '');
setSelectedModelId(sessionData.selectedModel || '');
setSessionReasoningEffort(sessionData.reasoningEffort || 'off');
// Set connection state to connecting - will update on socket connect
setConnectionState('connecting');
appContext?.setStatusMessage('Connecting...');
}
} catch (err) {
setError(err instanceof Error ? err.message : 'Failed to load session');
setConnectionState('error');
} finally {
setLoading(false);
}
@ -223,6 +238,11 @@ export default function ChatSessionView() {
socketClient.on('agent:tool-call', handleAgentToolCall);
socketClient.on('agent:tool-result', handleAgentToolResult);
socketClient.on('agent:complete', handleAgentComplete);
socketClient.on('connect', handleSocketConnect);
socketClient.on('tabLockDenied', handleTabLockDenied);
socketClient.on('reconnect_attempt', handleReconnectAttempt);
socketClient.on('reconnect_failed', handleReconnectFailed);
socketClient.on('reconnect', handleReconnect);
};
const cleanupSocketListeners = () => {
@ -239,6 +259,11 @@ export default function ChatSessionView() {
socketClient.off('agent:tool-call', handleAgentToolCall);
socketClient.off('agent:tool-result', handleAgentToolResult);
socketClient.off('agent:complete', handleAgentComplete);
socketClient.off('connect', handleSocketConnect);
socketClient.off('tabLockDenied', handleTabLockDenied);
socketClient.off('reconnect_attempt', handleReconnectAttempt);
socketClient.off('reconnect_failed', handleReconnectFailed);
socketClient.off('reconnect', handleReconnect);
};
const scheduleUpdate = useCallback(() => {
@ -948,6 +973,25 @@ export default function ChatSessionView() {
);
}
// Render "other tab" state
if (isOtherTab) {
return (
<div className="flex-1 flex items-center justify-center bg-bg-primary">
<div className="text-center">
<p className="text-text-secondary mb-4">
This chat session is open in another browser tab.
</p>
<button
onClick={() => navigate(`/projects/${projectId}`)}
className="px-4 py-2 bg-brand text-white rounded hover:bg-red-700 transition-colors"
>
Back to Project
</button>
</div>
</div>
);
}
return (
<div className="flex-1 flex bg-bg-primary overflow-hidden relative">
{/* Toast notification */}

View File

@ -22,8 +22,11 @@ import {
} from "@gadget/api";
import ChatSession from "../models/chat-session.ts";
import DroneRegistration from "../models/drone-registration.ts";
import { ChatTurn } from "../models/chat-turn.ts";
import { ChatSessionService, SocketService } from "../services/index.ts";
import TabLock from "./tab-lock.js";
export class CodeSession extends SocketSession {
protected type: SocketSessionType = SocketSessionType.Code;
@ -33,6 +36,10 @@ export class CodeSession extends SocketSession {
protected selectedDrone: IDroneRegistration | undefined;
protected currentTurnId: GadgetId | undefined;
protected workspaceMode: WorkspaceMode = WorkspaceMode.Idle;
private chatSessionId: GadgetId | undefined;
private isReconnecting = false;
private tabLockAcquired = false;
constructor(socket: GadgetSocket, user: IUser) {
super(socket, user);
@ -41,6 +48,7 @@ export class CodeSession extends SocketSession {
register() {
super.register();
this.socket.on("disconnect", this.onDisconnect.bind(this));
this.socket.on("requestSessionLock", this.onRequestSessionLock.bind(this));
this.socket.on(
"requestWorkspaceMode",
@ -49,6 +57,17 @@ export class CodeSession extends SocketSession {
this.socket.on("submitPrompt", this.onSubmitPrompt.bind(this));
this.socket.on("releaseSessionLock", this.onReleaseSessionLock.bind(this));
this.socket.on("sessionHeartbeat", this.onSessionHeartbeat.bind(this));
// Check for active session on connect
this.checkAndReestablishActiveSession();
}
private async onDisconnect(): Promise<void> {
// Release tab lock on disconnect
if (this.chatSessionId && this.tabLockAcquired) {
await TabLock.release(this.chatSessionId, this.socket.id);
this.tabLockAcquired = false;
}
}
get hasLock(): boolean {
@ -67,6 +86,94 @@ export class CodeSession extends SocketSession {
return this.project;
}
private async checkAndReestablishActiveSession(): Promise<void> {
if (this.isReconnecting) return;
this.isReconnecting = true;
try {
// Get user's most recent chat session
const recentSessions = await ChatSession.find({ user: this.user._id })
.sort({ createdAt: -1 })
.limit(5);
for (const session of recentSessions) {
// Check if this session has a processing turn
const latestTurn = await ChatTurn.findOne({ session: session._id })
.sort({ createdAt: -1 });
if (latestTurn && latestTurn.status === ChatTurnStatus.Processing) {
// Found active session - attempt to reestablish connection
this.chatSessionId = session._id;
// Get the drone that was processing this turn
const droneReg = await DroneRegistration.findOne({
chatSessionId: session._id,
}).populate('user');
if (droneReg && droneReg.user) {
await this.autoRelock(droneReg, session);
}
break;
}
}
} catch (error) {
this.log.error("failed to check for active session", { error });
} finally {
this.isReconnecting = false;
}
}
private async autoRelock(
registration: IDroneRegistration,
session: IChatSession
): Promise<void> {
try {
// Try to acquire tab lock
const lockResult = await TabLock.acquire(
session._id,
this.user._id,
this.socket.id,
);
if (!lockResult.success) {
this.log.warn("tab lock denied - session open in another tab", {
chatSessionId: session._id,
lockedBy: lockResult.info?.socketId,
});
this.socket.emit("tabLockDenied", {
message: "Chat session is open in another browser tab",
});
return;
}
this.tabLockAcquired = true;
const droneSession = SocketService.getDroneSession(registration);
// Re-establish the chat session index
SocketService.registerChatSession(session._id, this);
droneSession.setChatSessionId(session._id);
// Update CodeSession state
this.chatSession = session;
this.selectedDrone = registration;
// Drain any queued messages
await droneSession.drainMessageQueue();
this.log.info("auto-reestablished session connection", {
chatSessionId: session._id,
droneId: registration._id,
});
// Emit status to client
this.socket.emit("status", "Reconnected to active session");
} catch (error) {
this.log.error("failed to auto-relock session", { error });
this.socket.emit("status", "Failed to reconnect to session");
}
}
/**
* Sets the selected drone for this code session.
*/
@ -104,13 +211,24 @@ export class CodeSession extends SocketSession {
registration,
project,
chatSession,
(success: boolean, chatSessionId: string): void => {
async (success: boolean, chatSessionId: string): Promise<void> => {
if (success) {
this.selectedDrone = registration;
this.chatSession = chatSession;
this.project = project;
this.chatSessionId = chatSession._id;
SocketService.registerChatSession(chatSession._id, this);
droneSession.setChatSessionId(chatSession._id);
// Acquire tab lock
const lockResult = await TabLock.acquire(
chatSession._id,
this.user._id,
this.socket.id,
);
this.tabLockAcquired = lockResult.success;
}
cb(success, chatSessionId);
},
@ -300,13 +418,20 @@ export class CodeSession extends SocketSession {
registration,
project,
chatSession,
(success: boolean) => {
async (success: boolean) => {
if (success) {
SocketService.unregisterChatSession(chatSession._id);
droneSession.chatSessionId = undefined;
this.selectedDrone = undefined;
this.chatSession = undefined;
this.project = undefined;
this.chatSessionId = undefined;
// Release tab lock
if (this.tabLockAcquired) {
await TabLock.release(chatSession._id, this.socket.id);
this.tabLockAcquired = false;
}
}
cb(success);
},

View File

@ -20,6 +20,7 @@ import {
} from "./socket-session.js";
import { SocketService } from "../services/index.js";
import { ChatTurn } from "../models/chat-turn.js";
import MessageQueue, { type QueuedMessage } from "./message-queue.js";
interface IStreamingBuffer {
currentMode: 'thinking' | 'responding' | null;
@ -35,6 +36,7 @@ export class DroneSession extends SocketSession {
currentTurnId: GadgetId | undefined;
workspaceMode: WorkspaceMode = WorkspaceMode.Idle;
private streamingBuffers: Map<string, IStreamingBuffer> = new Map();
private isDrainingQueue = false;
constructor(socket: GadgetSocket, registration: IDroneRegistration) {
super(socket, registration.user as IUser);
@ -80,32 +82,45 @@ export class DroneSession extends SocketSession {
metadata?: unknown,
): Promise<void> {
if (!this.chatSessionId) {
this.log.warn("log event received but no chat session is active");
return;
}
try {
const codeSession = SocketService.getCodeSessionByChatSessionId(
this.chatSessionId,
);
codeSession.onLog(timestamp, component, level, message, metadata);
} catch (error) {
this.log.error("failed to route log message", { error });
// Routing failed - queue to Redis
await MessageQueue.enqueue(this.chatSessionId, {
type: 'log',
args: [timestamp, component, level, message, metadata],
timestamp: Date.now(),
});
this.log.debug("queued log message", { chatSessionId: this.chatSessionId });
}
}
async onStatus(message: string): Promise<void> {
if (!this.chatSessionId) {
this.log.warn(
"drone status event received but no chat session is active",
);
this.log.warn("status event received but no chat session is active");
return;
}
try {
const codeSession = SocketService.getCodeSessionByChatSessionId(
this.chatSessionId,
);
codeSession.socket.emit("status", message);
} catch (error) {
this.log.error("failed to route status message", { error });
// Routing failed - queue to Redis
await MessageQueue.enqueue(this.chatSessionId, {
type: 'status',
args: [message],
timestamp: Date.now(),
});
this.log.debug("queued status message", { chatSessionId: this.chatSessionId });
}
}
@ -141,7 +156,13 @@ export class DroneSession extends SocketSession {
buffer.thinkingContent += content;
}
} catch (error) {
this.log.error("failed to route thinking event", { error });
// Routing failed - queue to Redis
await MessageQueue.enqueue(this.chatSessionId, {
type: 'thinking',
args: [content],
timestamp: Date.now(),
});
this.log.debug("queued thinking message", { chatSessionId: this.chatSessionId });
}
}
@ -177,7 +198,13 @@ export class DroneSession extends SocketSession {
buffer.respondingContent += content;
}
} catch (error) {
this.log.error("failed to route response event", { error });
// Routing failed - queue to Redis
await MessageQueue.enqueue(this.chatSessionId, {
type: 'response',
args: [content],
timestamp: Date.now(),
});
this.log.debug("queued response message", { chatSessionId: this.chatSessionId });
}
}
@ -230,7 +257,13 @@ export class DroneSession extends SocketSession {
}
}
} catch (error) {
this.log.error("failed to route toolCall event", { error });
// Routing failed - queue to Redis
await MessageQueue.enqueue(this.chatSessionId, {
type: 'toolCall',
args: [callId, name, params, response],
timestamp: Date.now(),
});
this.log.debug("queued toolCall message", { chatSessionId: this.chatSessionId });
}
}
@ -244,9 +277,7 @@ export class DroneSession extends SocketSession {
message?: string,
): Promise<void> {
if (!this.chatSessionId) {
this.log.warn(
"workOrderComplete event received but no chat session is active",
);
this.log.warn("workOrderComplete event received but no chat session is active");
return;
}
@ -271,7 +302,13 @@ export class DroneSession extends SocketSession {
this.currentTurnId = undefined;
} catch (error) {
this.log.error("failed to process workOrderComplete event", { error });
// Routing failed - queue to Redis
await MessageQueue.enqueue(this.chatSessionId, {
type: 'workOrderComplete',
args: [turnId, success, message],
timestamp: Date.now(),
});
this.log.debug("queued workOrderComplete message", { chatSessionId: this.chatSessionId });
}
}
@ -345,66 +382,110 @@ export class DroneSession extends SocketSession {
*/
async onWorkspaceModeChanged(mode: WorkspaceMode): Promise<void> {
if (!this.chatSessionId) {
this.log.warn("workspaceModeChanged event received but no chat session is active");
return;
}
this.workspaceMode = mode;
this.log.info("workspace mode changed", { mode });
const codeSession = SocketService.getCodeSessionByChatSessionId(
this.chatSessionId,
);
codeSession.onWorkspaceModeChanged(mode);
try {
const codeSession = SocketService.getCodeSessionByChatSessionId(
this.chatSessionId,
);
codeSession.onWorkspaceModeChanged(mode);
} catch (error) {
this.log.error("failed to route workspaceModeChanged event", { error });
}
}
async onAgentThinking(data: { agentId: string; thinking: string }): Promise<void> {
if (!this.chatSessionId) return;
if (!this.chatSessionId) {
this.log.warn("agent:thinking event received but no chat session is active");
return;
}
try {
const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId);
codeSession.socket.emit("agent:thinking", data);
} catch (error) {
this.log.error("failed to route agent:thinking", { error });
await MessageQueue.enqueue(this.chatSessionId, {
type: 'agent:thinking',
args: [data],
timestamp: Date.now(),
});
this.log.debug("queued agent:thinking message", { chatSessionId: this.chatSessionId });
}
}
async onAgentResponse(data: { agentId: string; chunk: string }): Promise<void> {
if (!this.chatSessionId) return;
if (!this.chatSessionId) {
this.log.warn("agent:response event received but no chat session is active");
return;
}
try {
const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId);
codeSession.socket.emit("agent:response", data);
} catch (error) {
this.log.error("failed to route agent:response", { error });
await MessageQueue.enqueue(this.chatSessionId, {
type: 'agent:response',
args: [data],
timestamp: Date.now(),
});
this.log.debug("queued agent:response message", { chatSessionId: this.chatSessionId });
}
}
async onAgentToolCall(data: { agentId: string; tool: string; args: unknown }): Promise<void> {
if (!this.chatSessionId) return;
if (!this.chatSessionId) {
this.log.warn("agent:tool-call event received but no chat session is active");
return;
}
try {
const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId);
codeSession.socket.emit("agent:tool-call", data);
} catch (error) {
this.log.error("failed to route agent:tool-call", { error });
await MessageQueue.enqueue(this.chatSessionId, {
type: 'agent:tool-call',
args: [data],
timestamp: Date.now(),
});
this.log.debug("queued agent:tool-call message", { chatSessionId: this.chatSessionId });
}
}
async onAgentToolResult(data: { agentId: string; tool: string; result: unknown }): Promise<void> {
if (!this.chatSessionId) return;
if (!this.chatSessionId) {
this.log.warn("agent:tool-result event received but no chat session is active");
return;
}
try {
const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId);
codeSession.socket.emit("agent:tool-result", data);
} catch (error) {
this.log.error("failed to route agent:tool-result", { error });
await MessageQueue.enqueue(this.chatSessionId, {
type: 'agent:tool-result',
args: [data],
timestamp: Date.now(),
});
this.log.debug("queued agent:tool-result message", { chatSessionId: this.chatSessionId });
}
}
async onAgentComplete(data: { agentId: string; response?: string; subagent?: Record<string, unknown>; stats?: Record<string, unknown> }): Promise<void> {
if (!this.chatSessionId) return;
if (!this.chatSessionId) {
this.log.warn("agent:complete event received but no chat session is active");
return;
}
try {
const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId);
codeSession.socket.emit("agent:complete", data);
} catch (error) {
this.log.error("failed to route agent:complete to frontend", { error });
await MessageQueue.enqueue(this.chatSessionId, {
type: 'agent:complete',
args: [data],
timestamp: Date.now(),
});
this.log.debug("queued agent:complete message", { chatSessionId: this.chatSessionId });
}
// Update the persisted tool call with the final response and subagent data
@ -524,4 +605,135 @@ export class DroneSession extends SocketSession {
cb(success);
});
}
/**
* Drains queued messages from Redis and delivers to reconnected CodeSession.
* Aggregates adjacent same-type streaming messages to reduce message count.
*/
async drainMessageQueue(): Promise<void> {
if (!this.chatSessionId || this.isDrainingQueue) return;
this.isDrainingQueue = true;
try {
const messages = await MessageQueue.drain(this.chatSessionId);
if (messages.length === 0) return;
this.log.info("draining message queue", { count: messages.length });
const codeSession = SocketService.getCodeSessionByChatSessionId(this.chatSessionId);
// Aggregate adjacent same-type streaming messages
const aggregated = this.aggregateMessages(messages);
for (const msg of aggregated) {
try {
switch (msg.type) {
case 'thinking':
codeSession.onThinking(msg.args[0] as string);
break;
case 'response':
codeSession.onResponse(msg.args[0] as string);
break;
case 'toolCall':
codeSession.onToolCall(
msg.args[0] as string,
msg.args[1] as string,
msg.args[2] as string,
msg.args[3] as string,
);
break;
case 'workOrderComplete':
codeSession.onWorkOrderComplete(
msg.args[0] as string,
msg.args[1] as boolean,
msg.args[2] as string,
);
break;
case 'log':
codeSession.onLog(
msg.args[0] as Date,
msg.args[1] as GadgetComponent,
msg.args[2] as GadgetLogLevel,
msg.args[3] as string,
msg.args[4] as unknown,
);
break;
case 'status':
codeSession.socket.emit("status", msg.args[0] as string);
break;
case 'agent:thinking':
codeSession.socket.emit("agent:thinking", msg.args[0] as { agentId: string; thinking: string });
break;
case 'agent:response':
codeSession.socket.emit("agent:response", msg.args[0] as { agentId: string; chunk: string });
break;
case 'agent:tool-call':
codeSession.socket.emit("agent:tool-call", msg.args[0] as { agentId: string; tool: string; args: unknown });
break;
case 'agent:tool-result':
codeSession.socket.emit("agent:tool-result", msg.args[0] as { agentId: string; tool: string; result: unknown });
break;
case 'agent:complete':
codeSession.socket.emit("agent:complete", msg.args[0] as { agentId: string; response?: string; subagent?: Record<string, unknown>; stats?: Record<string, unknown> });
break;
default:
this.log.warn("unknown queued message type", { type: (msg as any).type });
}
// Small delay to avoid flooding
await new Promise(resolve => setTimeout(resolve, 5));
} catch (error) {
this.log.error("failed to deliver queued message", {
type: msg.type,
error,
});
}
}
} catch (error) {
this.log.error("failed to drain message queue", { error });
} finally {
this.isDrainingQueue = false;
}
}
/**
* Aggregates adjacent same-type streaming messages (thinking/response).
* Preserves order and only aggregates during drain (not real-time).
*/
private aggregateMessages(messages: QueuedMessage[]): QueuedMessage[] {
const aggregated: QueuedMessage[] = [];
let currentAggregate: QueuedMessage | null = null;
for (const msg of messages) {
// Only aggregate thinking and response messages
if (msg.type === 'thinking' || msg.type === 'response') {
if (currentAggregate && currentAggregate.type === msg.type) {
// Continue aggregating same type
const currentContent = (currentAggregate.args[0] as string) || '';
const newContent = (msg.args[0] as string) || '';
currentAggregate.args = [currentContent + newContent];
} else {
// Type changed - push current aggregate and start new
if (currentAggregate) {
aggregated.push(currentAggregate);
}
currentAggregate = { ...msg };
}
} else {
// Non-aggregatable message type
if (currentAggregate) {
aggregated.push(currentAggregate);
currentAggregate = null;
}
aggregated.push(msg);
}
}
// Push final aggregate if exists
if (currentAggregate) {
aggregated.push(currentAggregate);
}
return aggregated;
}
}

View File

@ -0,0 +1,85 @@
// src/lib/message-queue.ts
// Copyright (C) 2026 Robert Colbert <rob.colbert@openplatform.us>
// All Rights Reserved
import redis from './redis.js';
import { GadgetLog } from '@gadget/api';
const log = new GadgetLog({ name: 'MessageQueue', slug: 'message-queue' });
export interface QueuedMessage {
type: 'thinking' | 'response' | 'toolCall' | 'workOrderComplete' |
'log' | 'status' | 'agent:thinking' | 'agent:response' |
'agent:tool-call' | 'agent:tool-result' | 'agent:complete';
args: unknown[];
timestamp: number;
}
const QUEUE_KEY = (chatSessionId: string) => `gadget:messages:${chatSessionId}`;
const TTL_SECONDS = 1800; // 30 minutes
const MAX_QUEUE_SIZE = 1000;
class MessageQueueService {
/**
* Enqueues a message to Redis for later delivery.
* Uses RPUSH for FIFO ordering (messages delivered in order).
* Trims queue to MAX_QUEUE_SIZE messages (drop oldest if exceeded).
*/
async enqueue(chatSessionId: string, message: QueuedMessage): Promise<void> {
const key = QUEUE_KEY(chatSessionId);
const serialized = JSON.stringify(message);
try {
// RPUSH for FIFO ordering
await redis.rpush(key, serialized);
// Trim to MAX_QUEUE_SIZE messages (drop oldest from left)
await redis.ltrim(key, -MAX_QUEUE_SIZE, -1);
// Set/refresh TTL
await redis.expire(key, TTL_SECONDS);
} catch (error) {
log.error('failed to enqueue message', { chatSessionId, error });
throw error; // Drone will stop on error
}
}
/**
* Drains all queued messages in FIFO order and deletes the queue.
* Returns messages for delivery to reconnected client.
*/
async drain(chatSessionId: string): Promise<QueuedMessage[]> {
const key = QUEUE_KEY(chatSessionId);
try {
// Get all messages
const messages = await redis.lrange(key, 0, -1);
if (messages.length > 0) {
// Delete the queue after draining
await redis.del(key);
return messages.map(msg => JSON.parse(msg) as QueuedMessage);
}
return [];
} catch (error) {
log.error('failed to drain message queue', { chatSessionId, error });
throw error;
}
}
/**
* Cleans up a message queue (e.g., on session close).
*/
async cleanup(chatSessionId: string): Promise<void> {
const key = QUEUE_KEY(chatSessionId);
try {
await redis.del(key);
} catch (error) {
log.error('failed to cleanup message queue', { chatSessionId, error });
}
}
}
export default new MessageQueueService();

View File

@ -0,0 +1,134 @@
// src/lib/tab-lock.ts
// Copyright (C) 2026 Robert Colbert <rob.colbert@openplatform.us>
// All Rights Reserved
import redis from './redis.js';
import { GadgetLog } from '@gadget/api';
const log = new GadgetLog({ name: 'TabLock', slug: 'tab-lock' });
const LOCK_KEY = (chatSessionId: string) => `gadget:lock:${chatSessionId}`;
const LOCK_TIMEOUT_MS = 60000; // 1 minute - must be renewed by heartbeat
export interface TabLockInfo {
socketId: string;
userId: string;
acquiredAt: number;
}
class TabLockService {
/**
* Attempts to acquire a tab lock for a chat session.
* Returns success=true if lock acquired, false if already locked by another tab.
*/
async acquire(
chatSessionId: string,
userId: string,
socketId: string
): Promise<{ success: boolean; info?: TabLockInfo }> {
const key = LOCK_KEY(chatSessionId);
const now = Date.now();
try {
// Try to set lock with NX (only if not exists)
const acquired = await redis.set(
key,
JSON.stringify({ socketId, userId, acquiredAt: now }),
'EX',
LOCK_TIMEOUT_MS / 1000,
'NX'
);
if (acquired === 'OK') {
log.info('tab lock acquired', { chatSessionId, userId, socketId });
return { success: true };
}
// Lock exists - check if it's ours or stale
const existing = await redis.get(key);
if (!existing) {
// Lock disappeared between check and set - retry
return this.acquire(chatSessionId, userId, socketId);
}
const info = JSON.parse(existing) as TabLockInfo;
// Check if lock is stale (expired but not cleaned up)
if (now - info.acquiredAt > LOCK_TIMEOUT_MS) {
log.info('stale tab lock detected, reclaiming', { chatSessionId, info });
await this.release(chatSessionId);
return this.acquire(chatSessionId, userId, socketId);
}
// Lock is held by another tab
log.info('tab lock denied - already locked', {
chatSessionId,
lockedBy: info.socketId,
ourSocket: socketId
});
return { success: false, info };
} catch (error) {
log.error('failed to acquire tab lock', { chatSessionId, error });
throw error;
}
}
/**
* Releases a tab lock.
*/
async release(chatSessionId: string, socketId?: string): Promise<void> {
const key = LOCK_KEY(chatSessionId);
try {
if (socketId) {
// Only release if we hold the lock
const existing = await redis.get(key);
if (existing) {
const info = JSON.parse(existing) as TabLockInfo;
if (info.socketId === socketId) {
await redis.del(key);
log.info('tab lock released', { chatSessionId, socketId });
}
}
} else {
await redis.del(key);
}
} catch (error) {
log.error('failed to release tab lock', { chatSessionId, error });
}
}
/**
* Refreshes the TTL on an existing lock (heartbeat).
*/
async refresh(chatSessionId: string): Promise<boolean> {
const key = LOCK_KEY(chatSessionId);
try {
const refreshed = await redis.expire(key, LOCK_TIMEOUT_MS / 1000);
return refreshed !== 0;
} catch (error) {
log.error('failed to refresh tab lock', { chatSessionId, error });
return false;
}
}
/**
* Gets current lock info without modifying.
*/
async getInfo(chatSessionId: string): Promise<TabLockInfo | null> {
const key = LOCK_KEY(chatSessionId);
try {
const existing = await redis.get(key);
if (!existing) return null;
return JSON.parse(existing) as TabLockInfo;
} catch (error) {
log.error('failed to get tab lock info', { chatSessionId, error });
return null;
}
}
}
export default new TabLockService();

View File

@ -292,6 +292,19 @@ class SocketService extends DtpService {
error.statusCode = 404;
throw error;
}
// CRITICAL: Check if socket is still connected
if (!session.socket.connected) {
this.log.warn("code session socket disconnected, clearing stale reference", {
chatSessionId,
socketId: session.socket.id,
});
this.chatSessionIndex.delete(chatSessionId);
const error = new Error("code session socket disconnected");
error.statusCode = 404;
throw error;
}
return session;
}

View File

@ -127,6 +127,7 @@ export interface ServerToClientEvents {
workOrderComplete: WorkOrderCompleteMessage;
workspaceModeChanged: WorkspaceModeChangedMessage;
sessionUpdated: SessionUpdatedMessage;
tabLockDenied: (data: { message: string }) => void;
"agent:thinking": AgentThinkingMessage;
"agent:response": AgentResponseMessage;
"agent:tool-call": AgentToolCallMessage;