gadget/gadget-drone/src/gadget-drone.ts
Rob Colbert 4780b79148 feat: abort controller for work order processing
Add end-to-end abort support: AbortSignal in @gadget/ai providers,
abortWorkOrder socket message, drone AbortController handling,
Cancel button and double-Esc in frontend, and aborted turn status display.
2026-05-12 12:25:17 -04:00

741 lines
21 KiB
TypeScript

// src/gadget-drone.ts
// Copyright (C) 2026 Rob Colbert <rob.colbert@openplatform.us>
// Licensed under the Apache License, Version 2.0
import env from "./config/env.ts";
import assert from "node:assert";
import { io, ManagerOptions, SocketOptions, Socket } from "socket.io-client";
import { input as inqInput, password as inqPassword } from "@inquirer/prompts";
import AgentService, { IAgentWorkOrder } from "./services/agent.ts";
import AiService from "./services/ai.ts";
import PlatformService from "./services/platform.ts";
import WorkspaceService from "./services/workspace.ts";
import {
DroneStatus,
GadgetLog,
GadgetLogTransportSocket,
IUser,
} from "@gadget/api";
import { GadgetProcess } from "./lib/process.ts";
import {
ClientToServerEvents,
IChatSession,
IChatTurn,
IDroneRegistration,
IProject,
ProcessWorkOrderCallback,
RequestSessionLockCallback,
RequestWorkspaceModeCallback,
ServerToClientEvents,
WorkspaceMode,
} from "@gadget/api";
interface UserCredentials {
email: string;
password: string;
}
interface ISessionLock {
project: IProject;
session: IChatSession;
}
type ClientSocket = Socket<ServerToClientEvents, ClientToServerEvents>;
class GadgetDrone extends GadgetProcess {
private registration: IDroneRegistration | undefined;
private user: IUser | undefined;
private workspaceMode: WorkspaceMode = WorkspaceMode.Syncing;
private sessionLock: ISessionLock | undefined;
private isProcessingWorkOrder: boolean = false;
private socket: ClientSocket | undefined;
private isShuttingDown: boolean = false;
private heartbeatTimer: ReturnType<typeof setTimeout> | null = null;
get name(): string {
return "GadgetDrone";
}
get slug(): string {
return "gadget-drone";
}
constructor() {
super();
}
async start(): Promise<void> {
/*
* Initialize the system
*/
this.hookProcessSignals();
await this.startServices();
/*
* Initialize workspace directory structure and load/create workspace identity.
*/
const workspaceDir = process.cwd();
await WorkspaceService.initialize(workspaceDir);
this.log.info("workspace initialized", {
workspaceId: WorkspaceService.workspaceId,
workspaceDir,
});
/*
* Register this Drone with the Gadget Code web services platform.
* Include the workspace ID for crash recovery routing.
*/
const credentials = await this.getUserCredentials();
this.registration = await PlatformService.register(
credentials.email,
credentials.password,
workspaceDir,
WorkspaceService.workspaceId!,
);
this.user = this.registration.user as IUser;
this.log.info("registered with platform", {
registrationId: this.registration._id,
user: this.user.displayName,
});
// Update workspace with registration
WorkspaceService.updateRegistration({
_id: this.registration._id,
status: DroneStatus.Starting,
});
await WorkspaceService.writeWorkspaceData();
/*
* Connect to the Gadget Code web services platform and configure the real-
* time messaging system on Socket.IO.
*/
await this.connectSocket();
/*
* Check for incomplete work order (crash recovery)
*/
await this.checkCrashRecovery();
/*
* Mark this Drone as available and ready to accept work orders.
*/
await PlatformService.setStatus(DroneStatus.Available);
WorkspaceService.updateRegistration({
_id: this.registration._id,
status: DroneStatus.Available,
});
await WorkspaceService.writeWorkspaceData();
this.log.info(`Gadget Drone v${env.pkg.version} started`);
}
async stop(): Promise<number> {
this.log.info(`Gadget Drone v${env.pkg.version} shutting down`);
this.isShuttingDown = true;
if (this.heartbeatTimer) {
clearTimeout(this.heartbeatTimer);
this.heartbeatTimer = null;
}
if (this.socket) {
this.socket.disconnect();
delete this.socket;
}
try {
await PlatformService.unregister();
} catch (error) {
this.log.error("failed to unregister drone from Gadget Code platform", {
error,
});
// fall through
}
await this.stopServices();
return 0;
}
async startServices(): Promise<void> {
this.log.info("starting services");
await AgentService.start();
await AiService.start();
await PlatformService.start();
this.log.info("services started");
}
async stopServices(): Promise<void> {
this.log.info("stopping services");
await AgentService.stop();
await AiService.stop();
await PlatformService.stop();
this.log.info("services stopped");
}
async connectSocket(): Promise<void> {
return new Promise((resolve, reject) => {
assert(this.registration, "must be registered with Gadget Code platform");
const options: Partial<ManagerOptions & SocketOptions> = {
auth: { token: this.registration._id },
reconnectionAttempts: 10,
timeout: 5000,
transports: ["websocket"],
};
/*
* Allow self-signed certs in non-production environments
*/
if (env.NODE_ENV !== "production") {
options.rejectUnauthorized = false;
}
this.log.debug("connecting to Gadget Code platform...");
this.socket = io("https://code-dev.g4dge7.com:5174/", options);
this.socket.on("connect_error", (err) => {
this.log.error("socket connect error", { err });
reject(err);
});
this.socket.on("connect", () => {
this.log.info("connected to Gadget Code platform.");
const socketTransport = new GadgetLogTransportSocket(
(event, timestamp, component, level, message, metadata): void => {
(this.socket as any)?.emit(
event,
timestamp,
{ name: component.name, slug: component.slug },
level,
message,
metadata,
);
},
);
GadgetLog.addDefaultTransport(socketTransport);
this.log.transports.push(socketTransport);
AgentService.log.transports.push(socketTransport);
AiService.log.transports.push(socketTransport);
PlatformService.log.transports.push(socketTransport);
resolve();
});
this.socket.on(
"requestSessionLock",
this.onRequestSessionLock.bind(this),
);
this.socket.on(
"requestWorkspaceMode",
this.onRequestWorkspaceMode.bind(this),
);
this.socket.on("processWorkOrder", this.onProcessWorkOrder.bind(this));
this.socket.on(
"releaseSessionLock",
this.onReleaseSessionLock.bind(this),
);
this.socket.on("sessionHeartbeat", this.onSessionHeartbeat.bind(this));
this.socket.on(
"abortWorkOrder",
this.onAbortWorkOrder.bind(this),
);
this.socket.on(
"requestTermination",
this.onRequestTermination.bind(this),
);
/*
* Handle socket disconnect: clear the heartbeat timer to prevent
* spurious timeout firing while disconnected.
*/
this.socket.on("disconnect", (reason) => {
this.log.info("socket disconnected from platform", { reason });
if (this.heartbeatTimer) {
clearTimeout(this.heartbeatTimer);
this.heartbeatTimer = null;
}
});
/*
* Handle socket reconnect: re-emit current drone status so the
* platform knows the drone is still alive and available.
* NOTE: "reconnect" is a Manager-level event in Socket.IO v4,
* not a Socket-level event, so we listen on socket.io.
*/
this.socket.io.on("reconnect", (attemptNumber: number) => {
this.log.info("socket reconnected to platform", { attemptNumber });
if (this.sessionLock) {
this.socket?.emit("status", "session lock active (reconnected)");
} else {
this.socket?.emit("status", "available (reconnected)");
}
});
});
}
async onRequestSessionLock(
registration: IDroneRegistration,
project: IProject,
chatSession: IChatSession,
cb: RequestSessionLockCallback,
) {
assert(this.socket, "invalid application state");
/*
* Validate gadget-drone registration to ensure correct sync with IDE
*/
if (!this.registration) {
this.log.warn(
"received session lock request without a valid platform registration",
);
return cb(false, "not registered");
}
if (registration._id !== this.registration._id) {
this.log.warn(
"received session lock request for a different drone registration",
{
myId: this.registration._id,
requestId: registration._id,
},
);
return cb(false, "invalid registration");
}
/*
* Check if the chat session lock is already held.
*/
if (this.sessionLock) {
if (chatSession._id !== this.sessionLock.session._id) {
this.log.warn("rejecting session lock request", {
sessionLock: {
project: {
_id: this.sessionLock.project._id,
name: this.sessionLock.project.name,
},
session: {
_id: this.sessionLock.session._id,
name: this.sessionLock.session.name,
},
},
workspaceMode: this.workspaceMode,
});
return cb(false, this.sessionLock.session._id);
}
// fall through to grant this lock request
this.log.info("chat session is re-connecting to session lock");
}
/*
* Grant the chat session lock.
*/
this.log.info("granting session lock", {
registrationId: registration._id,
project: { _id: project._id, slug: project.slug },
chatSession: { _id: chatSession._id, name: chatSession.name },
});
this.sessionLock = {
project,
session: chatSession,
};
this.workspaceMode = WorkspaceMode.Idle;
this.socket.emit("status", "session lock granted");
/*
* Check if this project is already deployed (by ID, not slug).
*/
const haveProjectInWorkspace = WorkspaceService.hasProjectById(
project._id,
);
if (!haveProjectInWorkspace) {
this.socket.emit("status", `deploying project [slug=${project.slug}]`);
await WorkspaceService.deployProject(project);
}
/*
* Add/update the project in workspace data and lock to it.
*/
WorkspaceService.addProject({
_id: project._id,
slug: project.slug,
gitUrl: project.gitUrl,
});
if (!haveProjectInWorkspace) {
WorkspaceService.markProjectDeployed(project.slug);
}
WorkspaceService.updateLockedProject({
_id: project._id,
slug: project.slug,
gitUrl: project.gitUrl,
});
/*
* Commit the changes to the workspace.
*/
await WorkspaceService.writeWorkspaceData();
cb(true, chatSession._id);
this.socket.emit("status", "session lock granted");
}
async onReleaseSessionLock(
registration: IDroneRegistration,
project: IProject,
chatSession: IChatSession,
cb: (success: boolean) => void,
) {
if (!this.registration) {
return cb(false);
}
if (registration._id !== this.registration._id) {
return cb(false);
}
if (!this.sessionLock) {
this.log.info("releaseSessionLock: no lock held, nothing to release");
return cb(true);
}
if (chatSession._id !== this.sessionLock.session._id) {
this.log.warn("releaseSessionLock: session mismatch", {
requested: chatSession._id,
current: this.sessionLock.session._id,
});
}
this.log.info("releasing session lock", {
project: { _id: project._id, slug: project.slug },
chatSession: { _id: chatSession._id, name: chatSession.name },
});
// Clear the heartbeat timer to prevent spurious timeout after release
if (this.heartbeatTimer) {
clearTimeout(this.heartbeatTimer);
this.heartbeatTimer = null;
}
this.sessionLock = undefined;
this.workspaceMode = WorkspaceMode.Syncing;
this.socket?.emit("status", "session lock released");
cb(true);
}
async onSessionHeartbeat(cb: (ack: boolean) => void) {
if (this.heartbeatTimer) {
clearTimeout(this.heartbeatTimer);
}
this.heartbeatTimer = setTimeout(() => {
if (this.isShuttingDown) return;
this.log.warn("heartbeat timeout: releasing session lock");
this.sessionLock = undefined;
this.workspaceMode = WorkspaceMode.Syncing;
this.socket?.emit(
"status",
"session lock released due to heartbeat timeout",
);
this.heartbeatTimer = null;
}, 120000);
cb(true);
}
async onRequestWorkspaceMode(
registration: IDroneRegistration,
project: IProject,
chatSession: IChatSession,
mode: WorkspaceMode,
cb: RequestWorkspaceModeCallback,
) {
if (!this.sessionLock) {
return cb(false, this.workspaceMode);
}
if (chatSession._id !== this.sessionLock.session._id) {
this.log.warn("rejecting workspace mode request", {
chatSession: { _id: chatSession._id, name: chatSession.name },
});
return cb(
false,
this.workspaceMode,
`This drone is locked to a different session (${this.sessionLock.session.name})`,
);
}
this.log.info("requestWorkspaceMode received", {
registration: { _id: registration._id },
project: {
_id: project._id,
name: project.name,
},
chatSession: {
_id: chatSession._id,
name: chatSession.name,
},
currentMode: this.workspaceMode,
requestedMode: mode,
});
let newMode: WorkspaceMode | null = null;
let rejectionReason: string | undefined;
switch (this.workspaceMode) {
case WorkspaceMode.Idle:
if (mode === WorkspaceMode.User || mode === WorkspaceMode.Agent) {
newMode = mode;
} else {
rejectionReason = "Invalid mode transition from Idle";
}
break;
case WorkspaceMode.User:
if (mode === WorkspaceMode.Agent) {
newMode = mode;
} else {
rejectionReason = "Invalid mode transition from User mode";
}
break;
case WorkspaceMode.Agent:
if (mode === WorkspaceMode.User) {
if (this.isProcessingWorkOrder) {
rejectionReason =
"Agent is currently working. Please wait for the current task to complete.";
} else {
newMode = mode;
}
} else {
rejectionReason = "Invalid mode transition from Agent mode";
}
break;
case WorkspaceMode.Syncing:
rejectionReason = "Cannot change mode during sync operation";
break;
}
if (newMode) {
this.workspaceMode = newMode;
this.socket!.emit("workspaceModeChanged", this.workspaceMode);
this.log.info("workspace mode changed", {
previousMode: this.workspaceMode,
newMode,
});
return cb(true, this.workspaceMode);
} else {
this.log.info("workspace mode request rejected", {
reason: rejectionReason,
});
return cb(false, this.workspaceMode, rejectionReason);
}
}
async onProcessWorkOrder(
registration: IDroneRegistration,
turn: IChatTurn,
cb: ProcessWorkOrderCallback,
) {
const project = turn.project as IProject;
const session = turn.session as IChatSession;
if (!this.sessionLock) {
return cb(false, "this drone is not locked to a chat session");
}
if (this.sessionLock.session._id !== session._id) {
return cb(false, "this drone is not locked to your chat session");
}
if (this.sessionLock.project._id !== project._id) {
return cb(
false,
`this drone is locked to a different project: ${this.sessionLock.project.name}`,
);
}
if (this.workspaceMode !== WorkspaceMode.Agent) {
return cb(false, "this drone's workspace is not in Agent mode");
}
if (!this.socket) {
this.log.error("cannot process work order: no socket connection");
cb(false, "No socket connection");
return;
}
const context = await PlatformService.getChatSessionContext(session);
const order: IAgentWorkOrder = {
createdAt: turn.createdAt,
context: context.data,
turn,
};
this.log.info("processWorkOrder received", {
registration: { _id: registration._id },
project: { _id: project._id, name: project.name },
session: { _id: session._id, name: session.name },
turn: { _id: turn._id, mode: turn.mode, userPrompt: turn.prompts.user },
});
// Write work order cache BEFORE processing (for crash recovery)
try {
await WorkspaceService.writeWorkOrderCache(turn);
} catch (error) {
const err = error as Error;
this.log.error("failed to write work order cache", {
error: err.message,
});
// Continue anyway - cache is for recovery, not required
}
cb(true, "work order accepted"); // confirm that drone has the work order
const workspaceDir =
WorkspaceService.workspaceData?.workspaceDir || process.cwd();
const projectDir = WorkspaceService.getProjectDirectory(project.slug);
process.chdir(projectDir);
this.isProcessingWorkOrder = true;
try {
this.socket.emit("status", "processing work order");
await AgentService.process(order, this.socket);
this.socket.emit("status", "work order processing finished");
await WorkspaceService.removeWorkOrderCache();
} catch (error) {
const err = error as Error;
this.log.error("work order processing failed", {
error: err.message,
});
this.socket.emit(
"status",
`failed to process work order: ${(error as Error).message}`,
);
await WorkspaceService.removeWorkOrderCache();
} finally {
process.chdir(workspaceDir);
this.isProcessingWorkOrder = false;
this.log.info("work order processing complete", {
isProcessingWorkOrder: this.isProcessingWorkOrder,
});
}
}
hookProcessSignals(): void {
process.title = this.name;
process.on("unhandledRejection", async (error: Error, p) => {
this.log.error("Unhandled rejection", {
error,
promise: p,
stack: error.stack,
});
const exitCode = await this.stop();
process.exit(exitCode);
});
process.on("warning", (error) => {
if (error.name === "DeprecationWarning") return;
this.log.alert("warning", { error });
});
process.on("SIGINT", async () => {
this.log.info("SIGINT received");
if (this.isShuttingDown) return;
this.log.info("requesting shutdown");
const exitCode = await this.stop();
process.exit(exitCode);
});
}
async getUserCredentials(): Promise<UserCredentials> {
const args = process.argv.slice(2);
const userArg = args.find((a) => a.startsWith("--user="));
const passArg = args.find((a) => a.startsWith("--password="));
if (userArg && passArg) {
return {
email: userArg.split("=")[1],
password: passArg.split("=")[1],
};
}
return {
email: await inqInput({ message: "📧 Enter Drone Email: " }),
password: await inqPassword({ message: "🔑 Enter Password: " }),
};
}
/**
* Checks for incomplete work order and initiates crash recovery if needed.
*/
async checkCrashRecovery(): Promise<void> {
const cache = await WorkspaceService.readWorkOrderCache();
if (!cache) {
this.log.debug("no incomplete work order found");
return;
}
const chatSession = cache.turn.session as IChatSession;
this.log.warn("incomplete work order found - initiating crash recovery", {
turnId: cache.turn._id,
chatSessionId: chatSession._id,
});
if (!this.socket) {
this.log.error("cannot initiate crash recovery: no socket connection");
return;
}
// Notify web service that this workspace has pending recovery
this.socket.emit("requestCrashRecovery", {
workspaceId: WorkspaceService.workspaceId!,
turnId: cache.turn._id,
chatSessionId: chatSession._id,
});
}
async onAbortWorkOrder(cb: (success: boolean, message?: string) => void): Promise<void> {
this.log.info("abortWorkOrder received from platform", {
registrationId: this.registration?._id,
isProcessing: this.isProcessingWorkOrder,
});
const aborted = AgentService.abortCurrentWorkOrder();
cb(aborted, aborted ? "Abort signaled" : "No active work order to abort");
}
async onRequestTermination(cb: (success: boolean) => void): Promise<void> {
this.log.info("requestTermination received from platform", {
registrationId: this.registration?._id,
});
cb(true);
process.kill(process.pid, "SIGINT");
}
}
(async () => {
try {
const drone = new GadgetDrone();
await drone.start();
} catch (error) {
console.error("failed to start gadget-drone", error);
process.exit(-1);
}
})();