gadget/gadget-drone/src/gadget-drone.ts
Rob Colbert a4d25f90a9 Phase 4: Add event emissions to AWL
- Update AgentService.process() to accept socket parameter
- Emit thinking, response, and toolCall events during AWL execution
- Emit workOrderComplete when AWL loop finishes
- Update drone to pass socket to AgentService.process()
2026-04-29 16:28:53 -04:00

282 lines
7.0 KiB
TypeScript

// src/gadget-drone.ts
// Copyright (C) 2026 Rob Colbert <rob.colbert@openplatform.us>
// Licensed under the Apache License, Version 2.0
import env from "./config/env.ts";
import assert from "node:assert";
import { io, ManagerOptions, SocketOptions, Socket } from "socket.io-client";
import { input as inqInput, password as inqPassword } from "@inquirer/prompts";
import AgentService, { IAgentWorkOrder } from "./services/agent.ts";
import AiService from "./services/ai.ts";
import PlatformService, { PlatformRegistration } from "./services/platform.ts";
import { DroneStatus } from "@gadget/api";
import { GadgetProcess } from "./lib/process.ts";
import {
ClientToServerEvents,
IChatSession,
IChatTurn,
IDroneRegistration,
IProject,
ProcessWorkOrderCallback,
RequestSessionLockCallback,
RequestWorkspaceModeCallback,
ServerToClientEvents,
WorkspaceMode,
} from "@gadget/api";
interface UserCredentials {
email: string;
password: string;
}
type ClientSocket = Socket<ServerToClientEvents, ClientToServerEvents>;
class GadgetDrone extends GadgetProcess {
private registration: PlatformRegistration | undefined;
private workspaceMode: WorkspaceMode = WorkspaceMode.Syncing;
private socket: ClientSocket | undefined;
private isShuttingDown: boolean = false;
get name(): string {
return "GadgetDrone";
}
get slug(): string {
return "gadget-drone";
}
constructor() {
super();
}
async start(): Promise<void> {
/*
* Initialize the system
*/
this.hookProcessSignals();
await this.startServices();
/*
* Register this Drone with the Gadget Code web services platform.
*/
const credentials = await this.getUserCredentials();
const workspaceDir = process.cwd();
this.registration = await PlatformService.register(
credentials.email,
credentials.password,
workspaceDir,
);
this.log.info("registered with platform", {
registration: this.registration,
});
/*
* Connect to the Gadget Code web services platform and configure the real-
* time messaging system on Socket.IO.
*/
await this.connectSocket();
/*
* Mark this Drone as available and ready to accept work orders.
*/
await PlatformService.setStatus(DroneStatus.Available);
this.log.info(`Gadget Drone v${env.pkg.version} started`);
}
async stop(): Promise<number> {
this.log.info(`Gadget Drone v${env.pkg.version} shutting down`);
if (this.socket) {
this.socket.disconnect();
delete this.socket;
}
await PlatformService.unregister();
await this.stopServices();
return 0;
}
async startServices(): Promise<void> {
this.log.info("starting services");
await AgentService.start();
await AiService.start();
await PlatformService.start();
this.log.info("services started");
}
async stopServices(): Promise<void> {
this.log.info("stopping services");
await AgentService.stop();
await AiService.stop();
await PlatformService.stop();
this.log.info("services stopped");
}
async connectSocket(): Promise<void> {
return new Promise((resolve, reject) => {
assert(this.registration, "must be registered with Gadget Code platform");
const options: Partial<ManagerOptions & SocketOptions> = {
auth: { token: this.registration._id },
reconnectionAttempts: 10,
timeout: 5000,
transports: ["websocket"],
};
/*
* Allow self-signed certs in non-production environments
*/
if (env.NODE_ENV !== "production") {
options.rejectUnauthorized = false;
}
this.log.debug("connecting to Gadget Code platform...");
this.socket = io("https://code-dev.g4dge7.com:5174/", options);
this.socket.on("connect_error", (err) => {
this.log.error("socket connect error", { err });
reject(err);
});
this.socket.on("connect", () => {
this.log.info("connected to Gadget Code platform.");
resolve();
});
this.socket.on(
"requestSessionLock",
this.onRequestSessionLock.bind(this),
);
this.socket.on(
"requestWorkspaceMode",
this.onRequestWorkspaceMode.bind(this),
);
this.socket.on("processWorkOrder", this.onProcessWorkOrder.bind(this));
});
}
async onRequestSessionLock(
registration: IDroneRegistration,
project: IProject,
chatSession: IChatSession,
cb: RequestSessionLockCallback,
) {
this.log.info("requestSessionLock received", {
registration,
project,
chatSession,
});
if (!this.registration) {
return cb(false, "not registered");
}
if (!registration._id.equals(this.registration._id)) {
return cb(false, "invalid registration");
}
this.workspaceMode = WorkspaceMode.User;
cb(true, chatSession._id.toHexString());
}
async onRequestWorkspaceMode(
registration: IDroneRegistration,
project: IProject,
chatSession: IChatSession,
mode: WorkspaceMode,
cb: RequestWorkspaceModeCallback,
) {
this.log.info("requestWorkspaceMode received", {
registration,
project,
chatSession,
});
if (this.workspaceMode === WorkspaceMode.Idle) {
this.workspaceMode = mode;
return cb(true, this.workspaceMode);
}
return cb(false, this.workspaceMode);
}
async onProcessWorkOrder(
registration: IDroneRegistration,
project: IProject,
chatSession: IChatSession,
turn: IChatTurn,
cb: ProcessWorkOrderCallback,
) {
const order: IAgentWorkOrder = {
createdAt: turn.createdAt,
context: [],
turn,
};
this.log.info("processWorkOrder received", {
registration,
project,
chatSession,
turn,
});
cb(true); // the drone accepts the work order
if (!this.socket) {
this.log.error("cannot process work order: no socket connection");
return;
}
AgentService.process(order, this.socket);
}
hookProcessSignals(): void {
process.title = this.name;
process.on("unhandledRejection", async (error: Error, p) => {
this.log.error("Unhandled rejection", {
error,
promise: p,
stack: error.stack,
});
const exitCode = await this.stop();
process.exit(exitCode);
});
process.on("warning", (error) => {
if (error.name === "DeprecationWarning") return;
this.log.alert("warning", { error });
});
process.on("SIGINT", async () => {
this.log.info("SIGINT received");
if (this.isShuttingDown) return;
this.log.info("requesting shutdown");
const exitCode = await this.stop();
process.exit(exitCode);
});
}
async getUserCredentials(): Promise<UserCredentials> {
return {
email: await inqInput({ message: "📧 Enter Drone Email: " }),
password: await inqPassword({ message: "🔑 Enter Password: " }),
};
}
}
(async () => {
try {
const drone = new GadgetDrone();
await drone.start();
} catch (error) {
console.error("failed to start gadget-drone", error);
process.exit(-1);
}
})();