275 lines
8.0 KiB
TypeScript
275 lines
8.0 KiB
TypeScript
// src/services/socket.ts
|
|
// Copyright (C) 2026 Robert Colbert <rob.colbert@openplatform.us>
|
|
// All Rights Reserved
|
|
|
|
import env from "../config/env.ts";
|
|
|
|
import http from "node:http";
|
|
import { Types } from "mongoose";
|
|
|
|
import { DisconnectReason, ExtendedError, Socket, Server } from "socket.io";
|
|
|
|
import { GadgetSocket, SocketSessionType } from "../lib/socket-session.ts";
|
|
import { CodeSession } from "../lib/code-session.ts";
|
|
import { DroneSession } from "../lib/drone-session.ts";
|
|
|
|
import {
|
|
ClientToServerEvents,
|
|
IDroneRegistration,
|
|
IIdeSession,
|
|
ServerToClientEvents,
|
|
SocketData,
|
|
} from "@gadget/api";
|
|
|
|
import DroneService from "./drone.ts";
|
|
import SessionService from "./session.ts";
|
|
import { DtpService } from "../lib/service.ts";
|
|
|
|
/** String-keyed lookup table of code (IDE client) sessions. */
type CodeSessionMap = Map<string, CodeSession>;

/** String-keyed lookup table of drone sessions. */
type DroneSessionMap = Map<string, DroneSession>;

/** Lookup table of code sessions, used where the key is a chat session ID. */
type ChatSessionCodeSessionMap = CodeSessionMap;
|
|
|
|
/**
 * Socket.IO gateway service.
 *
 * Authenticates incoming sockets as either a user "code" session (JWT) or a
 * drone session (registration ID), and keeps the resulting session objects in
 * in-memory maps so they can be looked up by socket ID, user ID, drone
 * registration ID, or chat session ID.
 */
class SocketService extends DtpService {
  /** Code sessions keyed by socket ID. */
  private codeSessions: CodeSessionMap = new Map<string, CodeSession>();

  /** Drone sessions keyed by socket ID. */
  private droneSessions: DroneSessionMap = new Map<string, DroneSession>();

  /**
   * Code sessions keyed by chat session ID.
   * NOTE(review): entries are only removed through unregisterChatSession();
   * nothing in this class clears them when the owning socket disconnects.
   */
  private chatSessionIndex: ChatSessionCodeSessionMap = new Map<string, CodeSession>();

  /** Drone sessions keyed by the hex string of the drone registration ID. */
  private droneRegistrationIndex: DroneSessionMap = new Map<string, DroneSession>();

  /** Code sessions keyed by the hex string of the authenticated user's ID. */
  private codeSessionUserIndex: CodeSessionMap = new Map<string, CodeSession>();

  /** The Socket.IO server; undefined until listen() has been called. */
  private io?: Server<
    ClientToServerEvents,
    ServerToClientEvents,
    never,
    SocketData
  >;

  /**
   * Builds the "not found" error thrown by the lookup methods.
   *
   * @param message Human-readable error message.
   * @returns An Error instance carrying `statusCode = 404`.
   */
  private static notFound(message: string): Error {
    // Object.assign attaches the extra property without relying on the Error
    // type declaring a `statusCode` member.
    return Object.assign(new Error(message), { statusCode: 404 });
  }

  get name(): string {
    return "SocketService";
  }

  get slug(): string {
    return "svc:socket";
  }

  /** Logs that the service has started. */
  async start(): Promise<void> {
    this.log.info("started");
  }

  /** Logs that the service has stopped. */
  async stop(): Promise<void> {
    this.log.info("stopped");
  }

  /**
   * Creates the Socket.IO server on top of an existing HTTP server and wires
   * up the authentication middleware and the connection handler.
   *
   * @param httpServer HTTP server the Socket.IO server attaches to.
   */
  async listen(httpServer: http.Server): Promise<void> {
    this.io = new Server<
      ClientToServerEvents,
      ServerToClientEvents,
      never,
      SocketData
    >(httpServer, {
      maxHttpBufferSize: env.socket.maxHttpBufferSize,
      cors: {
        // NOTE(review): every origin is accepted; confirm this is intended
        // for production deployments.
        origin: "*",
        methods: ["GET", "POST"],
      },
    });
    this.io.use(this.onSocketAuth.bind(this));
    this.io.on("connection", this.onSocketConnection.bind(this));
    this.log.info("socket.io server initialized");
  }

