Phase 3: Implement Drone→IDE event routing

- Add event handlers to DroneSession (thinking, response, toolCall, workOrderComplete)
- Implement routing logic to forward events to CodeSession
- Add chat session index to SocketService for reverse lookup
- Add workOrderComplete to ServerToClientEvents interface
- Update CodeSession to register chat session and set current turn on drone
- Add unit tests for DroneSession (12 tests, all passing)
This commit is contained in:
Rob Colbert 2026-04-29 16:26:10 -04:00
parent 8fe75b8c1c
commit 92d19a648c
6 changed files with 414 additions and 5 deletions

View File

@ -70,7 +70,7 @@
---
## Phase 3: Implement Event Routing (Drone→IDE)
## Phase 3: Implement Event Routing (Drone→IDE) ✅ COMPLETE
### 3.1 Add DroneSession Event Handlers
- **File:** `gadget-code/src/lib/drone-session.ts:21-23`
@ -79,7 +79,7 @@
- `response`
- `toolCall`
- `workOrderComplete`
- **Status:** ⬜ Pending
- **Status:** ✅ Complete
### 3.2 Implement Routing Logic
- **File:** `gadget-code/src/lib/drone-session.ts`
@ -87,12 +87,21 @@
- Find corresponding `CodeSession` by `chatSessionId`
- Forward event to IDE socket
- Update `ChatTurn` document with new data
- **Status:** ⬜ Pending
- **Status:** ✅ Complete
### 3.3 Add `getCodeSessionByChatSessionId()` to `SocketService`
- **File:** `gadget-code/src/services/socket.ts`
- **Action:** Maintain reverse index: `chatSessionId → CodeSession`
- **Status:** ⬜ Pending
- **Status:** ✅ Complete
### 3.4 Add `workOrderComplete` to ServerToClientEvents
- **File:** `packages/api/src/messages/socket.ts`
- **Status:** ✅ Complete
### 3.5 Unit Tests for DroneSession
- **File:** `gadget-code/tests/drone-session.test.ts`
- **Tests:** 12 tests covering event routing
- **Status:** ✅ Complete (all passing)
---

View File

