gadget/docs/archive/services/agent.ts

// src/services/agent.ts
// Copyright (C) 2025 DTP Technologies, LLC
// All Rights Reserved
import env from "../config/env.js";
import path from "node:path";
import fs from "node:fs/promises";
import { EventEmitter } from "node:events";
import { IUser } from "../models/user.js";
import { IChatSession, ChatSession } from "../models/chat-session.js";
import { ChatHistory, ChatHistoryStatus } from "../models/chat-history.js";
import { DtpService } from "../lib/service.js";
import {
getToolByName,
getToolsBySlugs,
getToolsExcludingCategory,
getToolsByMode,
} from "../tools/index.js";
import { DtpTool } from "../lib/tool.js";
import { ChatSessionType } from "../models/chat-session.js";
import { processImageForLlm } from "../lib/image-utils.js";
import AiService from "./ai.js";
import type {
ChatMessage,
ToolDefinition,
ToolCall,
ChatChunk,
} from "../lib/ai-client.js";
interface ChatContext {
messages: ChatMessage[];
tools: ToolDefinition[];
}
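/**
 * Orchestrates agent conversations: builds system prompts and context, streams
 * LLM responses, executes tool calls, spawns subagents, and persists each turn
 * to ChatHistory/ChatSession.
 */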
class AgentService extends DtpService {
private events = new EventEmitter();
private pendingUserMessages: Array<{ content: string; displayName: string }> =
[];
private abortController: AbortController | null = null;
on(event: string, listener: (...args: any[]) => void): this {
this.events.on(event, listener);
return this;
}
off(event: string, listener: (...args: any[]) => void): this {
this.events.off(event, listener);
return this;
}
private emit(event: string, ...args: any[]): boolean {
return this.events.emit(event, ...args);
}
/**
* Queue a user message to be injected into the conversation during processing.
* The message will be sent at the next opportunity (after tool results).
*/
queueUserMessage(content: string, displayName: string): void {
this.pendingUserMessages.push({ content, displayName });
this.log.info("User message queued", {
displayName,
contentLength: content.length,
totalQueued: this.pendingUserMessages.length,
});
}
/**
* Clear any pending user messages (e.g., when aborting).
*/
clearPendingUserMessages(): void {
this.pendingUserMessages = [];
}
/**
* Get all pending user messages and clear the queue.
* Returns messages to be injected into the conversation.
*/
getPendingUserMessages(): Array<{ content: string; displayName: string }> {
const messages = [...this.pendingUserMessages];
this.pendingUserMessages = [];
return messages;
}
/**
* Check if there are any pending user messages.
*/
hasPendingMessages(): boolean {
return this.pendingUserMessages.length > 0;
}
/**
* Abort the current agent operation.
* Sets the abort signal to stop the streaming chat loop.
*/
abort(): void {
if (this.abortController) {
this.abortController.abort();
this.log.info("Agent operation aborted");
}
this.clearPendingUserMessages();
}
/**
* Set the abort controller for the current operation.
* Called at the start of streamChat to enable abort functionality.
*/
private setAbortController(controller: AbortController): void {
this.abortController = controller;
}
/**
* Clear the abort controller after operation completes.
*/
private clearAbortController(): void {
this.abortController = null;
}
get name(): string {
return "AgentService";
}
get slug(): string {
return "agent";
}
async start(): Promise<void> {
this.log.info("service started");
}
async stop(): Promise<void> {
this.log.info("service stopped");
}
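/**
 * Run a single agent turn for the given session: create a ChatHistory record
 * in "processing" status, build the conversation context, stream the LLM
 * response (executing any tool calls), then finalize the history record and
 * session stats. Auto-names the session after its first turn.
 */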
async chat(sessionId: string, prompt: string): Promise<void> {
const session = await ChatSession.findById(sessionId).populate("user");
if (!session) {
throw new Error("Session not found");
}
const user = session.user as IUser;
const userId = user._id.toString();
const sessionObjId = session._id as any;
const agentModel = await AiService.getAgentModel(userId);
if (!agentModel) {
throw new Error("No agent model configured");
}
// Create ChatHistory record at turn start with "processing" status
const chatHistory = new ChatHistory({
user: user._id,
session: session._id,
prompt,
mode: session.mode,
toolCalls: [],
fileOperations: [],
response: { thinking: "", message: "" },
status: ChatHistoryStatus.Processing,
isSubagent: false,
inputTokens: 0,
outputTokens: 0,
});
const savedHistory = await chatHistory.save();
const historyId = savedHistory._id.toString();
const context = await this.buildAgentContext(session, prompt);
let fullResponse = "";
let fullThinking = "";
let collectedToolCalls: any[] = [];
let subagentHistoryIds: string[] = [];
let inputTokens = 0;
let outputTokens = 0;
try {
const result = await this.streamChat(
session,
context.messages,
context.tools,
agentModel,
userId,
historyId,
);
fullResponse = result.response;
fullThinking = result.thinking;
collectedToolCalls = result.collectedToolCalls;
subagentHistoryIds = result.subagentHistoryIds;
inputTokens = result.inputTokens;
outputTokens = result.outputTokens;
// Finalize ChatHistory with success status
await ChatHistory.findByIdAndUpdate(historyId, {
status: ChatHistoryStatus.Success,
"response.thinking": fullThinking,
"response.message": fullResponse,
toolCalls: (collectedToolCalls ?? []).map((tc) => ({
tool: {
name: tc.name,
callId: tc.callId,
parameters: tc.parameters,
},
response: tc.response,
fileOperation: tc.fileOperation ?? undefined,
subagentStats: tc.subagentStats ?? undefined,
})),
fileOperations: (collectedToolCalls ?? [])
.filter((tc) => tc.fileOperation != null)
.map((tc) => tc.fileOperation),
inputTokens,
outputTokens,
subagentHistory: subagentHistoryIds || [],
});
await ChatSession.findByIdAndUpdate(sessionObjId, {
$inc: {
"stats.turnCount": 1,
"stats.inputTokens": inputTokens,
"stats.outputTokens": outputTokens,
},
lastMessageAt: new Date(),
});
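// session.stats still holds the values loaded before the $inc above, so a
// turnCount of 0 means this was the session's first turn.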
if (session.stats.turnCount === 0) {
await this.autoNameSession(session, prompt, fullResponse);
}
} catch (error) {
this.log.error("Chat error", { error, sessionId, prompt });
// Finalize ChatHistory with failed status
await ChatHistory.findByIdAndUpdate(historyId, {
status: ChatHistoryStatus.Failed,
error: {
message: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined,
timestamp: new Date(),
},
});
await ChatSession.findByIdAndUpdate(sessionObjId, {
$inc: {
"stats.turnCount": 1,
"stats.inputTokens": 0,
"stats.outputTokens": 0,
},
lastMessageAt: new Date(),
});
throw error;
}
}
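/**
 * Run a one-shot subagent ("explore" or "general") against the parent session.
 * The subagent gets its own system prompt and a reduced tool set, and its
 * per-iteration history record ids are returned so the parent turn can link
 * them.
 */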
async spawnSubagent(
session: IChatSession,
agentType: "explore" | "general",
prompt: string,
): Promise<{
response: string;
history: Awaited<ReturnType<typeof ChatHistory.find>>;
stats: { inputTokens: number; outputTokens: number; toolCallCount: number };
historyIds: string[];
}> {
const user = session.user as IUser;
const userId = user._id.toString();
const sessionObjId = session._id as any;
const agentModel = await AiService.getAgentModel(userId);
if (!agentModel) {
throw new Error("No agent model configured");
}
const context = await this.buildSubagentContext(session, agentType, prompt);
try {
const {
response: fullResponse,
stats,
historyIds,
} = await this.streamSubagentChat(
session,
context.messages,
context.tools,
agentModel,
userId,
);
const history = await ChatHistory.find({
session: sessionObjId,
})
.sort({ createdAt: -1 })
.lean();
await ChatSession.findByIdAndUpdate(sessionObjId, {
lastMessageAt: new Date(),
});
return { response: fullResponse, history, stats, historyIds };
} catch (error) {
this.log.error("Subagent error", {
error,
agentType,
prompt,
});
await this.saveChatHistory(
session,
prompt,
undefined,
undefined,
error instanceof Error
? { message: error.message, stack: error.stack }
: { message: String(error) },
);
throw error;
}
}
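/**
 * Build the message list and tool definitions for a subagent run. Subagents do
 * not receive prior chat history, browser-category tools, or the `subagent`
 * tool itself (avoiding recursive spawning).
 */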
private async buildSubagentContext(
session: IChatSession,
agentType: "explore" | "general",
currentPrompt: string,
): Promise<ChatContext> {
const messages: ChatMessage[] = [];
const systemContent = await this.buildSubagentSystemPrompt(
session,
agentType,
);
messages.push({ role: "system", content: systemContent });
await fs.writeFile(
path.join(
env.installRoot,
"logs",
`subagent.system.${agentType}.${session._id}.md`,
),
systemContent,
"utf-8",
);
messages.push({ role: "user", content: currentPrompt });
const modeFilteredTools = getToolsByMode(session.mode);
const subagentTools = getToolsExcludingCategory("browser")
.filter((t) => modeFilteredTools.includes(t))
.filter((t) => t.slug !== "subagent")
.map((t: any) => t.definition);
this.log.debug("tools available to subagent", {
toolCount: subagentTools.length,
toolNames: subagentTools.map((t: any) => t.function.name),
});
return { messages, tools: subagentTools };
}
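/**
 * Assemble the subagent system prompt from the on-disk prompt file for the
 * agent type, plus session, user, and system metadata and any AGENTS.md
 * configuration found in the working directory.
 */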
private async buildSubagentSystemPrompt(
session: IChatSession,
agentType: "explore" | "general",
): Promise<string> {
const user = session.user as IUser;
const promptFilePath = path.join(
env.installRoot,
"data",
"prompts",
"subagent",
agentType,
"system.md",
);
let prompt = await fs.readFile(promptFilePath, "utf-8");
prompt +=
"\n\n## SESSION INFORMATION\n\n- Session ID: " +
session._id +
"\n- Session Type: " +
session.type +
"\n- Created At: " +
session.createdAt.toISOString();
prompt +=
"\n\n## USER INFORMATION\n\n- User ID: " +
user._id +
"\n- Username: " +
user.username +
"\n- Display Name: " +
user.displayName;
prompt +=
"\n\n## SYSTEM INFORMATION\n\n- Current Time: " +
new Date().toISOString() +
"\n- Gadget Version: " +
env.pkg.version +
"\n- Timezone: " +
env.timezone;
try {
const agentConfig = await fs.readFile("AGENTS.md", "utf-8");
this.log.debug("integrating AGENTS.md into subagent system prompt", {
subagent: agentType,
});
prompt += "\n\n## AGENT CONFIGURATION\n\n" + agentConfig;
} catch (error) {
this.log.debug("AGENTS.md file not found or failed to load.", {
subagent: agentType,
error: error instanceof Error ? error.message : String(error),
});
}
return this.renderSystemPromptTemplate(prompt, session.mode);
}
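/**
 * Streaming chat loop for subagents. Mirrors the main streamChat loop but does
 * not emit thinking/message events to the UI; it accumulates the response,
 * executes tool calls, injects any queued user messages, and saves a
 * ChatHistory record per iteration. Token counts are rough estimates
 * (~4 characters per token).
 */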
private async streamSubagentChat(
session: IChatSession,
messages: ChatMessage[],
tools: ToolDefinition[],
llm: string | undefined | null,
userId: string,
): Promise<{
response: string;
thinking: string;
stats: { inputTokens: number; outputTokens: number; toolCallCount: number };
historyIds: string[];
}> {
let fullResponse = "";
let fullThinking = "";
let inputTokens = 0;
let outputTokens = 0;
let toolCallCount = 0;
const historyIds: string[] = [];
if (!llm) {
throw new Error("No LLM model configured for this subagent");
}
const client = await AiService.getAgentClient(userId);
client.on("finish_reason", (reason) => {
this.emit("finish_reason", { reason });
});
let chatOptions;
try {
const providerInfo = await AiService.getAgentProviderInfo(userId);
if (providerInfo) {
chatOptions = await AiService.getModelChatOptions(
userId,
providerInfo.providerId,
providerInfo.model,
);
}
} catch (err) {
this.log.warn("Failed to get model settings, using defaults", {
error: err,
});
}
let currentMessages: ChatMessage[] = [...messages];
let continueLoop = true;
let lastFinishReason: string | undefined;
let iterations = 0;
while (continueLoop) {
iterations++;
continueLoop = false;
let iterationThinking = "";
let iterationResponse = "";
const toolCallMap = new Map<number, ToolCall>();
const collectedToolCalls: Array<{
name: string;
callId: string;
parameters: Array<{ name: string; value: string }>;
response: string;
fileOperation?: unknown;
subagentStats?: unknown;
}> = [];
this.log.debug("Starting subagent chat request", {
iteration: iterations,
messageCount: currentMessages.length,
hasTools: tools.length > 0,
model: llm,
});
let stream: AsyncGenerator<ChatChunk>;
try {
stream = await client.streamChat(
currentMessages,
tools,
llm,
chatOptions,
);
} catch (chatError) {
this.log.error("Failed to create subagent stream", {
error:
chatError instanceof Error ? chatError.message : String(chatError),
model: llm,
});
throw chatError;
}
for await (const chunk of stream) {
if (chunk.thinking) {
iterationThinking += chunk.thinking;
fullThinking += chunk.thinking;
}
if (chunk.content) {
iterationResponse += chunk.content;
fullResponse += chunk.content;
}
if (chunk.done && chunk.finish_reason) {
lastFinishReason = chunk.finish_reason;
}
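// Accumulate streamed tool-call deltas by index. Providers may stream
// arguments incrementally or resend the full JSON object; a complete object is
// treated as a replacement so arguments are not duplicated.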
if (chunk.toolCall) {
const tcDelta = chunk.toolCall;
const index = tcDelta.index ?? 0;
let existing = toolCallMap.get(index);
if (!existing) {
existing = {
id:
tcDelta.id ?? "call-" + Math.random().toString(36).slice(2, 9),
type: "function",
function: {
name: tcDelta.function?.name ?? "",
arguments: "",
},
};
toolCallMap.set(index, existing);
} else {
if (tcDelta.id) existing.id = tcDelta.id;
if (tcDelta.function?.name)
existing.function.name = tcDelta.function.name;
}
if (tcDelta.function?.arguments !== undefined) {
const deltaArgs =
typeof tcDelta.function.arguments === "string"
? tcDelta.function.arguments
: JSON.stringify(tcDelta.function.arguments);
const currentArgs = existing.function.arguments;
const trimmedDelta = deltaArgs.trim();
const isFullObject =
trimmedDelta.startsWith("{") && trimmedDelta.endsWith("}");
if (
isFullObject &&
(currentArgs.length === 0 || currentArgs === deltaArgs)
) {
existing.function.arguments = deltaArgs;
} else {
existing.function.arguments += deltaArgs;
}
}
}
}
const finalToolCalls = Array.from(toolCallMap.values());
currentMessages.push({
role: "assistant",
content: iterationResponse,
tool_calls: finalToolCalls.length > 0 ? finalToolCalls : undefined,
});
if (finalToolCalls.length > 0) {
continueLoop = true;
for (const toolCall of finalToolCalls) {
const toolName = toolCall.function.name;
const toolArgsRaw = toolCall.function.arguments;
let result = await this.executeTool(toolName, toolArgsRaw, session);
let toolArgs: any = {};
try {
toolArgs = JSON.parse(toolArgsRaw);
} catch {
// ignore: args are parsed here only for history metadata; executeTool handles its own parse errors
}
// Extract metadata from result
let parsedResult: any = null;
try {
parsedResult = JSON.parse(result);
} catch {
// ignore
}
toolCallCount++;
inputTokens += Math.ceil(toolArgsRaw.length / 4);
outputTokens += Math.ceil(result.length / 4);
this.log.debug("Subagent tool result for LLM", {
toolName,
resultLength: result.length,
preview:
result.length > 100 ? result.substring(0, 100) + "..." : result,
});
currentMessages.push({
role: "tool",
content: result,
tool_call_id: toolCall.id,
});
const parameters: Array<{ name: string; value: string }> = [];
for (const [k, v] of Object.entries(toolArgs)) {
const value = typeof v === "string" ? v : JSON.stringify(v);
// Skip parameters with empty string values
if (value !== "") {
parameters.push({ name: k, value });
}
}
collectedToolCalls.push({
name: toolName,
callId: toolCall.id,
parameters,
response: result,
fileOperation: parsedResult?.data?.fileOperation,
subagentStats: parsedResult?.data?.subagentStats,
});
}
} else {
this.log.info(
"Subagent loop terminating: no tool calls found in this iteration.",
);
}
// Check for queued user messages and inject them before next LLM call
const pendingMessages = this.getPendingUserMessages();
if (pendingMessages.length > 0) {
this.log.info("Injecting queued user messages in subagent", {
count: pendingMessages.length,
});
for (const msg of pendingMessages) {
currentMessages.push({
role: "user",
content: msg.content,
});
// Emit event so UI can display the message
this.log.debug("Emitting queued_message_sent event", {
displayName: msg.displayName,
content: msg.content.substring(0, 50),
});
this.emit("queued_message_sent", {
displayName: msg.displayName,
content: msg.content,
});
}
// Continue loop to process the injected messages
continueLoop = true;
}
// Save this turn's history
const turnHistoryId = await this.saveChatHistory(
session,
iterations === 1 ? messages[messages.length - 1]?.content || "" : "...",
iterationThinking,
iterationResponse,
undefined,
collectedToolCalls,
undefined,
true,
inputTokens,
outputTokens,
);
historyIds.push(turnHistoryId);
if (lastFinishReason === "length") {
this.log.info("Subagent loop continuing: finish_reason was 'length'.");
currentMessages.push({
role: "user",
content: "Please continue.",
});
continueLoop = true;
lastFinishReason = undefined;
}
}
// Update global session stats
await ChatSession.findByIdAndUpdate(session._id, {
$inc: { "stats.toolCallCount": toolCallCount },
});
// Ensure we have a response to return to the parent
if (!fullResponse.trim()) {
if (toolCallCount > 0) {
fullResponse = `Subagent task completed. Executed ${toolCallCount} tool calls to perform the requested actions.`;
} else {
fullResponse = "Subagent task completed without additional actions.";
}
this.log.info("Subagent provided empty response; synthesized summary.", {
toolCallCount,
});
}
this.log.info("Subagent chat stream finished.", {
iterations,
fullResponseLength: fullResponse.length,
toolCallCount,
});
return {
response: fullResponse,
thinking: fullThinking,
stats: { inputTokens, outputTokens, toolCallCount },
historyIds,
};
}
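/**
 * Ask the utility model for a short session name based on the first exchange,
 * then save it and notify listeners via "session_updated". Failures are logged
 * and ignored.
 */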
private async autoNameSession(
session: IChatSession,
prompt: string,
response: string,
): Promise<void> {
try {
const user = session.user as IUser;
const userId = user._id.toString();
const utilityModel = await AiService.getUtilityModel(userId);
const namePrompt =
"Generate a short descriptive name (3-5 words) for this conversation based on the first user message and AI response. Just return the name, nothing else.\n\nUser message: " +
prompt.trim() +
"\n\nAI response: " +
response.substring(0, 500);
const suggestedName = await this.generate(userId, namePrompt, {
model: utilityModel,
});
const finalName = suggestedName
.trim()
.replace(/^"|"$/g, "")
.substring(0, 100);
if (finalName) {
session.name = finalName;
await session.save();
this.log.info("Auto-named session", {
sessionId: session._id,
name: finalName,
});
this.emit("session_updated", {
sessionId: session._id.toString(),
name: finalName,
});
}
} catch (error) {
this.log.warn("Failed to auto-name session", {
sessionId: session._id,
error,
});
}
}
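/**
 * One-shot, non-streaming completion using the utility client. Used for
 * internal helpers such as session auto-naming rather than agent turns.
 */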
async generate(
userId: string,
prompt: string,
options?: { model?: string; systemPrompt?: string; tools?: string[] },
): Promise<string> {
const messages: ChatMessage[] = [];
if (options?.systemPrompt) {
messages.push({ role: "system", content: options.systemPrompt });
}
messages.push({ role: "user", content: prompt });
const tools = options?.tools
? getToolsBySlugs(options.tools).map((t) => t.definition as any)
: ([] as any[]);
const model = options?.model;
if (!model) {
throw new Error("No LLM model specified for generation");
}
const client = await AiService.getUtilityClient(userId);
client.on("finish_reason", (reason) => {
this.emit("finish_reason", { reason });
});
const response = await client.chat(messages, tools, model);
return response.content;
}
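/**
 * Build the full conversation context for an agent turn: the rendered system
 * prompt, prior non-subagent turns from ChatHistory (with an error summary for
 * failed turns), the current prompt, and the tool set filtered by session
 * mode, session type, and per-user tool configuration.
 */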
private async buildAgentContext(
session: IChatSession,
currentPrompt: string,
): Promise<ChatContext> {
const messages: ChatMessage[] = [];
const mode = session.mode;
const systemContent = await this.buildAgentSystemPrompt(session);
messages.push({ role: "system", content: systemContent });
await fs.writeFile(
path.join(
env.installRoot,
"logs",
`agent.system.${mode}.${session._id}.md`,
),
systemContent,
"utf-8",
);
const history = await ChatHistory.find({
session: session._id,
isSubagent: { $ne: true },
})
.sort({ createdAt: -1 })
.lean();
for (const turn of history.reverse()) {
messages.push({ role: "user", content: turn.prompt });
// Include error information for failed turns
if (turn.status === "failed" && turn.error) {
const errorContext = `\n\n[ERROR: ${turn.error.message}]`;
const toolCallContext =
turn.toolCalls && turn.toolCalls.length > 0
? `\n[Tool calls made before error: ${turn.toolCalls.map((tc) => tc.tool.name).join(", ")}]`
: "";
const thinkingContext = turn.response?.thinking
? `\n[Thinking before error: ${turn.response.thinking.substring(0, 200)}${turn.response.thinking.length > 200 ? "..." : ""}]`
: "";
messages.push({
role: "assistant",
content: `[Turn failed${errorContext}${toolCallContext}${thinkingContext}]`,
});
} else if (turn.response?.message) {
messages.push({ role: "assistant", content: turn.response.message });
}
}
messages.push({ role: "user", content: currentPrompt });
const modeFilteredTools = getToolsByMode(mode);
let availableTools =
session.type === ChatSessionType.Extension
? modeFilteredTools
: getToolsExcludingCategory("browser").filter((t) =>
modeFilteredTools.includes(t),
);
// Filter tools based on user configuration
const userId = (session.user as IUser)._id.toString();
const userConfiguredTools: DtpTool[] = [];
for (const tool of availableTools) {
if (tool.requiresUserConfig()) {
const isConfigured = await tool.checkUserConfig(userId);
if (isConfigured) {
userConfiguredTools.push(tool);
} else {
this.log.debug(
`Tool ${tool.slug} excluded: user configuration required but not found`,
{
userId,
toolSlug: tool.slug,
},
);
}
} else {
userConfiguredTools.push(tool);
}
}
availableTools = userConfiguredTools;
const tools = availableTools.map((t: any) => t.definition);
this.log.debug("tools available to agent this turn", {
toolCount: tools.length,
toolNames: tools.map((t) => t.function.name),
});
return { messages, tools };
}
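/**
 * Assemble the agent system prompt for the session mode from the on-disk
 * prompt file, plus session/user/system metadata, optional AGENTS.md content,
 * and the current pinboard contents.
 */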
private async buildAgentSystemPrompt(session: IChatSession): Promise<string> {
const user = session.user as IUser;
const mode = session.mode;
const promptFilePath = path.join(
env.installRoot,
"data",
"prompts",
"agent",
mode,
"system.md",
);
let prompt = await fs.readFile(promptFilePath, "utf-8");
prompt +=
"\n\n## SESSION INFORMATION\n\n- Session ID: " +
session._id +
"\n- Session Type: " +
session.type +
"\n- Created At: " +
session.createdAt.toISOString();
prompt +=
"\n\n## USER INFORMATION\n\n- User ID: " +
user._id +
"\n- Username: " +
user.username +
"\n- Display Name: " +
user.displayName;
prompt +=
"\n\n## SYSTEM INFORMATION\n\n- Current Time: " +
new Date().toISOString() +
"\n- Gadget Version: " +
env.pkg.version +
"\n- Timezone: " +
env.timezone;
try {
const agentConfig = await fs.readFile("AGENTS.md", "utf-8");
this.log.debug("integrating AGENTS.md into system prompt");
prompt += "\n\n## AGENT CONFIGURATION\n\n" + agentConfig;
} catch (error) {
this.log.debug("AGENTS.md file not found or failed to load.", {
error: error instanceof Error ? error.message : String(error),
});
}
prompt += "\n\n## PINBOARD\n\n";
if (session.pins.length > 0) {
for (const pin of session.pins) {
prompt += "- [" + pin._id?.toString() + "] " + pin.content + "\n";
}
} else {
prompt += "The pinboard is empty. Use the `pin_add` tool to add notes.\n";
}
return this.renderSystemPromptTemplate(prompt, mode);
}
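/**
 * Replace shared template placeholders ({{subagent_section}}, {{scope_block}})
 * in a system prompt with the corresponding common prompt fragments from disk.
 */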
async renderSystemPromptTemplate(
prompt: string,
_mode: string,
): Promise<string> {
let content;
/*
* subagents.md
*/
content = await fs.readFile(
path.join(env.installRoot, "data", "prompts", "common", "subagents.md"),
"utf-8",
);
prompt = prompt.replace("{{subagent_section}}", content.trim());
/*
* scope-block.md
*/
content = await fs.readFile(
path.join(env.installRoot, "data", "prompts", "common", "scope-block.md"),
"utf-8",
);
prompt = prompt.replace("{{scope_block}}", content.trim());
return prompt;
}
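/**
 * Main streaming agent loop. Repeats LLM calls while tool calls are returned,
 * queued user messages are pending, or the model stops due to length limits.
 * Emits "thinking", "message", "tool_call", "tool_result", and "done" events,
 * incrementally updates the ChatHistory record, and supports cancellation via
 * abort().
 */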
private async streamChat(
session: IChatSession,
messages: ChatMessage[],
tools: ToolDefinition[],
llm: string | undefined | null,
userId: string,
historyId: string,
): Promise<{
response: string;
thinking: string;
collectedToolCalls: Array<{
name: string;
callId: string;
parameters: Array<{ name: string; value: string }>;
response: string;
fileOperation?: unknown;
subagentStats?: unknown;
}>;
subagentHistoryIds: string[];
inputTokens: number;
outputTokens: number;
}> {
let fullResponse = "";
let fullThinking = "";
const collectedToolCalls: Array<{
name: string;
callId: string;
parameters: Array<{ name: string; value: string }>;
response: string;
fileOperation?: unknown;
subagentStats?: unknown;
}> = [];
const subagentHistoryIds: string[] = [];
let toolCallCount = 0;
let inputTokens = 0;
let outputTokens = 0;
// Estimate input tokens from message content (rough estimate: 4 chars per token)
const inputText = messages.map((m) => m.content).join(" ");
inputTokens = Math.ceil(inputText.length / 4);
if (!llm) {
throw new Error("No LLM model configured for this agent");
}
const client = await AiService.getAgentClient(userId);
client.on("finish_reason", (reason) => {
this.emit("finish_reason", { reason });
});
let chatOptions;
try {
const providerInfo = await AiService.getAgentProviderInfo(userId);
if (providerInfo) {
chatOptions = await AiService.getModelChatOptions(
userId,
providerInfo.providerId,
providerInfo.model,
);
}
} catch (err) {
this.log.warn("Failed to get model settings, using defaults", {
error: err,
});
}
let currentMessages: ChatMessage[] = [...messages];
let continueLoop = true;
let lastFinishReason: string | undefined;
let iterations = 0;
// Create abort controller for this operation
const abortController = new AbortController();
this.setAbortController(abortController);
while (continueLoop) {
// Check for abort signal at the start of each iteration
if (abortController.signal.aborted) {
this.log.info("Agent operation was aborted");
this.clearAbortController();
throw new Error("Operation aborted");
}
iterations++;
continueLoop = false;
// Check for queued user messages at the START of each iteration
const pendingMessagesAtStart = this.getPendingUserMessages();
if (pendingMessagesAtStart.length > 0) {
this.log.info("Injecting queued user messages at start of iteration", {
count: pendingMessagesAtStart.length,
iteration: iterations,
});
for (const msg of pendingMessagesAtStart) {
currentMessages.push({
role: "user",
content: msg.content,
});
}
// Continue loop to process the injected messages
continueLoop = true;
}
let iterationThinking = "";
let iterationResponse = "";
const toolCallMap = new Map<number, ToolCall>();
this.log.debug("Starting AI chat request", {
iteration: iterations,
messageCount: currentMessages.length,
hasTools: tools.length > 0,
model: llm,
});
let stream: AsyncGenerator<ChatChunk>;
try {
stream = await client.streamChat(
currentMessages,
tools,
llm,
chatOptions,
);
} catch (chatError) {
this.log.error("Failed to create chat stream", {
error:
chatError instanceof Error ? chatError.message : String(chatError),
model: llm,
});
throw chatError;
}
for await (const chunk of stream) {
// Check for abort signal during streaming
if (abortController.signal.aborted) {
this.log.info("Agent operation was aborted during streaming");
this.clearAbortController();
throw new Error("Operation aborted");
}
if (chunk.thinking) {
iterationThinking += chunk.thinking;
fullThinking += chunk.thinking;
this.emit("thinking", { text: chunk.thinking });
}
if (chunk.content) {
iterationResponse += chunk.content;
fullResponse += chunk.content;
outputTokens += Math.ceil(chunk.content.length / 4);
this.emit("message", { text: chunk.content });
}
if (chunk.done && chunk.finish_reason) {
lastFinishReason = chunk.finish_reason;
}
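// Accumulate streamed tool-call deltas by index (same handling as the
// subagent loop); emit a "tool_call" event once the tool name is known.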
if (chunk.toolCall) {
const tcDelta = chunk.toolCall;
const index = tcDelta.index ?? 0;
let existing = toolCallMap.get(index);
if (!existing) {
existing = {
id:
tcDelta.id ?? "call-" + Math.random().toString(36).slice(2, 9),
type: "function",
function: {
name: tcDelta.function?.name ?? "",
arguments: "",
},
};
toolCallMap.set(index, existing);
if (existing.function.name) {
this.emit("tool_call", {
name: existing.function.name,
arguments: "",
});
}
} else {
if (tcDelta.id) existing.id = tcDelta.id;
if (tcDelta.function?.name) {
const oldName = existing.function.name;
existing.function.name = tcDelta.function.name;
if (!oldName && existing.function.name) {
this.emit("tool_call", {
name: existing.function.name,
arguments: "",
});
}
}
}
if (tcDelta.function?.arguments !== undefined) {
const deltaArgs =
typeof tcDelta.function.arguments === "string"
? tcDelta.function.arguments
: JSON.stringify(tcDelta.function.arguments);
const currentArgs = existing.function.arguments;
const trimmedDelta = deltaArgs.trim();
const isFullObject =
trimmedDelta.startsWith("{") && trimmedDelta.endsWith("}");
if (
isFullObject &&
(currentArgs.length === 0 || currentArgs === deltaArgs)
) {
existing.function.arguments = deltaArgs;
} else {
existing.function.arguments += deltaArgs;
}
}
}
}
const finalToolCalls = Array.from(toolCallMap.values());
currentMessages.push({
role: "assistant",
content: iterationResponse,
tool_calls: finalToolCalls.length > 0 ? finalToolCalls : undefined,
});
// Incrementally update ChatHistory with thinking and response from this iteration
if (iterationThinking || iterationResponse) {
await ChatHistory.findByIdAndUpdate(historyId, {
"response.thinking": iterationThinking,
"response.message": iterationResponse,
});
}
if (finalToolCalls.length > 0) {
continueLoop = true;
for (const toolCall of finalToolCalls) {
const toolName = toolCall.function.name;
const toolArgsRaw = toolCall.function.arguments;
let result = await this.executeTool(toolName, toolArgsRaw, session);
toolCallCount++;
let toolArgs: any = {};
try {
toolArgs = JSON.parse(toolArgsRaw);
} catch {
// ignore
}
let parsedResult: any = null;
try {
parsedResult = JSON.parse(result);
} catch {
// ignore
}
this.log.debug("Tool result for LLM", {
toolName,
resultLength: result.length,
preview:
result.length > 100 ? result.substring(0, 100) + "..." : result,
});
if (parsedResult?.success && parsedResult.data?.imageBase64) {
try {
const processed = await processImageForLlm(
parsedResult.data.imageBase64,
);
currentMessages.push({
role: "tool",
content: `Screenshot captured (${processed.metadata.width}x${processed.metadata.height})`,
images: [processed.base64],
tool_call_id: toolCall.id,
});
} catch (imgErr) {
currentMessages.push({
role: "tool",
content: result,
tool_call_id: toolCall.id,
});
}
} else {
currentMessages.push({
role: "tool",
content: result,
tool_call_id: toolCall.id,
});
}
const parameters: Array<{ name: string; value: string }> = [];
for (const [k, v] of Object.entries(toolArgs)) {
const value = typeof v === "string" ? v : JSON.stringify(v);
// Skip parameters with empty string values
if (value !== "") {
parameters.push({ name: k, value });
}
}
if (parsedResult?.data?.historyIds) {
subagentHistoryIds.push(...parsedResult.data.historyIds);
}
collectedToolCalls.push({
name: toolName,
callId: toolCall.id,
parameters,
response: result,
fileOperation: parsedResult?.data?.fileOperation,
subagentStats: parsedResult?.data?.subagentStats,
});
this.emit("tool_result", {
tool: toolName,
result: result,
fileOperation: parsedResult?.data?.fileOperation,
subagentStats: parsedResult?.data?.subagentStats,
subagentPrompt: parsedResult?.data?.subagentPrompt,
subagentType: parsedResult?.data?.subagentType,
});
// Incrementally update ChatHistory with tool calls from this iteration
await ChatHistory.findByIdAndUpdate(historyId, {
$push: {
toolCalls: {
tool: {
name: toolName,
callId: toolCall.id,
parameters,
},
response: result,
fileOperation: parsedResult?.data?.fileOperation ?? undefined,
subagentStats: parsedResult?.data?.subagentStats ?? undefined,
},
},
$inc: {
inputTokens: Math.ceil(toolArgsRaw.length / 4),
outputTokens: Math.ceil(result.length / 4),
},
});
}
} else {
this.log.info(
"Agent loop terminating: no tool calls found in this iteration.",
);
}
// Check for queued user messages and inject them before next LLM call
const newPendingMessages = this.getPendingUserMessages();
if (newPendingMessages.length > 0) {
this.log.info("Injecting queued user messages", {
count: newPendingMessages.length,
});
for (const msg of newPendingMessages) {
currentMessages.push({
role: "user",
content: msg.content,
});
// Emit event so UI can display the message
this.emit("queued_message_sent", {
displayName: msg.displayName,
content: msg.content,
});
}
// Continue loop to process the injected messages
continueLoop = true;
}
if (lastFinishReason === "length") {
this.log.info("Agent loop continuing: finish_reason was 'length'.");
currentMessages.push({
role: "user",
content: "Please continue.",
});
continueLoop = true;
lastFinishReason = undefined;
}
}
await ChatSession.findByIdAndUpdate(session._id, {
$inc: { "stats.toolCallCount": toolCallCount },
});
this.emit("done", {
fullResponse,
fullThinking,
inputTokens,
outputTokens,
});
this.log.info("Agent chat stream finished.", {
iterations,
fullResponseLength: fullResponse.length,
toolCallCount,
inputTokens,
outputTokens,
});
// Clear abort controller on successful completion
this.clearAbortController();
// Clear any remaining queued messages on successful completion
this.clearPendingUserMessages();
return {
response: fullResponse,
thinking: fullThinking,
collectedToolCalls,
subagentHistoryIds,
inputTokens,
outputTokens,
};
}
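/**
 * Look up a tool by name and execute it with JSON-parsed arguments. Always
 * returns a JSON string; lookup, parse, and execution failures are reported as
 * { success: false, ... } results rather than thrown.
 */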
private async executeTool(
toolName: string,
args: string,
session: IChatSession,
): Promise<string> {
const tool = getToolByName(toolName);
if (!tool) {
const error = "Unknown tool: " + toolName;
this.log.error("Tool not found", { toolName });
return JSON.stringify({ success: false, error, message: error });
}
try {
this.log.debug("Tool execution started", { toolName, args });
const parsedArgs = JSON.parse(args);
const context = { session };
const result = await tool.execute(context, parsedArgs);
this.log.debug("Tool execution completed", {
toolName,
resultLength: result.length,
});
return result;
} catch (error) {
const errorMessage =
error instanceof Error ? error.message : String(error);
this.log.error("Tool execution failed: " + toolName, {
tool: toolName,
arguments: args,
error: errorMessage,
stack: error instanceof Error ? error.stack : undefined,
});
try {
this.emit("tool_error", {
tool: toolName,
message: "Tool execution failed: " + errorMessage,
});
} catch (emitError) {
this.log.error("Failed to emit tool error event", {
toolName,
emitError,
});
}
return JSON.stringify({
success: false,
error: "TOOL_EXECUTION_FAILED",
message: errorMessage,
});
}
}
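/**
 * Persist a completed (or failed) turn as a ChatHistory document and return
 * its id. Used for subagent iterations and for error records; the main agent
 * turn is created up front in chat() and updated incrementally instead.
 */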
private async saveChatHistory(
session: IChatSession,
prompt: string,
thinking: string | undefined,
message: string | undefined,
error?: { message: string; stack?: string },
toolCalls?: Array<{
name: string;
callId: string;
parameters: Array<{ name: string; value: string }>;
response: string;
fileOperation?: unknown;
subagentStats?: unknown;
}>,
subagentHistoryIds?: string[],
isSubagent = false,
inputTokens = 0,
outputTokens = 0,
): Promise<string> {
const status = error ? "failed" : "success";
const user = session.user as IUser;
const toolCallDocs = (toolCalls ?? []).map((tc) => ({
tool: {
name: tc.name,
callId: tc.callId,
parameters: tc.parameters,
},
response: tc.response,
fileOperation: tc.fileOperation ?? undefined,
subagentStats: tc.subagentStats ?? undefined,
}));
const fileOpDocs = toolCallDocs
.filter((tc) => tc.fileOperation != null)
.map((tc) => tc.fileOperation);
const history = new ChatHistory({
user: user._id,
session: session._id,
prompt,
mode: session.mode,
toolCalls: toolCallDocs,
fileOperations: fileOpDocs,
response: { thinking, message },
status,
isSubagent,
subagentHistory: subagentHistoryIds || [],
error: error
? { message: error.message, stack: error.stack, timestamp: new Date() }
: undefined,
inputTokens,
outputTokens,
});
const saved = await history.save();
return saved._id.toString();
}
}
export default new AgentService();
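// Example usage (sketch; assumes a persisted ChatSession id and that listeners
// are attached before the turn starts):
//
//   import AgentService from "./services/agent.js";
//
//   AgentService.on("message", ({ text }) => process.stdout.write(text));
//   AgentService.on("tool_result", ({ tool }) => console.log(`\n[${tool} finished]`));
//   await AgentService.chat(sessionId, "Summarize the open tasks for today");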