  /**
   * Socket.IO middleware that authenticates a connecting socket.
   *
   * The handshake token is first verified as a user JWT. Only when that
   * fails with an error named "TokenVerifyError" is the token then tried as
   * a drone registration ID. On success the matching session is created,
   * indexed and registered, `socket.data.sessionType` is set, and a
   * disconnect handler is attached.
   *
   * @param socket The connecting socket.
   * @param next Middleware continuation; called with an Error to reject.
   */
  async onSocketAuth(
    socket: GadgetSocket,
    next: (err?: ExtendedError) => void,
  ): Promise<void> {
    const token = socket.handshake.auth.token;
    if (!token) {
      this.log.warn("socket connection rejected: no token provided");
      return next(new Error("Authentication required"));
    }

    /*
     * Try first to validate as a User JWT session
     */
    try {
      const user = await SessionService.verifyJsonWebToken(token);

      const session: CodeSession = new CodeSession(socket, user);
      this.codeSessions.set(socket.id, session);
      this.codeSessionUserIndex.set(user._id.toHexString(), session);
      session.register();

      socket.data = { sessionType: SocketSessionType.Code };
      this.bindDisconnectHandler(socket);

      return next();
    } catch (cause) {
      // Optional chaining guards against a thrown null/undefined value.
      const errorName = (cause as Error | null | undefined)?.name;
      if (errorName !== "TokenVerifyError") {
        this.log.warn("socket connection rejected: invalid token", {
          error: cause,
        });
        return next(new Error("Invalid authentication token"));
      }
      // TokenVerifyError: fall through and try the token as a drone ID.
    }

    /*
     * If not a User JWT, try to validate as a Drone session
     */
    try {
      const registrationId = Types.ObjectId.createFromHexString(token);
      const registration = await DroneService.getById(registrationId);

      const droneSession: DroneSession = new DroneSession(socket, registration);
      this.droneSessions.set(socket.id, droneSession);
      this.droneRegistrationIndex.set(registration._id.toHexString(), droneSession);
      droneSession.register();

      socket.data = { sessionType: SocketSessionType.Drone };
      this.bindDisconnectHandler(socket);

      return next();
    } catch (error) {
      this.log.warn("socket connection rejected: invalid auth token", {
        error,
      });
      next(new Error("Invalid authentication token"));
    }
  }

  /**
   * Attaches the "disconnect" listener that forwards to onSocketDisconnect().
   *
   * @param socket The authenticated socket to watch.
   */
  private bindDisconnectHandler(socket: GadgetSocket): void {
    socket.on("disconnect", (reason: DisconnectReason, extra?: unknown) => {
      this.onSocketDisconnect(socket, reason, extra);
    });
  }

  /**
   * Connection handler: dispatches on the session type that onSocketAuth()
   * stored in `socket.data`, and logs an error for any other value.
   *
   * @param socket The newly connected socket.
   */
  onSocketConnection(socket: Socket): void {
    switch (socket.data.sessionType) {
      case SocketSessionType.Code:
        return this.onSocketConnectCode(socket);
      case SocketSessionType.Drone:
        return this.onSocketConnectDrone(socket);
      default:
        break;
    }
    this.log.error("invalid socket session type during connect");
  }

  /**
   * Logs a connected code socket, or disconnects it when no code session is
   * tracked for its socket ID.
   *
   * @param socket The connected socket.
   */
  onSocketConnectCode(socket: Socket): void {
    const session = this.codeSessions.get(socket.id);
    if (!session) {
      this.log.warn("invalid code session during connect");
      socket.disconnect(true);
      return;
    }
    this.log.info("code socket connected", {
      id: socket.id,
      userId: session.user._id.toHexString(),
    });
  }

  /**
   * Logs a connected drone socket, or disconnects it when no drone session
   * is tracked for its socket ID.
   *
   * @param socket The connected socket.
   */
  onSocketConnectDrone(socket: Socket): void {
    const session = this.droneSessions.get(socket.id);
    if (!session) {
      this.log.warn("invalid drone session during connect");
      socket.disconnect(true);
      return;
    }
    this.log.info("drone socket connected", {
      id: socket.id,
      registrationId: session.registration._id.toHexString(),
    });
  }