@ -82,6 +82,8 @@ export class CodeSession extends SocketSession {
this.selectedDrone = registration;
this.chatSession = chatSession;
this.project = project;
SocketService.registerChatSession(chatSession._id.toHexString(), this);
droneSession.setChatSessionId(chatSession._id);
}
cb(success, chatSessionId);
},
@ -140,6 +142,8 @@ export class CodeSession extends SocketSession {
chatSessionId: this.chatSession._id.toHexString(),
});
droneSession.setCurrentTurnId(turn._id);
droneSession.socket.emit(
"processWorkOrder",
this.selectedDrone,

View File

@ -2,16 +2,21 @@
// Copyright (C) 2026 Robert Colbert <rob.colbert@openplatform.us>
// All Rights Reserved
import { IUser, IDroneRegistration } from "@gadget/api";
import { Types } from "mongoose";
import { IUser, IDroneRegistration, ChatTurnStatus } from "@gadget/api";
import {
GadgetSocket,
SocketSession,
SocketSessionType,
} from "./socket-session";
import SocketService from "../services/socket";
import { ChatTurn } from "../models/chat-turn";
export class DroneSession extends SocketSession {
protected type: SocketSessionType = SocketSessionType.Drone;
registration: IDroneRegistration;
chatSessionId: Types.ObjectId | undefined;
currentTurnId: Types.ObjectId | undefined;
constructor(socket: GadgetSocket, registration: IDroneRegistration) {
super(socket, registration.user as IUser);
@ -20,5 +25,146 @@ export class DroneSession extends SocketSession {
register() {
super.register();
this.socket.on("thinking", this.onThinking.bind(this));
this.socket.on("response", this.onResponse.bind(this));
this.socket.on("toolCall", this.onToolCall.bind(this));
this.socket.on("workOrderComplete", this.onWorkOrderComplete.bind(this));
}
/**
* Called when the drone emits thinking content from the agent.
*/
async onThinking(content: string): Promise<void> {
if (!this.chatSessionId) {
this.log.warn("thinking event received but no chat session is active");
return;
}
try {
const codeSession = SocketService.getCodeSessionByChatSessionId(
this.chatSessionId,
);
codeSession.socket.emit("thinking", content);
if (this.currentTurnId) {
await ChatTurn.findByIdAndUpdate(this.currentTurnId, {
thinking: content,
});
}
} catch (error) {
this.log.error("failed to route thinking event", { error });
}
}
/**
* Called when the drone emits response content from the agent.
*/
async onResponse(content: string): Promise<void> {
if (!this.chatSessionId) {
this.log.warn("response event received but no chat session is active");
return;
}
try {
const codeSession = SocketService.getCodeSessionByChatSessionId(
this.chatSessionId,
);
codeSession.socket.emit("response", content);
if (this.currentTurnId) {
await ChatTurn.findByIdAndUpdate(this.currentTurnId, {
response: content,
});
}
} catch (error) {
this.log.error("failed to route response event", { error });
}
}
/**
* Called when the drone emits a tool call event from the agent.
*/
async onToolCall(
callId: string,
name: string,
params: string,
response: string,
): Promise<void> {
if (!this.chatSessionId) {
this.log.warn("toolCall event received but no chat session is active");
return;
}
try {
const codeSession = SocketService.getCodeSessionByChatSessionId(
this.chatSessionId,
);
codeSession.socket.emit("toolCall", callId, name, params, response);
if (this.currentTurnId) {
const turn = await ChatTurn.findById(this.currentTurnId);
if (turn) {
turn.toolCalls.push({
callId,
name,
parameters: params,
response,
});
turn.stats.toolCallCount = turn.toolCalls.length;
await turn.save();
}
}
} catch (error) {
this.log.error("failed to route toolCall event", { error });
}
}
/**
* Called when the drone completes a work order.
*/
async onWorkOrderComplete(
turnId: string,
success: boolean,
message?: string,
): Promise<void> {
if (!this.chatSessionId) {
this.log.warn("workOrderComplete event received but no chat session is active");
return;
}
try {
const turn = await ChatTurn.findById(turnId);
if (turn) {
turn.status = success ? ChatTurnStatus.Finished : ChatTurnStatus.Error;
if (!success && message) {
turn.response = message;
}
await turn.save();
}
const codeSession = SocketService.getCodeSessionByChatSessionId(
this.chatSessionId,
);
codeSession.socket.emit("workOrderComplete", turnId, success, message);
this.currentTurnId = undefined;
} catch (error) {
this.log.error("failed to process workOrderComplete event", { error });
}
}
/**
* Sets the active chat session ID for this drone session.
*/
setChatSessionId(chatSessionId: Types.ObjectId): void {
this.chatSessionId = chatSessionId;
}
/**
* Sets the current turn ID being processed by this drone.
*/
setCurrentTurnId(turnId: Types.ObjectId): void {
this.currentTurnId = turnId;
}
}

View File

@ -27,10 +27,12 @@ import { DtpService } from "../lib/service.ts";
type CodeSessionMap = Map<string, CodeSession>;
type DroneSessionMap = Map<string, DroneSession>;
type ChatSessionCodeSessionMap = Map<string, CodeSession>;
class SocketService extends DtpService {
private codeSessions: CodeSessionMap = new Map<string, CodeSession>();
private droneSessions: DroneSessionMap = new Map<string, DroneSession>();
private chatSessionIndex: ChatSessionCodeSessionMap = new Map<string, CodeSession>();
private io?: Server<
ClientToServerEvents,
@ -220,6 +222,36 @@ class SocketService extends DtpService {
}
return session;
}
/**
* Registers a code session by its chat session ID for reverse lookup.
*/
registerChatSession(chatSessionId: string, codeSession: CodeSession): void {
this.chatSessionIndex.set(chatSessionId, codeSession);
}
/**
* Gets a code session by its chat session ID.
*/
getCodeSessionByChatSessionId(chatSessionId: Types.ObjectId | string): CodeSession {
const chatSessionIdStr = typeof chatSessionId === "string"
? chatSessionId
: chatSessionId.toHexString();
const session = this.chatSessionIndex.get(chatSessionIdStr);
if (!session) {
const error = new Error("code session not found for chat session");
error.statusCode = 404;
throw error;
}
return session;
}
/**
* Unregisters a code session by its chat session ID.
*/
unregisterChatSession(chatSessionId: string): void {
this.chatSessionIndex.delete(chatSessionId);
}
}
export default new SocketService();

View File

@ -0,0 +1,217 @@
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { Types } from 'mongoose';
import { DroneSession } from '../src/lib/drone-session';
import { IDroneRegistration, IUser, ChatTurnStatus } from '@gadget/api';
import SocketService from '../src/services/socket';
import { ChatTurn } from '../src/models/chat-turn';
// Mock dependencies
vi.mock('../src/services/socket');
vi.mock('../src/models/chat-turn');
describe('DroneSession', () => {
let mockSocket: any;
let mockRegistration: IDroneRegistration;
let mockUser: IUser;
let droneSession: DroneSession;
let mockChatSessionId: Types.ObjectId;
let mockTurnId: Types.ObjectId;
beforeEach(() => {
vi.clearAllMocks();
mockSocket = {
id: 'test-drone-socket',
on: vi.fn(),
emit: vi.fn(),
};
mockUser = {
_id: new Types.ObjectId(),
email: 'test@example.com',
displayName: 'Test User',
} as IUser;
mockRegistration = {
_id: new Types.ObjectId(),
user: mockUser,
hostname: 'test-host',
workspaceDir: '/test/workspace',
status: 'available',
} as IDroneRegistration;
mockChatSessionId = new Types.ObjectId();
mockTurnId = new Types.ObjectId();
droneSession = new DroneSession(mockSocket, mockRegistration);
});
describe('register', () => {
it('should register event handlers for drone events', () => {
droneSession.register();
expect(mockSocket.on).toHaveBeenCalledWith('thinking', expect.any(Function));
expect(mockSocket.on).toHaveBeenCalledWith('response', expect.any(Function));
expect(mockSocket.on).toHaveBeenCalledWith('toolCall', expect.any(Function));
expect(mockSocket.on).toHaveBeenCalledWith('workOrderComplete', expect.any(Function));
});
});
describe('setChatSessionId', () => {
it('should set the chat session ID', () => {
droneSession.setChatSessionId(mockChatSessionId);
expect(() => droneSession.setChatSessionId(mockChatSessionId)).not.toThrow();
});
});
describe('setCurrentTurnId', () => {
it('should set the current turn ID', () => {
droneSession.setCurrentTurnId(mockTurnId);
expect(() => droneSession.setCurrentTurnId(mockTurnId)).not.toThrow();
});
});
describe('onThinking', () => {
it('should route thinking event to code session and update ChatTurn', async () => {
const mockCodeSession = {
socket: { emit: vi.fn() },
};
vi.mocked(SocketService.getCodeSessionByChatSessionId).mockReturnValue(mockCodeSession as any);
vi.mocked(ChatTurn.findByIdAndUpdate).mockResolvedValue({} as any);
droneSession.setChatSessionId(mockChatSessionId);
droneSession.setCurrentTurnId(mockTurnId);
await droneSession.onThinking('thinking content');
expect(SocketService.getCodeSessionByChatSessionId).toHaveBeenCalledWith(mockChatSessionId);
expect(mockCodeSession.socket.emit).toHaveBeenCalledWith('thinking', 'thinking content');
expect(ChatTurn.findByIdAndUpdate).toHaveBeenCalledWith(mockTurnId, {
thinking: 'thinking content',
});
});
it('should log warning if no chat session is active', async () => {
await droneSession.onThinking('thinking content');
// No exception thrown, warning logged internally
expect(mockSocket.emit).not.toHaveBeenCalled();
});
});
describe('onResponse', () => {
it('should route response event to code session and update ChatTurn', async () => {
const mockCodeSession = {
socket: { emit: vi.fn() },
};
vi.mocked(SocketService.getCodeSessionByChatSessionId).mockReturnValue(mockCodeSession as any);
vi.mocked(ChatTurn.findByIdAndUpdate).mockResolvedValue({} as any);
droneSession.setChatSessionId(mockChatSessionId);
droneSession.setCurrentTurnId(mockTurnId);
await droneSession.onResponse('response content');
expect(SocketService.getCodeSessionByChatSessionId).toHaveBeenCalledWith(mockChatSessionId);
expect(mockCodeSession.socket.emit).toHaveBeenCalledWith('response', 'response content');
expect(ChatTurn.findByIdAndUpdate).toHaveBeenCalledWith(mockTurnId, {
response: 'response content',
});
});
it('should log warning if no chat session is active', async () => {
await droneSession.onResponse('response content');
// No exception thrown, warning logged internally
expect(mockSocket.emit).not.toHaveBeenCalled();
});
});
describe('onToolCall', () => {
it('should route toolCall event to code session and update ChatTurn', async () => {
const mockCodeSession = {
socket: { emit: vi.fn() },
};
const mockTurn = {
toolCalls: [],
stats: { toolCallCount: 0 },
save: vi.fn().mockResolvedValue(undefined),
};
vi.mocked(SocketService.getCodeSessionByChatSessionId).mockReturnValue(mockCodeSession as any);
vi.mocked(ChatTurn.findById).mockResolvedValue(mockTurn as any);
droneSession.setChatSessionId(mockChatSessionId);
droneSession.setCurrentTurnId(mockTurnId);
await droneSession.onToolCall('call-123', 'readFile', '{"path":"test.ts"}', 'file contents');
expect(SocketService.getCodeSessionByChatSessionId).toHaveBeenCalledWith(mockChatSessionId);
expect(mockCodeSession.socket.emit).toHaveBeenCalledWith('toolCall', 'call-123', 'readFile', '{"path":"test.ts"}', 'file contents');
expect(ChatTurn.findById).toHaveBeenCalledWith(mockTurnId);
expect(mockTurn.toolCalls).toHaveLength(1);
expect(mockTurn.toolCalls[0]).toEqual({
callId: 'call-123',
name: 'readFile',
parameters: '{"path":"test.ts"}',
response: 'file contents',
});
expect(mockTurn.stats.toolCallCount).toBe(1);
expect(mockTurn.save).toHaveBeenCalled();
});
it('should log warning if no chat session is active', async () => {
await droneSession.onToolCall('call-1', 'test', '{}', 'result');
// No exception thrown, warning logged internally
expect(mockSocket.emit).not.toHaveBeenCalled();
});
});
describe('onWorkOrderComplete', () => {
it('should update ChatTurn status and emit to code session on success', async () => {
const mockCodeSession = {
socket: { emit: vi.fn() },
};
const mockTurn = {
status: ChatTurnStatus.Processing,
save: vi.fn().mockResolvedValue(undefined),
};
vi.mocked(SocketService.getCodeSessionByChatSessionId).mockReturnValue(mockCodeSession as any);
vi.mocked(ChatTurn.findById).mockResolvedValue(mockTurn as any);
droneSession.setChatSessionId(mockChatSessionId);
await droneSession.onWorkOrderComplete(mockTurnId.toHexString(), true);
expect(ChatTurn.findById).toHaveBeenCalledWith(mockTurnId.toHexString());
expect(mockTurn.status).toBe(ChatTurnStatus.Finished);
expect(mockTurn.save).toHaveBeenCalled();
expect(mockCodeSession.socket.emit).toHaveBeenCalledWith('workOrderComplete', mockTurnId.toHexString(), true, undefined);
expect(droneSession.currentTurnId).toBeUndefined();
});
it('should update ChatTurn to Error status on failure', async () => {
const mockCodeSession = {
socket: { emit: vi.fn() },
};
const mockTurn = {
status: ChatTurnStatus.Processing,
response: '',
save: vi.fn().mockResolvedValue(undefined),
};
vi.mocked(SocketService.getCodeSessionByChatSessionId).mockReturnValue(mockCodeSession as any);
vi.mocked(ChatTurn.findById).mockResolvedValue(mockTurn as any);
droneSession.setChatSessionId(mockChatSessionId);
await droneSession.onWorkOrderComplete(mockTurnId.toHexString(), false, 'Agent crashed');
expect(mockTurn.status).toBe(ChatTurnStatus.Error);
expect(mockTurn.response).toBe('Agent crashed');
expect(mockTurn.save).toHaveBeenCalled();
});
it('should log warning if no chat session is active', async () => {
await droneSession.onWorkOrderComplete(mockTurnId.toHexString(), true);
// No exception thrown, warning logged internally
expect(mockSocket.emit).not.toHaveBeenCalled();
});
});
});

View File

@ -68,6 +68,7 @@ export interface ServerToClientEvents {
thinking: ThinkingMessage;
response: ResponseMessage;
toolCall: ToolCallMessage;
workOrderComplete: WorkOrderCompleteMessage;
}
export interface SocketData {