// src/services/agent.ts // Copyright (C) 2025 DTP Technologies, LLC // All Rights Reserved import env from "../config/env.js"; import path from "node:path"; import fs from "node:fs/promises"; import { EventEmitter } from "node:events"; import { IUser } from "../models/user.js"; import { IChatSession, ChatSession } from "../models/chat-session.js"; import { ChatHistory, ChatHistoryStatus } from "../models/chat-history.js"; import { DtpService } from "../lib/service.js"; import { getToolByName, getToolsBySlugs, getToolsExcludingCategory, getToolsByMode, } from "../tools/index.js"; import { DtpTool } from "../lib/tool.js"; import { ChatSessionType } from "../models/chat-session.js"; import { processImageForLlm } from "../lib/image-utils.js"; import AiService from "./ai.js"; import type { ChatMessage, ToolDefinition, ToolCall, ChatChunk, } from "../lib/ai-client.js"; interface ChatContext { messages: ChatMessage[]; tools: ToolDefinition[]; } class AgentService extends DtpService { private events = new EventEmitter(); private pendingUserMessages: Array<{ content: string; displayName: string }> = []; private abortController: AbortController | null = null; on(event: string, listener: (...args: any[]) => void): this { this.events.on(event, listener); return this; } off(event: string, listener: (...args: any[]) => void): this { this.events.off(event, listener); return this; } private emit(event: string, ...args: any[]): boolean { return this.events.emit(event, ...args); } /** * Queue a user message to be injected into the conversation during processing. * The message will be sent at the next opportunity (after tool results). */ queueUserMessage(content: string, displayName: string): void { this.pendingUserMessages.push({ content, displayName }); this.log.info("User message queued", { displayName, contentLength: content.length, totalQueued: this.pendingUserMessages.length, }); } /** * Clear any pending user messages (e.g., when aborting). */ clearPendingUserMessages(): void { this.pendingUserMessages = []; } /** * Get all pending user messages and clear the queue. * Returns messages to be injected into the conversation. */ getPendingUserMessages(): Array<{ content: string; displayName: string }> { const messages = [...this.pendingUserMessages]; this.pendingUserMessages = []; return messages; } /** * Check if there are any pending user messages. */ hasPendingMessages(): boolean { return this.pendingUserMessages.length > 0; } /** * Abort the current agent operation. * Sets the abort signal to stop the streaming chat loop. */ abort(): void { if (this.abortController) { this.abortController.abort(); this.log.info("Agent operation aborted"); } this.clearPendingUserMessages(); } /** * Set the abort controller for the current operation. * Called at the start of streamChat to enable abort functionality. */ private setAbortController(controller: AbortController): void { this.abortController = controller; } /** * Clear the abort controller after operation completes. */ private clearAbortController(): void { this.abortController = null; } get name(): string { return "AgentService"; } get slug(): string { return "agent"; } async start(): Promise { this.log.info("service started"); } async stop(): Promise { this.log.info("service stopped"); } async chat(sessionId: string, prompt: string): Promise { const session = await ChatSession.findById(sessionId).populate("user"); if (!session) { throw new Error("Session not found"); } const user = session.user as IUser; const userId = user._id.toString(); const sessionObjId = session._id as any; const agentModel = await AiService.getAgentModel(userId); if (!agentModel) { throw new Error("No agent model configured"); } // Create ChatHistory record at turn start with "processing" status const chatHistory = new ChatHistory({ user: user._id, session: session._id, prompt, mode: session.mode, toolCalls: [], fileOperations: [], response: { thinking: "", message: "" }, status: ChatHistoryStatus.Processing, isSubagent: false, inputTokens: 0, outputTokens: 0, }); const savedHistory = await chatHistory.save(); const historyId = savedHistory._id.toString(); const context = await this.buildAgentContext(session, prompt); let fullResponse = ""; let fullThinking = ""; let collectedToolCalls: any[] = []; let subagentHistoryIds: string[] = []; let inputTokens = 0; let outputTokens = 0; try { const result = await this.streamChat( session, context.messages, context.tools, agentModel, userId, historyId, ); fullResponse = result.response; fullThinking = result.thinking; collectedToolCalls = result.collectedToolCalls; subagentHistoryIds = result.subagentHistoryIds; inputTokens = result.inputTokens; outputTokens = result.outputTokens; // Finalize ChatHistory with success status await ChatHistory.findByIdAndUpdate(historyId, { status: ChatHistoryStatus.Success, "response.thinking": fullThinking, "response.message": fullResponse, toolCalls: (collectedToolCalls ?? []).map((tc) => ({ tool: { name: tc.name, callId: tc.callId, parameters: tc.parameters, }, response: tc.response, fileOperation: tc.fileOperation ?? undefined, subagentStats: tc.subagentStats ?? undefined, })), fileOperations: (collectedToolCalls ?? []) .filter((tc) => tc.fileOperation != null) .map((tc) => tc.fileOperation), inputTokens, outputTokens, subagentHistory: subagentHistoryIds || [], }); await ChatSession.findByIdAndUpdate(sessionObjId, { $inc: { "stats.turnCount": 1, "stats.inputTokens": inputTokens, "stats.outputTokens": outputTokens, }, lastMessageAt: new Date(), }); if (session.stats.turnCount === 0) { await this.autoNameSession(session, prompt, fullResponse); } } catch (error) { this.log.error("Chat error", { error, sessionId, prompt }); // Finalize ChatHistory with failed status await ChatHistory.findByIdAndUpdate(historyId, { status: ChatHistoryStatus.Failed, error: { message: error instanceof Error ? error.message : String(error), stack: error instanceof Error ? error.stack : undefined, timestamp: new Date(), }, }); await ChatSession.findByIdAndUpdate(sessionObjId, { $inc: { "stats.turnCount": 1, "stats.inputTokens": 0, "stats.outputTokens": 0, }, lastMessageAt: new Date(), }); throw error; } } async spawnSubagent( session: IChatSession, agentType: "explore" | "general", prompt: string, ): Promise<{ response: string; history: Awaited>; stats: { inputTokens: number; outputTokens: number; toolCallCount: number }; historyIds: string[]; }> { const user = session.user as IUser; const userId = user._id.toString(); const sessionObjId = session._id as any; const agentModel = await AiService.getAgentModel(userId); if (!agentModel) { throw new Error("No agent model configured"); } const context = await this.buildSubagentContext(session, agentType, prompt); try { const { response: fullResponse, stats, historyIds, } = await this.streamSubagentChat( session, context.messages, context.tools, agentModel, userId, ); const history = await ChatHistory.find({ session: sessionObjId, }) .sort({ createdAt: -1 }) .lean(); await ChatSession.findByIdAndUpdate(sessionObjId, { lastMessageAt: new Date(), }); return { response: fullResponse, history, stats, historyIds }; } catch (error) { this.log.error("Subagent error", { error, agentType, prompt, }); await this.saveChatHistory( session, prompt, undefined, undefined, error instanceof Error ? { message: error.message, stack: error.stack } : { message: String(error) }, ); throw error; } } private async buildSubagentContext( session: IChatSession, agentType: "explore" | "general", currentPrompt: string, ): Promise { const messages: ChatMessage[] = []; const systemContent = await this.buildSubagentSystemPrompt( session, agentType, ); messages.push({ role: "system", content: systemContent }); await fs.writeFile( path.join( env.installRoot, "logs", `subagent.system.${agentType}.${session._id}.md`, ), systemContent, "utf-8", ); messages.push({ role: "user", content: currentPrompt }); const modeFilteredTools = getToolsByMode(session.mode); const subagentTools = getToolsExcludingCategory("browser") .filter((t) => modeFilteredTools.includes(t)) .filter((t) => t.slug !== "subagent") .map((t: any) => t.definition); this.log.debug("tools available to subagent", { toolCount: subagentTools.length, toolNames: subagentTools.map((t: any) => t.function.name), }); return { messages, tools: subagentTools }; } private async buildSubagentSystemPrompt( session: IChatSession, agentType: "explore" | "general", ): Promise { const user = session.user as IUser; const promptFilePath = path.join( env.installRoot, "data", "prompts", "subagent", agentType, "system.md", ); let prompt = await fs.readFile(promptFilePath, "utf-8"); prompt += "\n\n## SESSION INFORMATION\n\n- Session ID: " + session._id + "\n- Session Type: " + session.type + "\n- Created At: " + session.createdAt.toISOString(); prompt += "\n\n## USER INFORMATION\n\n- User ID: " + user._id + "\n- Username: " + user.username + "\n- Display Name: " + user.displayName; prompt += "\n\n## SYSTEM INFORMATION\n\n- Current Time: " + new Date().toISOString() + "\n- Gadget Version: " + env.pkg.version + "\n- Timezone: " + env.timezone; try { const agentConfig = await fs.readFile("AGENTS.md", "utf-8"); this.log.debug("integrating AGENTS.md into subagent system prompt", { subagent: agentType, }); prompt += "\n\n## AGENT CONFIGURATION\n\n" + agentConfig; } catch (error) { this.log.debug("AGENTS.md file not found or failed to load.", { subagent: agentType, error: error instanceof Error ? error.message : String(error), }); } return this.renderSystemPromptTemplate(prompt, session.mode); } private async streamSubagentChat( session: IChatSession, messages: ChatMessage[], tools: ToolDefinition[], llm: string | undefined | null, userId: string, ): Promise<{ response: string; thinking: string; stats: { inputTokens: number; outputTokens: number; toolCallCount: number }; historyIds: string[]; }> { let fullResponse = ""; let fullThinking = ""; let inputTokens = 0; let outputTokens = 0; let toolCallCount = 0; const historyIds: string[] = []; if (!llm) { throw new Error("No LLM model configured for this subagent"); } const client = await AiService.getAgentClient(userId); client.on("finish_reason", (reason) => { this.emit("finish_reason", { reason }); }); let chatOptions; try { const providerInfo = await AiService.getAgentProviderInfo(userId); if (providerInfo) { chatOptions = await AiService.getModelChatOptions( userId, providerInfo.providerId, providerInfo.model, ); } } catch (err) { this.log.warn("Failed to get model settings, using defaults", { error: err, }); } let currentMessages: ChatMessage[] = [...messages]; let continueLoop = true; let lastFinishReason: string | undefined; let iterations = 0; while (continueLoop) { iterations++; continueLoop = false; let iterationThinking = ""; let iterationResponse = ""; const toolCallMap = new Map(); const collectedToolCalls: Array<{ name: string; callId: string; parameters: Array<{ name: string; value: string }>; response: string; fileOperation?: unknown; subagentStats?: unknown; }> = []; this.log.debug("Starting subagent chat request", { iteration: iterations, messageCount: currentMessages.length, hasTools: tools.length > 0, model: llm, }); let stream: AsyncGenerator; try { stream = await client.streamChat( currentMessages, tools, llm, chatOptions, ); } catch (chatError) { this.log.error("Failed to create subagent stream", { error: chatError instanceof Error ? chatError.message : String(chatError), model: llm, }); throw chatError; } for await (const chunk of stream) { if (chunk.thinking) { iterationThinking += chunk.thinking; fullThinking += chunk.thinking; } if (chunk.content) { iterationResponse += chunk.content; fullResponse += chunk.content; } if (chunk.done && chunk.finish_reason) { lastFinishReason = chunk.finish_reason; } if (chunk.toolCall) { const tcDelta = chunk.toolCall; const index = tcDelta.index ?? 0; let existing = toolCallMap.get(index); if (!existing) { existing = { id: tcDelta.id ?? "call-" + Math.random().toString(36).slice(2, 9), type: "function", function: { name: tcDelta.function?.name ?? "", arguments: "", }, }; toolCallMap.set(index, existing); } else { if (tcDelta.id) existing.id = tcDelta.id; if (tcDelta.function?.name) existing.function.name = tcDelta.function.name; } if (tcDelta.function?.arguments !== undefined) { const deltaArgs = typeof tcDelta.function.arguments === "string" ? tcDelta.function.arguments : JSON.stringify(tcDelta.function.arguments); const currentArgs = existing.function.arguments; const trimmedDelta = deltaArgs.trim(); const isFullObject = trimmedDelta.startsWith("{") && trimmedDelta.endsWith("}"); if ( isFullObject && (currentArgs.length === 0 || currentArgs === deltaArgs) ) { existing.function.arguments = deltaArgs; } else { existing.function.arguments += deltaArgs; } } } } const finalToolCalls = Array.from(toolCallMap.values()); currentMessages.push({ role: "assistant", content: iterationResponse, tool_calls: finalToolCalls.length > 0 ? finalToolCalls : undefined, }); if (finalToolCalls.length > 0) { continueLoop = true; for (const toolCall of finalToolCalls) { const toolName = toolCall.function.name; const toolArgsRaw = toolCall.function.arguments; let result = await this.executeTool(toolName, toolArgsRaw, session); let toolArgs: any = {}; try { toolArgs = JSON.parse(toolArgsRaw); } catch { // ignore, executeTool already handled it } // Extract metadata from result let parsedResult: any = null; try { parsedResult = JSON.parse(result); } catch { // ignore } toolCallCount++; inputTokens += Math.ceil(toolArgsRaw.length / 4); outputTokens += Math.ceil(result.length / 4); this.log.debug("Subagent tool result for LLM", { toolName, resultLength: result.length, preview: result.length > 100 ? result.substring(0, 100) + "..." : result, }); currentMessages.push({ role: "tool", content: result, tool_call_id: toolCall.id, }); const parameters: Array<{ name: string; value: string }> = []; for (const [k, v] of Object.entries(toolArgs)) { const value = typeof v === "string" ? v : JSON.stringify(v); // Skip parameters with empty string values if (value !== "") { parameters.push({ name: k, value }); } } collectedToolCalls.push({ name: toolName, callId: toolCall.id, parameters, response: result, fileOperation: parsedResult?.data?.fileOperation, subagentStats: parsedResult?.data?.subagentStats, }); } } else { this.log.info( "Subagent loop terminating: no tool calls found in this iteration.", ); } // Check for queued user messages and inject them before next LLM call const pendingMessages = this.getPendingUserMessages(); if (pendingMessages.length > 0) { this.log.info("Injecting queued user messages in subagent", { count: pendingMessages.length, }); for (const msg of pendingMessages) { currentMessages.push({ role: "user", content: msg.content, }); // Emit event so UI can display the message this.log.debug("Emitting queued_message_sent event", { displayName: msg.displayName, content: msg.content.substring(0, 50), }); this.emit("queued_message_sent", { displayName: msg.displayName, content: msg.content, }); } // Continue loop to process the injected messages continueLoop = true; } // Save this turn's history const turnHistoryId = await this.saveChatHistory( session, iterations === 1 ? messages[messages.length - 1]?.content || "" : "...", iterationThinking, iterationResponse, undefined, collectedToolCalls, undefined, true, inputTokens, outputTokens, ); historyIds.push(turnHistoryId); if (lastFinishReason === "length") { this.log.info("Subagent loop continuing: finish_reason was 'length'."); currentMessages.push({ role: "user", content: "Please continue.", }); continueLoop = true; lastFinishReason = undefined; } } // Update global session stats await ChatSession.findByIdAndUpdate(session._id, { $inc: { "stats.toolCallCount": toolCallCount }, }); // Ensure we have a response to return to the parent if (!fullResponse.trim()) { if (toolCallCount > 0) { fullResponse = `Subagent task completed. Executed ${toolCallCount} tool calls to perform the requested actions.`; } else { fullResponse = "Subagent task completed without additional actions."; } this.log.info("Subagent provided empty response; synthesized summary.", { toolCallCount, }); } this.log.info("Subagent chat stream finished.", { iterations, fullResponseLength: fullResponse.length, toolCallCount, }); return { response: fullResponse, thinking: fullThinking, stats: { inputTokens, outputTokens, toolCallCount }, historyIds, }; } private async autoNameSession( session: IChatSession, prompt: string, response: string, ): Promise { try { const user = session.user as IUser; const userId = user._id.toString(); const utilityModel = await AiService.getUtilityModel(userId); const namePrompt = "Generate a short descriptive name (3-5 words) for this conversation based on the first user message and AI response. Just return the name, nothing else.\n\nUser message: " + prompt.trim() + "\n\nAI response: " + response.substring(0, 500); const suggestedName = await this.generate(userId, namePrompt, { model: utilityModel, }); const finalName = suggestedName .trim() .replace(/^"|"$/g, "") .substring(0, 100); if (finalName) { session.name = finalName; await session.save(); this.log.info("Auto-named session", { sessionId: session._id, name: finalName, }); this.emit("session_updated", { sessionId: session._id.toString(), name: finalName, }); } } catch (error) { this.log.warn("Failed to auto-name session", { sessionId: session._id, error, }); } } async generate( userId: string, prompt: string, options?: { model?: string; systemPrompt?: string; tools?: string[] }, ): Promise { const messages: ChatMessage[] = []; if (options?.systemPrompt) { messages.push({ role: "system", content: options.systemPrompt }); } messages.push({ role: "user", content: prompt }); const tools = options?.tools ? getToolsBySlugs(options.tools).map((t) => t.definition as any) : ([] as any[]); const model = options?.model; if (!model) { throw new Error("No LLM model specified for generation"); } const client = await AiService.getUtilityClient(userId); client.on("finish_reason", (reason) => { this.emit("finish_reason", { reason }); }); const response = await client.chat(messages, tools, model); return response.content; } private async buildAgentContext( session: IChatSession, currentPrompt: string, ): Promise { const messages: ChatMessage[] = []; const mode = session.mode; const systemContent = await this.buildAgentSystemPrompt(session); messages.push({ role: "system", content: systemContent }); await fs.writeFile( path.join( env.installRoot, "logs", `agent.system.${mode}.${session._id}.md`, ), systemContent, "utf-8", ); const history = await ChatHistory.find({ session: session._id, isSubagent: { $ne: true }, }) .sort({ createdAt: -1 }) .lean(); for (const turn of history.reverse()) { messages.push({ role: "user", content: turn.prompt }); // Include error information for failed turns if (turn.status === "failed" && turn.error) { const errorContext = `\n\n[ERROR: ${turn.error.message}]`; const toolCallContext = turn.toolCalls && turn.toolCalls.length > 0 ? `\n[Tool calls made before error: ${turn.toolCalls.map((tc) => tc.tool.name).join(", ")}]` : ""; const thinkingContext = turn.response?.thinking ? `\n[Thinking before error: ${turn.response.thinking.substring(0, 200)}${turn.response.thinking.length > 200 ? "..." : ""}]` : ""; messages.push({ role: "assistant", content: `[Turn failed${errorContext}${toolCallContext}${thinkingContext}]`, }); } else if (turn.response?.message) { messages.push({ role: "assistant", content: turn.response.message }); } } messages.push({ role: "user", content: currentPrompt }); const modeFilteredTools = getToolsByMode(mode); let availableTools = session.type === ChatSessionType.Extension ? modeFilteredTools : getToolsExcludingCategory("browser").filter((t) => modeFilteredTools.includes(t), ); // Filter tools based on user configuration const userId = (session.user as IUser)._id.toString(); const userConfiguredTools: DtpTool[] = []; for (const tool of availableTools) { if (tool.requiresUserConfig()) { const isConfigured = await tool.checkUserConfig(userId); if (isConfigured) { userConfiguredTools.push(tool); } else { this.log.debug( `Tool ${tool.slug} excluded: user configuration required but not found`, { userId, toolSlug: tool.slug, }, ); } } else { userConfiguredTools.push(tool); } } availableTools = userConfiguredTools; const tools = availableTools.map((t: any) => t.definition); this.log.debug("tools available to agent this turn", { toolCount: tools.length, toolNames: tools.map((t) => t.function.name), }); return { messages, tools }; } private async buildAgentSystemPrompt(session: IChatSession): Promise { const user = session.user as IUser; const mode = session.mode; const promptFilePath = path.join( env.installRoot, "data", "prompts", "agent", mode, "system.md", ); let prompt = await fs.readFile(promptFilePath, "utf-8"); prompt += "\n\n## SESSION INFORMATION\n\n- Session ID: " + session._id + "\n- Session Type: " + session.type + "\n- Created At: " + session.createdAt.toISOString(); prompt += "\n\n## USER INFORMATION\n\n- User ID: " + user._id + "\n- Username: " + user.username + "\n- Display Name: " + user.displayName; prompt += "\n\n## SYSTEM INFORMATION\n\n- Current Time: " + new Date().toISOString() + "\n- Gadget Version: " + env.pkg.version + "\n- Timezone: " + env.timezone; try { const agentConfig = await fs.readFile("AGENTS.md", "utf-8"); this.log.debug("integrating AGENTS.md into system prompt"); prompt += "\n\n## AGENT CONFIGURATION\n\n" + agentConfig; } catch (error) { this.log.debug("AGENTS.md file not found or failed to load.", { error: error instanceof Error ? error.message : String(error), }); } prompt += "\n\n## PINBOARD\n\n"; if (session.pins.length > 0) { for (const pin of session.pins) { prompt += "- [" + pin._id?.toString() + "] " + pin.content + "\n"; } } else { prompt += "The pinboard is empty. Use the `pin_add` tool to add notes.\n"; } return this.renderSystemPromptTemplate(prompt, mode); } async renderSystemPromptTemplate( prompt: string, _mode: string, ): Promise { let content; /* * subagents.md */ content = await fs.readFile( path.join(env.installRoot, "data", "prompts", "common", "subagents.md"), "utf-8", ); prompt = prompt.replace("{{subagent_section}}", content.trim()); /* * scope-block.md */ content = await fs.readFile( path.join(env.installRoot, "data", "prompts", "common", "scope-block.md"), "utf-8", ); prompt = prompt.replace("{{scope_block}}", content.trim()); return prompt; } private async streamChat( session: IChatSession, messages: ChatMessage[], tools: ToolDefinition[], llm: string | undefined | null, userId: string, historyId: string, ): Promise<{ response: string; thinking: string; collectedToolCalls: Array<{ name: string; callId: string; parameters: Array<{ name: string; value: string }>; response: string; fileOperation?: unknown; subagentStats?: unknown; }>; subagentHistoryIds: string[]; inputTokens: number; outputTokens: number; }> { let fullResponse = ""; let fullThinking = ""; const collectedToolCalls: Array<{ name: string; callId: string; parameters: Array<{ name: string; value: string }>; response: string; fileOperation?: unknown; subagentStats?: unknown; }> = []; const subagentHistoryIds: string[] = []; let toolCallCount = 0; let inputTokens = 0; let outputTokens = 0; // Estimate input tokens from message content (rough estimate: 4 chars per token) const inputText = messages.map((m) => m.content).join(" "); inputTokens = Math.ceil(inputText.length / 4); if (!llm) { throw new Error("No LLM model configured for this agent"); } const client = await AiService.getAgentClient(userId); client.on("finish_reason", (reason) => { this.emit("finish_reason", { reason }); }); let chatOptions; try { const providerInfo = await AiService.getAgentProviderInfo(userId); if (providerInfo) { chatOptions = await AiService.getModelChatOptions( userId, providerInfo.providerId, providerInfo.model, ); } } catch (err) { this.log.warn("Failed to get model settings, using defaults", { error: err, }); } let currentMessages: ChatMessage[] = [...messages]; let continueLoop = true; let lastFinishReason: string | undefined; let iterations = 0; // Create abort controller for this operation const abortController = new AbortController(); this.setAbortController(abortController); while (continueLoop) { // Check for abort signal at the start of each iteration if (abortController.signal.aborted) { this.log.info("Agent operation was aborted"); this.clearAbortController(); throw new Error("Operation aborted"); } iterations++; continueLoop = false; // Check for queued user messages at the START of each iteration const pendingMessagesAtStart = this.getPendingUserMessages(); if (pendingMessagesAtStart.length > 0) { this.log.info("Injecting queued user messages at start of iteration", { count: pendingMessagesAtStart.length, iteration: iterations, }); for (const msg of pendingMessagesAtStart) { currentMessages.push({ role: "user", content: msg.content, }); } // Continue loop to process the injected messages continueLoop = true; } let iterationThinking = ""; let iterationResponse = ""; const toolCallMap = new Map(); this.log.debug("Starting AI chat request", { iteration: iterations, messageCount: currentMessages.length, hasTools: tools.length > 0, model: llm, }); let stream: AsyncGenerator; try { stream = await client.streamChat( currentMessages, tools, llm, chatOptions, ); } catch (chatError) { this.log.error("Failed to create chat stream", { error: chatError instanceof Error ? chatError.message : String(chatError), model: llm, }); throw chatError; } for await (const chunk of stream) { // Check for abort signal during streaming if (abortController.signal.aborted) { this.log.info("Agent operation was aborted during streaming"); this.clearAbortController(); throw new Error("Operation aborted"); } if (chunk.thinking) { iterationThinking += chunk.thinking; fullThinking += chunk.thinking; this.emit("thinking", { text: chunk.thinking }); } if (chunk.content) { iterationResponse += chunk.content; fullResponse += chunk.content; outputTokens += Math.ceil(chunk.content.length / 4); this.emit("message", { text: chunk.content }); } if (chunk.done && chunk.finish_reason) { lastFinishReason = chunk.finish_reason; } if (chunk.toolCall) { const tcDelta = chunk.toolCall; const index = tcDelta.index ?? 0; let existing = toolCallMap.get(index); if (!existing) { existing = { id: tcDelta.id ?? "call-" + Math.random().toString(36).slice(2, 9), type: "function", function: { name: tcDelta.function?.name ?? "", arguments: "", }, }; toolCallMap.set(index, existing); if (existing.function.name) { this.emit("tool_call", { name: existing.function.name, arguments: "", }); } } else { if (tcDelta.id) existing.id = tcDelta.id; if (tcDelta.function?.name) { const oldName = existing.function.name; existing.function.name = tcDelta.function.name; if (!oldName && existing.function.name) { this.emit("tool_call", { name: existing.function.name, arguments: "", }); } } } if (tcDelta.function?.arguments !== undefined) { const deltaArgs = typeof tcDelta.function.arguments === "string" ? tcDelta.function.arguments : JSON.stringify(tcDelta.function.arguments); const currentArgs = existing.function.arguments; const trimmedDelta = deltaArgs.trim(); const isFullObject = trimmedDelta.startsWith("{") && trimmedDelta.endsWith("}"); if ( isFullObject && (currentArgs.length === 0 || currentArgs === deltaArgs) ) { existing.function.arguments = deltaArgs; } else { existing.function.arguments += deltaArgs; } } } } const finalToolCalls = Array.from(toolCallMap.values()); currentMessages.push({ role: "assistant", content: iterationResponse, tool_calls: finalToolCalls.length > 0 ? finalToolCalls : undefined, }); // Incrementally update ChatHistory with thinking and response from this iteration if (iterationThinking || iterationResponse) { await ChatHistory.findByIdAndUpdate(historyId, { "response.thinking": iterationThinking, "response.message": iterationResponse, }); } if (finalToolCalls.length > 0) { continueLoop = true; for (const toolCall of finalToolCalls) { const toolName = toolCall.function.name; const toolArgsRaw = toolCall.function.arguments; let result = await this.executeTool(toolName, toolArgsRaw, session); toolCallCount++; let toolArgs: any = {}; try { toolArgs = JSON.parse(toolArgsRaw); } catch { // ignore } let parsedResult: any = null; try { parsedResult = JSON.parse(result); } catch { // ignore } this.log.debug("Tool result for LLM", { toolName, resultLength: result.length, preview: result.length > 100 ? result.substring(0, 100) + "..." : result, }); if (parsedResult?.success && parsedResult.data?.imageBase64) { try { const processed = await processImageForLlm( parsedResult.data.imageBase64, ); currentMessages.push({ role: "tool", content: `Screenshot captured (${processed.metadata.width}x${processed.metadata.height})`, images: [processed.base64], tool_call_id: toolCall.id, }); } catch (imgErr) { currentMessages.push({ role: "tool", content: result, tool_call_id: toolCall.id, }); } } else { currentMessages.push({ role: "tool", content: result, tool_call_id: toolCall.id, }); } const parameters: Array<{ name: string; value: string }> = []; for (const [k, v] of Object.entries(toolArgs)) { const value = typeof v === "string" ? v : JSON.stringify(v); // Skip parameters with empty string values if (value !== "") { parameters.push({ name: k, value }); } } if (parsedResult?.data?.historyIds) { subagentHistoryIds.push(...parsedResult.data.historyIds); } collectedToolCalls.push({ name: toolName, callId: toolCall.id, parameters, response: result, fileOperation: parsedResult?.data?.fileOperation, subagentStats: parsedResult?.data?.subagentStats, }); this.emit("tool_result", { tool: toolName, result: result, fileOperation: parsedResult?.data?.fileOperation, subagentStats: parsedResult?.data?.subagentStats, subagentPrompt: parsedResult?.data?.subagentPrompt, subagentType: parsedResult?.data?.subagentType, }); // Incrementally update ChatHistory with tool calls from this iteration await ChatHistory.findByIdAndUpdate(historyId, { $push: { toolCalls: { tool: { name: toolName, callId: toolCall.id, parameters, }, response: result, fileOperation: parsedResult?.data?.fileOperation ?? undefined, subagentStats: parsedResult?.data?.subagentStats ?? undefined, }, }, $inc: { inputTokens: Math.ceil(toolArgsRaw.length / 4), outputTokens: Math.ceil(result.length / 4), }, }); } } else { this.log.info( "Agent loop terminating: no tool calls found in this iteration.", ); } // Check for queued user messages and inject them before next LLM call const newPendingMessages = this.getPendingUserMessages(); if (newPendingMessages.length > 0) { this.log.info("Injecting queued user messages", { count: newPendingMessages.length, }); for (const msg of newPendingMessages) { currentMessages.push({ role: "user", content: msg.content, }); // Emit event so UI can display the message this.emit("queued_message_sent", { displayName: msg.displayName, content: msg.content, }); } // Continue loop to process the injected messages continueLoop = true; } if (lastFinishReason === "length") { this.log.info("Agent loop continuing: finish_reason was 'length'."); currentMessages.push({ role: "user", content: "Please continue.", }); continueLoop = true; lastFinishReason = undefined; } } await ChatSession.findByIdAndUpdate(session._id, { $inc: { "stats.toolCallCount": toolCallCount }, }); this.emit("done", { fullResponse, fullThinking, inputTokens, outputTokens, }); this.log.info("Agent chat stream finished.", { iterations, fullResponseLength: fullResponse.length, toolCallCount, inputTokens, outputTokens, }); // Clear abort controller on successful completion this.clearAbortController(); // Clear any remaining queued messages on successful completion this.clearPendingUserMessages(); return { response: fullResponse, thinking: fullThinking, collectedToolCalls, subagentHistoryIds, inputTokens, outputTokens, }; } private async executeTool( toolName: string, args: string, session: IChatSession, ): Promise { const tool = getToolByName(toolName); if (!tool) { const error = "Unknown tool: " + toolName; this.log.error("Tool not found", { toolName }); return JSON.stringify({ success: false, error, message: error }); } try { this.log.debug("Tool execution started", { toolName, args }); const parsedArgs = JSON.parse(args); const context = { session }; const result = await tool.execute(context, parsedArgs); this.log.debug("Tool execution completed", { toolName, resultLength: result.length, }); return result; } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); this.log.error("Tool execution failed: " + toolName, { tool: toolName, arguments: args, error: errorMessage, stack: error instanceof Error ? error.stack : undefined, }); try { this.emit("tool_error", { tool: toolName, message: "Tool execution failed: " + errorMessage, }); } catch (emitError) { this.log.error("Failed to emit tool error event", { toolName, emitError, }); } return JSON.stringify({ success: false, error: "TOOL_EXECUTION_FAILED", message: errorMessage, }); } } private async saveChatHistory( session: IChatSession, prompt: string, thinking: string | undefined, message: string | undefined, error?: { message: string; stack?: string }, toolCalls?: Array<{ name: string; callId: string; parameters: Array<{ name: string; value: string }>; response: string; fileOperation?: unknown; subagentStats?: unknown; }>, subagentHistoryIds?: string[], isSubagent = false, inputTokens = 0, outputTokens = 0, ): Promise { const status = error ? "failed" : "success"; const user = session.user as IUser; const toolCallDocs = (toolCalls ?? []).map((tc) => ({ tool: { name: tc.name, callId: tc.callId, parameters: tc.parameters, }, response: tc.response, fileOperation: tc.fileOperation ?? undefined, subagentStats: tc.subagentStats ?? undefined, })); const fileOpDocs = toolCallDocs .filter((tc) => tc.fileOperation != null) .map((tc) => tc.fileOperation); const history = new ChatHistory({ user: user._id, session: session._id, prompt, mode: session.mode, toolCalls: toolCallDocs, fileOperations: fileOpDocs, response: { thinking, message }, status, isSubagent, subagentHistory: subagentHistoryIds || [], error: error ? { message: error.message, stack: error.stack, timestamp: new Date() } : undefined, inputTokens, outputTokens, }); const saved = await history.save(); return saved._id.toString(); } } export default new AgentService();