switched to feature/socket-protocol to continue experiments

This commit is contained in:
Rob Colbert 2026-04-29 10:11:01 -04:00
parent db0d1586d6
commit 096d8fe8b3
13 changed files with 399 additions and 248 deletions

View File

@ -2,33 +2,60 @@
// Copyright (C) 2026 Robert Colbert <rob.colbert@openplatform.us>
// All Rights Reserved
import { Socket } from "socket.io";
import { SocketSession, SocketSessionType } from "./socket-session";
import { IUser } from "@gadget/api";
import {
GadgetSocket,
SocketSession,
SocketSessionType,
} from "./socket-session";
import { IChatSession, IDroneRegistration, IProject, IUser } from "@gadget/api";
import SocketService from "../services/socket.ts";
export class CodeSession extends SocketSession {
protected type: SocketSessionType = SocketSessionType.Code;
constructor(socket: Socket, user: IUser) {
protected project: IProject | undefined;
protected chatSession: IChatSession | undefined;
constructor(socket: GadgetSocket, user: IUser) {
super(socket, user);
}
register() {
super.register();
this.socket.on("thinking", this.onThinking.bind(this));
this.socket.on("response", this.onResponse.bind(this));
this.socket.on("tool-call", this.onToolCall.bind(this));
this.socket.on("requestSessionLock", this.onRequestSessionLock.bind(this));
this.socket.on("submitPrompt", this.onSubmitPrompt.bind(this));
}
async onThinking(): Promise<void> {}
/**
* Called when the IDE sends a requestSessionLock event to lock a gadget-drone
* instance to this code session.
* @param registration the gadget-drone registration to which the request will
* be sent.
* @param project the project we're locking the drone to
* @param chatSession the chat session we're locking the drone to
* @param cb response callback to call with the result of the request
*/
onRequestSessionLock(
registration: IDroneRegistration,
project: IProject,
chatSession: IChatSession,
cb: (success: boolean, chatSessionId: string) => void,
) {
const droneSession = SocketService.getDroneSession(registration);
droneSession.socket.emit(
"requestSessionLock",
registration,
project,
chatSession,
(success: boolean, chatSessionId: string): void => {
cb(success, chatSessionId);
},
);
}
async onResponse(): Promise<void> {}
async onToolCall(): Promise<void> {
this.log.info("tool call received", {
params: { thing: 1 },
response: "Woooo!",
});
async onSubmitPrompt(content: string): Promise<void> {
this.log.debug("prompt received", { content });
}
}

View File

@ -3,14 +3,17 @@
// All Rights Reserved
import { IUser, IDroneRegistration } from "@gadget/api";
import { SocketSession, SocketSessionType } from "./socket-session";
import { Socket } from "socket.io";
import {
GadgetSocket,
SocketSession,
SocketSessionType,
} from "./socket-session";
export class DroneSession extends SocketSession {
protected type: SocketSessionType = SocketSessionType.Drone;
registration: IDroneRegistration;
constructor(socket: Socket, registration: IDroneRegistration) {
constructor(socket: GadgetSocket, registration: IDroneRegistration) {
super(socket, registration.user as IUser);
this.registration = registration;
}

View File

@ -3,7 +3,12 @@
// All Rights Reserved
import { Socket } from "socket.io";
import { IUser } from "@gadget/api";
import {
ClientToServerEvents,
IUser,
ServerToClientEvents,
SocketData,
} from "@gadget/api";
import { DtpLog } from "./log";
export enum SocketSessionType {
@ -11,23 +16,33 @@ export enum SocketSessionType {
Drone = "drone",
}
export type GadgetSocket = Socket<
ClientToServerEvents,
ServerToClientEvents,
never,
SocketData
>;
export abstract class SocketSession {
protected log: DtpLog;
protected socket: Socket;
protected _socket: GadgetSocket;
protected _user: IUser;
get user() {
public get socket() {
return this._socket;
}
public get user() {
return this._user;
}
protected abstract type: SocketSessionType;
constructor(socket: Socket, user: IUser) {
constructor(socket: GadgetSocket, user: IUser) {
this.log = new DtpLog({
name: "SocketSession",
slug: "lib:socket-session",
});
this.socket = socket;
this._socket = socket;
this._user = user;
}

View File

@ -0,0 +1,30 @@
// src/models/ide-session.ts
// Copyright (C) 2026 Robert Colbert <rob.colbert@openplatform.us>
// All Rights Reserved
import { Schema, Types, model } from "mongoose";
import { DtpLog } from "../lib/log.js";
import { IIdeSession } from "@gadget/api";
const log = new DtpLog({
name: "IdeSessionModel",
slug: "model:ide-session",
});
const IdeSessionSchema = new Schema<IIdeSession>({
createdAt: { type: Date, default: Date.now, required: true },
user: { type: Types.ObjectId, required: true, ref: "User" },
});
IdeSessionSchema.index({
user: 1,
createdAt: -1,
});
export const IdeSession = model<IIdeSession>("IdeSession", IdeSessionSchema);
export default IdeSession;
(async () => {
log.info("Syncing indexes...");
await IdeSession.syncIndexes();
})();

View File

@ -45,6 +45,7 @@ class ContactService extends DtpService {
}
async start(): Promise<void> {
this.log.info("starting");
this.log.info("creating SMTP transport", {
host: env.email.smtp.host,
port: env.email.smtp.port,
@ -70,7 +71,9 @@ class ContactService extends DtpService {
];
}
async stop(): Promise<void> {}
async stop(): Promise<void> {
this.log.info("stopped");
}
async sendEmail(message: EmailMessage): Promise<IEmailLog> {
if (!this.transport) {

View File

@ -4,15 +4,10 @@
import { PopulateOptions, Types } from "mongoose";
import {
IUser,
DroneStatus,
IDroneRegistration,
IChatSession,
} from "@gadget/api";
import DroneRegistration from "@/models/drone-registration.js";
import { IUser, DroneStatus, IDroneRegistration } from "@gadget/api";
import DroneRegistration from "../models/drone-registration.ts";
import { DtpService } from "../lib/service.js";
import { DtpService } from "../lib/service.ts";
export interface IDroneDefinition {
hostname: string;
@ -146,31 +141,6 @@ class DroneService extends DtpService {
}
return newRegistration;
}
async requestChatSessionLock(
registration: IDroneRegistration,
session: IChatSession,
): Promise<IDroneRegistration> {
/*
* TODO: Send socket message to drone requesting session lock
* If drone acknowledges lock, update the registration with the chatSessionId.
* If the drone denies the lock, throw a descriptive error.
*/
// Update the registration with the chatSessionId
const updatedRegistration = await DroneRegistration.findOneAndUpdate(
{ _id: registration._id },
{ $set: { chatSessionId: session._id } },
{ new: true, populate: this.populateDroneRegistration },
);
if (!updatedRegistration) {
const error = new Error("drone registration has been removed");
error.statusCode = 404;
throw error;
}
return updatedRegistration;
}
}
export default new DroneService();

View File

@ -0,0 +1,222 @@
// src/services/socket.ts
// Copyright (C) 2026 Robert Colbert <rob.colbert@openplatform.us>
// All Rights Reserved
import http from "node:http";
import { Types } from "mongoose";
import { DisconnectReason, ExtendedError, Socket, Server } from "socket.io";
import { GadgetSocket, SocketSessionType } from "../lib/socket-session.js";
import { CodeSession } from "../lib/code-session.js";
import { DroneSession } from "../lib/drone-session.js";
import {
ClientToServerEvents,
IDroneRegistration,
IIdeSession,
ServerToClientEvents,
SocketData,
} from "@gadget/api";
import DroneService from "./drone.js";
import SessionService from "./session.js";
import { DtpService } from "../lib/service.js";
type CodeSessionMap = Map<string, CodeSession>;
type DroneSessionMap = Map<string, DroneSession>;
class SocketService extends DtpService {
private codeSessions: CodeSessionMap = new Map<string, CodeSession>();
private droneSessions: DroneSessionMap = new Map<string, DroneSession>();
private io?: Server<
ClientToServerEvents,
ServerToClientEvents,
never,
SocketData
>;
get name(): string {
return "SocketService";
}
get slug(): string {
return "svc:socket";
}
async start(): Promise<void> {
this.log.info("started");
}
async stop(): Promise<void> {
this.log.info("stopped");
}
async listen(httpServer: http.Server): Promise<void> {
/*
* Create Socket.io server
*/
this.io = new Server<
ClientToServerEvents,
ServerToClientEvents,
never,
SocketData
>(httpServer, {
cors: {
origin: "*",
methods: ["GET", "POST"],
},
});
this.io.use(this.onSocketAuth.bind(this));
this.io.on("connection", this.onSocketConnection.bind(this));
this.log.info("socket.io server initialized");
}
async onSocketAuth(
socket: GadgetSocket,
next: (err?: ExtendedError) => void,
) {
const token = socket.handshake.auth.token;
// this.log.debug("received socket authentication request", { token });
if (!token) {
this.log.warn("socket connection rejected: no token provided");
return next(new Error("Authentication required"));
}
/*
* Try first to validate as a User JWT session
*/
try {
const user = await SessionService.verifyJsonWebToken(token);
const session: CodeSession = new CodeSession(socket, user);
this.codeSessions.set(socket.id, session);
socket.data = { sessionType: SocketSessionType.Code };
socket.on("disconnect", (reason: DisconnectReason, extra?: unknown) => {
this.onSocketDisconnect(socket, reason, extra);
});
return next();
} catch (cause) {
const error = cause as Error;
if (error.name !== "TokenVerifyError") {
this.log.warn("socket connection rejected: invalid token", {
error,
});
return next(new Error("Invalid authentication token"));
}
// fall through to next test
}
/*
* If not a User JWT, try to validate as a Drone session
*/
try {
const registrationId = Types.ObjectId.createFromHexString(token);
const registration = await DroneService.getById(registrationId);
const droneSession: DroneSession = new DroneSession(socket, registration);
this.droneSessions.set(socket.id, droneSession);
socket.data = { sessionType: SocketSessionType.Drone };
socket.on("disconnect", (reason: DisconnectReason, extra?: unknown) => {
this.onSocketDisconnect(socket, reason, extra);
});
return next();
} catch (error) {
this.log.warn("socket connection rejected: invalid auth token", {
error,
});
next(new Error("Invalid authentication token"));
}
}
onSocketConnection(socket: Socket) {
switch (socket.data.sessionType) {
case SocketSessionType.Code:
return this.onSocketConnectCode(socket);
case SocketSessionType.Drone:
return this.onSocketConnectDrone(socket);
default:
break;
}
this.log.error("invalid socket session type during connect");
}
onSocketConnectCode(socket: Socket) {
const session = this.codeSessions.get(socket.id);
if (!session) {
this.log.warn("invalid code session during connect");
socket.disconnect(true);
return;
}
this.log.info("code socket connected", {
id: socket.id,
userId: session.user._id.toHexString(),
});
}
onSocketConnectDrone(socket: Socket) {
const session = this.droneSessions.get(socket.id);
if (!session) {
this.log.warn("invalid drone session during connect");
socket.disconnect(true);
return;
}
this.log.info("drone socket connected", {
id: socket.id,
registrationId: session.registration._id.toHexString(),
});
}
onSocketDisconnect(
socket: Socket,
reason: DisconnectReason,
extra?: unknown,
) {
this.log.info("socket disconnect", { reason, extra });
switch (socket.data.sessionType) {
case SocketSessionType.Code:
this.log.info("closing code socket session", { id: socket.id });
this.codeSessions.delete(socket.id);
return;
case SocketSessionType.Drone:
this.log.info("closing drone socket session", { id: socket.id });
this.droneSessions.delete(socket.id);
return;
default:
break;
}
this.log.error("invalid session type in socket disconnect", {
id: socket.id,
data: socket.data,
});
}
getCodeSession(ideSession: IIdeSession): CodeSession {
const session = this.codeSessions.get(ideSession._id.toHexString());
if (!session) {
const error = new Error("code session not found");
error.statusCode = 404;
throw error;
}
return session;
}
getDroneSession(registration: IDroneRegistration): DroneSession {
const session = this.droneSessions.get(registration._id.toHexString());
if (!session) {
const error = new Error("drone session not found");
error.statusCode = 404;
throw error;
}
return session;
}
}
export default new SocketService();

View File

@ -8,13 +8,6 @@ import assert from "node:assert";
import path from "node:path";
import http from "node:http";
import {
DisconnectReason,
ExtendedError,
Socket,
Server as SocketIOServer,
} from "socket.io";
import "./lib/db.js";
import redis from "./lib/redis.js";
@ -48,37 +41,18 @@ import { UserController } from "./controllers/user.js";
import ApiClient from "./services/api-client.js";
import ContactService from "./services/contact.js";
import DroneService from "./services/drone.js";
import SocketService from "./services/socket.js";
import SessionService, { SessionType } from "./services/session.js";
import StorageService from "./services/storage.js";
import { Types } from "mongoose";
import { User } from "./models/user.js";
import { SocketSessionType } from "./lib/socket-session.js";
import { CodeSession } from "./lib/code-session.js";
import { DroneSession } from "./lib/drone-session.js";
import {
ClientToServerEvents,
ServerToClientEvents,
SocketData,
} from "@gadget/api";
class DtpWebAppServer implements DtpComponent {
private log: DtpLog;
private app?: express.Application;
private server?: http.Server;
public io?: SocketIOServer;
private codeSessions: Map<string, CodeSession> = new Map<
string,
CodeSession
>();
private droneSessions: Map<string, DroneSession> = new Map<
string,
DroneSession
>();
get name(): string {
return "DtpWebAppServer";
@ -111,6 +85,7 @@ class DtpWebAppServer implements DtpComponent {
async startServices(): Promise<void> {
await ApiClient.start();
await ContactService.start();
await SocketService.start();
await StorageService.start();
}
@ -262,7 +237,7 @@ class DtpWebAppServer implements DtpComponent {
}
async startHttpServer(): Promise<void> {
return new Promise<void>((resolve) => {
return new Promise<void>(async (resolve) => {
assert(this.app, "ExpressJS app instance is required");
this.log.info("starting HTTP server", {
address: env.https.address,
@ -275,22 +250,9 @@ class DtpWebAppServer implements DtpComponent {
this.server = http.createServer(this.app);
/*
* Create Socket.io server
* Start the Socket.IO service
*/
this.io = new SocketIOServer<
ClientToServerEvents,
ServerToClientEvents,
never,
SocketData
>(this.server, {
cors: {
origin: "*",
methods: ["GET", "POST"],
},
});
this.io.use(this.onSocketAuth.bind(this));
this.io.on("connection", this.onSocketConnection.bind(this));
this.log.info("socket.io server initialized");
await SocketService.listen(this.server);
/*
* Start HTTP server
@ -307,134 +269,6 @@ class DtpWebAppServer implements DtpComponent {
});
}
async onSocketAuth(socket: Socket, next: (err?: ExtendedError) => void) {
const token = socket.handshake.auth.token;
this.log.debug("received socket authentication request", { token });
if (!token) {
this.log.warn("socket connection rejected: no token provided");
return next(new Error("Authentication required"));
}
/*
* Try first to validate as a User JWT session
*/
try {
const user = await SessionService.verifyJsonWebToken(token);
this.log.info("socket authenticated as User");
const session: CodeSession = new CodeSession(socket, user);
this.codeSessions.set(socket.id, session);
socket.data = { sessionType: SocketSessionType.Code };
socket.on("disconnect", (reason: DisconnectReason, extra?: unknown) => {
this.onSocketDisconnect(socket, reason, extra);
});
return next();
} catch (cause) {
const error = cause as Error;
if (error.name !== "TokenVerifyError") {
this.log.warn("socket connection rejected: invalid token", {
error,
});
return next(new Error("Invalid authentication token"));
}
// fall through to next test
}
/*
* If not a User JWT, try to validate as a Drone session
*/
try {
const registrationId = Types.ObjectId.createFromHexString(token);
const registration = await DroneService.getById(registrationId);
this.log.info("socket authenticated as Drone");
const droneSession: DroneSession = new DroneSession(socket, registration);
this.droneSessions.set(socket.id, droneSession);
socket.data = { sessionType: SocketSessionType.Drone };
socket.on("disconnect", (reason: DisconnectReason, extra?: unknown) => {
this.onSocketDisconnect(socket, reason, extra);
});
return next();
} catch (error) {
this.log.warn("socket connection rejected: invalid auth token", {
error,
});
next(new Error("Invalid authentication token"));
}
}
onSocketConnection(socket: Socket) {
switch (socket.data.sessionType) {
case SocketSessionType.Code:
return this.onSocketConnectCode(socket);
case SocketSessionType.Drone:
return this.onSocketConnectDrone(socket);
default:
break;
}
this.log.error("invalid socket session type during connect");
}
onSocketDisconnect(
socket: Socket,
reason: DisconnectReason,
extra?: unknown,
) {
this.log.info("socket disconnect", { reason, extra });
switch (socket.data.sessionType) {
case SocketSessionType.Code:
this.log.info("closing code socket session", { id: socket.id });
this.codeSessions.delete(socket.id);
return;
case SocketSessionType.Drone:
this.log.info("closing drone socket session", { id: socket.id });
this.droneSessions.delete(socket.id);
return;
default:
break;
}
this.log.error("invalid session type in socket disconnect", {
id: socket.id,
data: socket.data,
});
}
onSocketConnectCode(socket: Socket) {
const session = this.codeSessions.get(socket.id);
if (!session) {
this.log.warn("invalid code session during connect");
socket.disconnect(true);
return;
}
this.log.info("code socket connected", {
id: socket.id,
userId: session.user._id.toHexString(),
email: session.user.email,
});
}
onSocketConnectDrone(socket: Socket) {
const session = this.droneSessions.get(socket.id);
if (!session) {
this.log.warn("invalid drone session during connect");
socket.disconnect(true);
return;
}
this.log.info("drone socket connected", {
id: socket.id,
userId: session.user._id.toHexString(),
email: session.user.email,
registration: session.registration,
});
}
async stopHttpServer(): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (!this.server) {
@ -498,13 +332,9 @@ class DtpWebAppServer implements DtpComponent {
}
if (token) {
this.log.debug("restoring session from JWT", { token });
req.user = await SessionService.verifyJsonWebToken(token);
} else {
const userId = Types.ObjectId.createFromHexString(req.session.user._id);
this.log.debug("restoring session from HTTP session", {
sessionId: req.session.id,
});
req.user = await User.findOne({ _id: userId });
}

View File

@ -7,12 +7,13 @@
*/
export * from "./interfaces/ai-provider.ts";
export * from "./interfaces/user.ts";
export * from "./interfaces/project.ts";
export * from "./interfaces/drone-registration.ts";
export * from "./interfaces/drone-monitor.ts";
export * from "./interfaces/chat-session.ts";
export * from "./interfaces/chat-turn.ts";
export * from "./interfaces/drone-monitor.ts";
export * from "./interfaces/drone-registration.ts";
export * from "./interfaces/ide-session.ts";
export * from "./interfaces/project.ts";
export * from "./interfaces/user.ts";
/*
* Socket.IO Interfaces

View File

@ -0,0 +1,19 @@
// src/interfaces/ide-session.ts
// Copyright (C) 2026 Rob Colbert <rob.colbert@openplatform.us>
// Licensed under the Apache License, Version 2.0
import { Document, Types } from "mongoose";
import type { IUser } from "./user.js";
import type { IProject } from "./project.js";
/**
* When the User logs into the IDE it creates a session against which Socket.IO
* events are scoped.
*/
export interface IIdeSession extends Document {
_id: Types.ObjectId;
createdAt: Date;
user: IUser | Types.ObjectId;
project: IProject | Types.ObjectId;
name: string;
}

View File

@ -1,5 +1,11 @@
// src/messages/drone.ts
// Copyright (C) 2026 Rob Colbert <rob.colbert@openplatform.us>
// Licensed under the Apache License, Version 2.0
export type ThinkingMessage = (content: string) => void;
export type ResponseMessage = (content: string) => void;
export type ToolCallMessage = (
name: string,
params: string,

View File

@ -1,9 +1,16 @@
// src/messages/ide.ts
// Copyright (C) 2026 Rob Colbert <rob.colbert@openplatform.us>
// Licensed under the Apache License, Version 2.0
import { IChatSession } from "../interfaces/chat-session.ts";
import { IDroneRegistration } from "../interfaces/drone-registration.ts";
import { IProject } from "../interfaces/project.ts";
export type RequestSessionLockMessage = (
registration: IDroneRegistration,
project: IProject,
chatSession: IChatSession,
cb: (success: boolean, chatSessionId: string) => void,
) => void;
export type SubmitPromptMessage = (prompt: string) => void;

View File

@ -1,37 +1,53 @@
// src/messages/gadget-code.ts
// src/messages/socket.ts
// Copyright (C) 2026 Rob Colbert <rob.colbert@openplatform.us>
// Licensed under the Apache License, Version 2.0
import { ResponseMessage, ThinkingMessage, ToolCallMessage } from "./drone.ts";
import { RequestSessionLockMessage, SubmitPromptMessage } from "./ide.ts";
export interface ServerToClientEvents {
/*
* GadgetCode => IDE
There are two different kinds of clients that connect to the gadget-code
Socket.IO server:
1. The gadget-code:ide (ReactJS front-end)
2. The gadget-drone work order runner (NodeJS headless process)
gadget-code:ide sends Socket.IO messages to gadget-code:web, which then routes
them to the appropriate gadget-drone socket.
gadget-drone sends messages to gadget-code:web intending for them to be routed
to the appropriate gadget-code:ide socket.
This architecture lets the IDE run under User control in any browser anywhere,
and serve as a remote control surface for one or more gadget-drone processes
running work orders on projects in chat sessions.
*/
export interface ServerToClientEvents {
/*
* gadget-code:ide => gadget-code:web => gadget-drone
*/
requestSessionLock: RequestSessionLockMessage;
submitPrompt: SubmitPromptMessage;
/*
* gadget-drone => gadget-code => gadget-code:ide
*/
thinking: ThinkingMessage;
response: ResponseMessage;
toolCall: ToolCallMessage;
/*
* Gadget Code => Drone
*/
requestSessionLock: RequestSessionLockMessage;
submitPrompt: SubmitPromptMessage;
}
export interface ClientToServerEvents {
/*
* IDE => Gadget Code
* gadget-code:ide => gadget-code => gadget-drone
*/
requestSessionLock: RequestSessionLockMessage;
submitPrompt: SubmitPromptMessage;
/*
* Drone => Gadget Code
* gadget-drone => gadget-code => gadget-code:ide
*/
thinking: ThinkingMessage;
@ -39,4 +55,6 @@ export interface ClientToServerEvents {
toolCall: ToolCallMessage;
}
export interface SocketData {}
export interface SocketData {
/* no data defined */
}