  /**
   * Disconnect handler: removes the socket's session from the per-socket map
   * and from the matching secondary index (user ID or registration ID).
   *
   * @param socket The socket that disconnected.
   * @param reason Disconnect reason reported by Socket.IO.
   * @param extra Optional extra detail passed along with the event.
   */
  onSocketDisconnect(
    socket: Socket,
    reason: DisconnectReason,
    extra?: unknown,
  ): void {
    this.log.info("socket disconnect", { reason, extra });
    switch (socket.data.sessionType) {
      case SocketSessionType.Code: {
        this.log.info("closing code socket session", { id: socket.id });
        // Look the session up BEFORE deleting it from codeSessions; the user
        // index can only be cleaned while the session is still reachable.
        const session = this.codeSessions.get(socket.id);
        if (session) {
          const userId = session.user._id.toHexString();
          // Only drop the index entry if it still refers to this session, so
          // a late disconnect of an old socket cannot evict the entry written
          // by a newer connection of the same user.
          if (this.codeSessionUserIndex.get(userId) === session) {
            this.codeSessionUserIndex.delete(userId);
          }
        }
        this.codeSessions.delete(socket.id);
        return;
      }

      case SocketSessionType.Drone: {
        this.log.info("closing drone socket session", { id: socket.id });
        const droneSession = this.droneSessions.get(socket.id);
        if (droneSession) {
          const registrationId = droneSession.registration._id.toHexString();
          // Same identity guard as for code sessions (see above).
          if (this.droneRegistrationIndex.get(registrationId) === droneSession) {
            this.droneRegistrationIndex.delete(registrationId);
          }
        }
        this.droneSessions.delete(socket.id);
        return;
      }

      default:
        break;
    }

    this.log.error("invalid session type in socket disconnect", {
      id: socket.id,
      data: socket.data,
    });
  }

  /**
   * Looks up the code session indexed under the given IDE session's ID.
   *
   * NOTE(review): the index is populated with user IDs in onSocketAuth(),
   * while this lookup uses `ideSession._id` — confirm these are the same ID.
   *
   * @param ideSession IDE session whose `_id` is used as the lookup key.
   * @returns The matching code session.
   * @throws Error with `statusCode` 404 when no session is found.
   */
  getCodeSession(ideSession: IIdeSession): CodeSession {
    const session = this.codeSessionUserIndex.get(ideSession._id.toHexString());
    if (!session) {
      throw SocketService.notFound("code session not found");
    }
    return session;
  }

  /**
   * Looks up the drone session for a drone registration.
   *
   * @param registration Registration whose `_id` is used as the lookup key.
   * @returns The matching drone session.
   * @throws Error with `statusCode` 404 when no session is found.
   */
  getDroneSession(registration: IDroneRegistration): DroneSession {
    const session = this.droneRegistrationIndex.get(registration._id.toHexString());
    if (!session) {
      throw SocketService.notFound("drone session not found");
    }
    return session;
  }

  /**
   * Registers a code session by its chat session ID for reverse lookup.
   *
   * @param chatSessionId Chat session ID used as the index key.
   * @param codeSession Code session to associate with that ID.
   */
  registerChatSession(chatSessionId: string, codeSession: CodeSession): void {
    this.chatSessionIndex.set(chatSessionId, codeSession);
  }

  /**
   * Gets a code session by its chat session ID.
   *
   * @param chatSessionId Chat session ID, as an ObjectId or a string key.
   * @returns The code session registered under that ID.
   * @throws Error with `statusCode` 404 when no session is registered.
   */
  getCodeSessionByChatSessionId(chatSessionId: Types.ObjectId | string): CodeSession {
    const key =
      typeof chatSessionId === "string"
        ? chatSessionId
        : chatSessionId.toHexString();
    const session = this.chatSessionIndex.get(key);
    if (!session) {
      throw SocketService.notFound("code session not found for chat session");
    }
    return session;
  }

  /**
   * Unregisters a code session by its chat session ID.
   *
   * @param chatSessionId Chat session ID whose index entry is removed.
   */
  unregisterChatSession(chatSessionId: string): void {
    this.chatSessionIndex.delete(chatSessionId);
  }
}
|
|
|
|
/** Single shared SocketService instance, exposed as the module's default export. */
const socketService = new SocketService();

export default socketService;
|