Re-build Agentic Workflow Loop

The ridiculousness of trying to maintain the previous agent's work got
out of hand, so we had this one re-build it - and got a better result.
This commit is contained in:
Rob Colbert 2026-05-09 21:04:18 -04:00
parent cf06163a03
commit 73c5345879
12 changed files with 654 additions and 968 deletions

View File

@ -243,7 +243,9 @@ export default function ChatSessionView() {
for (const updateBlock of turnUpdates.blocks) { for (const updateBlock of turnUpdates.blocks) {
let blockIndex = state?.currentBlockIndex ?? null; let blockIndex = state?.currentBlockIndex ?? null;
if ( if (updateBlock.mode === 'tool') {
blockIndex = null;
} else if (
blockIndex === null || blockIndex === null ||
updatedBlocks[blockIndex]?.mode !== updateBlock.mode updatedBlocks[blockIndex]?.mode !== updateBlock.mode
) { ) {

View File

@ -185,7 +185,9 @@ export class CodeSession extends SocketSession {
try { try {
const droneSession = SocketService.getDroneSession(this.selectedDrone); const droneSession = SocketService.getDroneSession(this.selectedDrone);
const latestSession = await ChatSessionService.getById(this.chatSession._id); const latestSession = await ChatSessionService.getById(
this.chatSession._id,
);
this.chatSession = latestSession; this.chatSession = latestSession;
let turn: ChatTurnDocument = await ChatSessionService.createTurn( let turn: ChatTurnDocument = await ChatSessionService.createTurn(
@ -243,6 +245,27 @@ export class CodeSession extends SocketSession {
turnId: turn._id, turnId: turn._id,
message, message,
}); });
/*
* Auto-generate a session name from the first prompt. Only do this when
* the name is still the default (user hasn't set a custom name) and
* we're on the first turn (turnCount === 1 after the increment above).
*/
if (
this.chatSession &&
this.chatSession.name === "New Chat Session" &&
this.chatSession.stats.turnCount === 1
) {
this.chatSession =
await ChatSessionService.generateSessionNameFromPrompt(
this.chatSession,
content,
);
const update: Partial<IChatSession> = {
name: this.chatSession.name,
};
this.socket.emit("sessionUpdated", update);
}
} else { } else {
this.log.error("work order rejected by drone", { this.log.error("work order rejected by drone", {
turnId: turn._id, turnId: turn._id,
@ -254,25 +277,6 @@ export class CodeSession extends SocketSession {
} }
}, },
); );
/*
* Auto-generate a session name from the first prompt. Only do this when
* the name is still the default (user hasn't set a custom name) and
* we're on the first turn (turnCount === 1 after the increment above).
*/
if (
this.chatSession.name === "New Chat Session" &&
this.chatSession.stats.turnCount === 1
) {
this.chatSession =
await ChatSessionService.generateSessionNameFromPrompt(
this.chatSession,
content,
);
const update: Partial<IChatSession> = { name: this.chatSession.name };
this.log.debug("emitting sessionUpdated message", { update });
this.socket.emit("sessionUpdated", update);
}
} catch (error) { } catch (error) {
this.log.error("prompt rejected", { error }); this.log.error("prompt rejected", { error });
cb(false, {}); cb(false, {});

View File

@ -404,6 +404,19 @@ class ChatSessionService extends DtpService {
}); });
const api: AiApi = createAiApi(aiEnv, provider, this.log); const api: AiApi = createAiApi(aiEnv, provider, this.log);
const systemPrompt = [
"ROLE:",
"You are an assistant that creates titles for chat sessions by examining the first prompt submitted by the user.",
"",
"SCOPE:",
"You return just the title of the session, no explanation or justifications. You return one title, not multiple choices.",
"Select the best title for the chat session, and return that.",
"",
"CONSTRAINTS:",
"- Chat session titles shouldn't be longer than 60 characters.",
"- Chat session titles shouldn't contain vulgarity or degeneracy.",
].join("\n");
const response = await api.generate( const response = await api.generate(
{ {
provider, provider,
@ -416,9 +429,8 @@ class ChatSessionService extends DtpService {
}, },
}, },
{ {
systemPrompt: systemPrompt,
"You are an assistant that creates titles for chat sessions by examining the first prompt.", prompt: `Here is the first prompt submitted by the user:\n\n${prompt}`,
prompt: `The first prompt submitted by the user: \n\n${prompt}`,
}, },
); );

View File

@ -584,7 +584,7 @@ class GadgetDrone extends GadgetProcess {
"status", "status",
`failed to process work order: ${(error as Error).message}`, `failed to process work order: ${(error as Error).message}`,
); );
// Leave cache in place for recovery await WorkspaceService.removeWorkOrderCache();
} finally { } finally {
process.chdir(workspaceDir); process.chdir(workspaceDir);
this.isProcessingWorkOrder = false; this.isProcessingWorkOrder = false;

View File

@ -1,11 +1,17 @@
import { describe, expect, it } from "vitest"; import { beforeEach, describe, expect, it } from "vitest";
import { ChatSessionMode, ChatTurnStatus } from "@gadget/api"; import { ChatSessionMode, ChatTurnStatus } from "@gadget/api";
import { AgentService, type IAgentWorkOrder } from "./agent.ts"; import { AgentService, type IAgentWorkOrder } from "./agent.ts";
describe("AgentService", () => { describe("AgentService", () => {
let service: AgentService;
beforeEach(async () => {
service = new AgentService();
await service.start();
});
it("replays historical tool results as assistant-readable context, not raw tool-role messages", () => { it("replays historical tool results as assistant-readable context, not raw tool-role messages", () => {
const service = new AgentService();
const user = { const user = {
_id: "user-1", _id: "user-1",
email: "user@example.com", email: "user@example.com",
@ -110,4 +116,21 @@ describe("AgentService", () => {
expect(messages[2]?.content).toContain("Historical tool result: file_read"); expect(messages[2]?.content).toContain("Historical tool result: file_read");
expect(messages[2]?.content).toContain("PATH: index.html"); expect(messages[2]?.content).toContain("PATH: index.html");
}); });
// Plan mode: the read/search tools must be listed, the file-mutating tools
// must not. Uses the shared `service` instance set up in beforeEach.
it("does not expose mutating file tools in plan mode", () => {
  const planTools = service.getToolNamesForMode(ChatSessionMode.Plan);

  // Tools expected to be available while planning.
  for (const allowed of ["file_read", "fetch_url", "search_google"]) {
    expect(planTools).toContain(allowed);
  }

  // Tools that modify files must be absent while planning.
  for (const forbidden of ["file_write", "file_edit"]) {
    expect(planTools).not.toContain(forbidden);
  }
});
// Build mode: both file-mutating tools must be listed.
it("exposes mutating file tools in build mode", () => {
  const buildTools = service.getToolNamesForMode(ChatSessionMode.Build);

  for (const expected of ["file_write", "file_edit"]) {
    expect(buildTools).toContain(expected);
  }
});
}); });

View File

@ -3,12 +3,11 @@
// Licensed under the Apache License, Version 2.0 // Licensed under the Apache License, Version 2.0
import env from "../config/env.ts"; import env from "../config/env.ts";
import assert from "node:assert";
import { Socket } from "socket.io-client"; import { Socket } from "socket.io-client";
import { import {
IAiChatOptions, IAiChatOptions,
IAiStreamChunk, IAiResponseStreamFn,
type IContextChatMessage, type IContextChatMessage,
} from "@gadget/ai"; } from "@gadget/ai";
import { import {
@ -41,11 +40,6 @@ export interface IAgentWorkOrder {
context: IChatTurn[]; context: IChatTurn[];
} }
interface IAgentWorkflow {
chatOptions: IAiChatOptions;
context: IContextChatMessage[];
}
type DroneSocket = Socket<ServerToClientEvents, ClientToServerEvents>; type DroneSocket = Socket<ServerToClientEvents, ClientToServerEvents>;
const toolboxEnv: DroneToolboxEnvironment = { const toolboxEnv: DroneToolboxEnvironment = {
@ -71,25 +65,25 @@ class AgentService extends GadgetService {
} }
async start(): Promise<void> { async start(): Promise<void> {
const googleSearchTool = new GoogleSearchTool(this.toolbox); const readOnlyModes = [
this.toolbox.register(googleSearchTool, [
ChatSessionMode.Plan,
ChatSessionMode.Build,
ChatSessionMode.Test,
ChatSessionMode.Ship,
ChatSessionMode.Develop,
]);
const modes = [
ChatSessionMode.Plan, ChatSessionMode.Plan,
ChatSessionMode.Build, ChatSessionMode.Build,
ChatSessionMode.Test, ChatSessionMode.Test,
ChatSessionMode.Ship, ChatSessionMode.Ship,
ChatSessionMode.Develop, ChatSessionMode.Develop,
]; ];
this.toolbox.register(new FileReadTool(this.toolbox), modes); const writeModes = [
this.toolbox.register(new FileWriteTool(this.toolbox), modes); ChatSessionMode.Build,
this.toolbox.register(new FileEditTool(this.toolbox), modes); ChatSessionMode.Test,
this.toolbox.register(new FetchUrlTool(this.toolbox), modes); ChatSessionMode.Ship,
ChatSessionMode.Develop,
];
this.toolbox.register(new GoogleSearchTool(this.toolbox), readOnlyModes);
this.toolbox.register(new FileReadTool(this.toolbox), readOnlyModes);
this.toolbox.register(new FetchUrlTool(this.toolbox), readOnlyModes);
this.toolbox.register(new FileWriteTool(this.toolbox), writeModes);
this.toolbox.register(new FileEditTool(this.toolbox), writeModes);
this.log.info("started"); this.log.info("started");
} }
@ -103,132 +97,130 @@ class AgentService extends GadgetService {
socket: DroneSocket, socket: DroneSocket,
): Promise<void> { ): Promise<void> {
const { turn } = workOrder; const { turn } = workOrder;
const task: IAgentWorkflow = { let toolCallCount = 0;
chatOptions: {}, let inputTokens = 0;
context: [], let outputTokens = 0;
};
let streamedThinking = false;
let streamedResponse = false;
let streamedToolCall = false;
const onStreamChunk = async (chunk: IAiStreamChunk): Promise<void> => { // Build the full message array that grows with each iteration
// this.log.debug("stream chunk received", { chunk }); const messages: IContextChatMessage[] = [];
switch (chunk.type) { if (turn.prompts.system) {
case "thinking": messages.push({
streamedThinking = true; createdAt: turn.createdAt,
socket.emit("thinking", chunk.data); role: "system",
break; content: turn.prompts.system,
case "response": });
streamedResponse = true; }
socket.emit("response", chunk.data);
break; messages.push(...this.buildSessionContext(workOrder));
case "toolCall":
streamedToolCall = true; // Current turn's user prompt must be the last message before the AI call
socket.emit( messages.push({
"toolCall", createdAt: turn.createdAt,
chunk.toolCallId!, role: "user",
chunk.toolName!, content: turn.prompts.user,
chunk.params || "{}", });
chunk.data,
); const reasoningEffort = turn.reasoningEffort || "off";
break; const reasoning: boolean | "low" | "medium" | "high" =
} reasoningEffort === "off" ? false : reasoningEffort;
};
try { try {
this.updateToolboxWorkspace(turn); this.updateToolboxWorkspace(turn);
task.context = this.buildSessionContext(workOrder);
task.chatOptions = {
systemPrompt: turn.prompts.system,
context: task.context,
userPrompt: turn.prompts.user,
tools: this.getToolsForMode(turn.mode),
};
} catch (cause) { } catch (cause) {
socket.emit( socket.emit(
"workOrderComplete", "workOrderComplete",
turn._id, turn._id,
false, false,
`failed to build session context: ${(cause as Error).message}`, `failed to update workspace: ${(cause as Error).message}`,
); );
const error = new Error("failed to build session context", { cause }); throw new Error("failed to update workspace", { cause });
throw error;
} }
this.log.info("agent loop starting", {
turnId: turn._id,
messageCount: messages.length,
toolCount: this.toolbox.getToolNamesForMode(turn.mode).length,
});
try { try {
const reasoningEffort = turn.reasoningEffort || "off"; let continueLoop = true;
const reasoning: boolean | "low" | "medium" | "high" = while (continueLoop) {
reasoningEffort === "off" ? false : reasoningEffort; continueLoop = false;
const response = await AiService.chat( this.log.info("agent loop iteration", {
turn.provider, messagesCount: messages.length,
{ toolsAvailable: this.toolbox.getToolNamesForMode(turn.mode),
modelId: turn.llm, });
params: {
reasoning, const chatOptions: IAiChatOptions = {
temperature: 0.8, context: messages,
topP: 0.9, tools: this.getToolsForMode(turn.mode),
topK: 40, };
const response = await AiService.chat(
turn.provider,
{
modelId: turn.llm,
params: { reasoning, temperature: 0.8, topP: 0.9, topK: 40 },
}, },
}, chatOptions,
task.chatOptions, this.makeStreamHandler(socket),
onStreamChunk,
);
if (this.isEmptyAgentResponse(response)) {
throw new Error(
"AI provider returned an empty response: no thinking, response, tool calls, or tool results.",
); );
}
// Check for model loading failure if (response.doneReason === "load" && !response.response && !response.thinking) {
if ( throw new Error("Model failed to respond (still loading or error)");
response.doneReason === "load" && }
!response.response &&
!response.thinking &&
(!response.toolCalls || response.toolCalls.length === 0)
) {
throw new Error("Model failed to respond (still loading or error)");
}
// Providers return accumulated final content; only emit it here when it // Process tool calls if present
// was not already delivered through the stream callback. if (response.toolCalls && response.toolCalls.length > 0) {
if (response.thinking && !streamedThinking) { continueLoop = true;
socket.emit("thinking", response.thinking); toolCallCount += response.toolCalls.length;
}
if (response.response && !streamedResponse) { messages.push({
socket.emit("response", response.response); createdAt: turn.createdAt,
} role: "assistant",
content: response.response,
});
if (response.toolCalls && response.toolCalls.length > 0 && !streamedToolCall) { for (const toolCall of response.toolCalls) {
for (const toolCall of response.toolCalls) { const result = await this.executeTool(
socket.emit( toolCall.function.name,
"toolCall", toolCall.function.arguments,
toolCall.callId, );
toolCall.function.name,
toolCall.function.arguments, socket.emit(
response.toolCallResults?.find((r) => r.callId === toolCall.callId) "toolCall",
?.result || "", toolCall.callId,
); toolCall.function.name,
toolCall.function.arguments,
result,
);
messages.push({
createdAt: turn.createdAt,
role: "tool",
callId: toolCall.callId,
toolName: toolCall.function.name,
content: result,
});
inputTokens += Math.ceil(toolCall.function.arguments.length / 4);
outputTokens += Math.ceil(result.length / 4);
}
} }
} }
} catch (cause) {
socket.emit(
"workOrderComplete",
turn._id,
false,
`failed to process agentic workflow loop: ${(cause as Error).message}`,
);
const error = new Error("failed to process agentic workflow loop", {
cause,
});
throw error;
}
// Emit work order complete socket.emit("workOrderComplete", turn._id, true);
socket.emit("workOrderComplete", turn._id, true); } catch (cause) {
const msg = cause instanceof Error ? cause.message : String(cause);
this.log.error("agent loop failed, sending workOrderComplete(false)", {
turnId: turn._id,
error: msg,
});
socket.emit("workOrderComplete", turn._id, false, msg);
throw cause;
}
} }
buildSessionContext(workOrder: IAgentWorkOrder): IContextChatMessage[] { buildSessionContext(workOrder: IAgentWorkOrder): IContextChatMessage[] {
@ -319,37 +311,62 @@ class AgentService extends GadgetService {
return Array.from(this.toolbox.getModeSet(mode) || []); return Array.from(this.toolbox.getModeSet(mode) || []);
} }
/**
 * Returns the names of the tools the toolbox reports for the given session
 * mode. Pure delegation to the toolbox; no filtering happens here.
 *
 * @param mode - Session mode to look up.
 * @returns The tool names reported by the toolbox for that mode.
 */
getToolNamesForMode(mode: ChatSessionMode): string[] {
  const toolNames = this.toolbox.getToolNamesForMode(mode);
  return toolNames;
}
private formatHistoricalToolResult(toolCall: { private formatHistoricalToolResult(toolCall: {
name: string; name: string;
parameters?: string; parameters?: string;
response?: string; response?: string;
}): string { }): string {
const response = toolCall.response || ""; const response = toolCall.response || "";
const maxLength = 8000;
const trimmedResponse =
response.length > maxLength
? `${response.slice(0, maxLength)}\n\n[Tool result truncated from ${response.length} characters.]`
: response;
return [ return [
`Historical tool result: ${toolCall.name}`, `Historical tool result: ${toolCall.name}`,
`Parameters: ${toolCall.parameters || "{}"}`, `Parameters: ${toolCall.parameters || "{}"}`,
"---", "---",
trimmedResponse, response.length > 8000
? `${response.slice(0, 8000)}\n\n[Tool result truncated from ${response.length} characters.]`
: response,
].join("\n"); ].join("\n");
} }
private isEmptyAgentResponse(response: { private makeStreamHandler(socket: DroneSocket): IAiResponseStreamFn {
response?: string; return async (chunk) => {
thinking?: string; switch (chunk.type) {
toolCalls?: unknown[]; case "thinking":
toolCallResults?: unknown[]; socket.emit("thinking", chunk.data);
}): boolean { break;
return ( case "response":
!(response.response && response.response.trim()) && socket.emit("response", chunk.data);
!(response.thinking && response.thinking.trim()) && break;
!(response.toolCalls && response.toolCalls.length) && }
!(response.toolCallResults && response.toolCallResults.length) };
); }
/**
 * Looks up a tool by name, parses its JSON-encoded arguments, runs it, and
 * returns the tool's string result.
 *
 * An unknown tool name, an argument string that fails to parse, or an error
 * thrown by the tool is logged and reported back as a JSON string of the
 * form `{ success: false, error: <message> }` instead of being thrown.
 *
 * NOTE(review): the lookup below is by name only and no session mode is
 * passed in. Confirm that a tool which is not registered for the turn's
 * mode cannot be reached through this path.
 *
 * @param name - Name of the tool to run.
 * @param argsJson - JSON text holding the tool's arguments.
 * @returns The tool's result, or a JSON-encoded failure description.
 */
private async executeTool(name: string, argsJson: string): Promise<string> {
const tool = this.toolbox.getTool(name);
if (!tool) {
// Report the unknown tool as a result string rather than throwing.
const msg = `Unknown tool: ${name}`;
this.log.error("tool not found", { toolName: name });
return JSON.stringify({ success: false, error: msg });
}
try {
// Parsing happens inside the try, so malformed argument JSON takes the
// same failure path as an error thrown by the tool itself.
const args = JSON.parse(argsJson);
this.log.info("executing tool", { name, params: argsJson });
const result = await tool.execute(args, this.log);
// Log only the length and a preview of at most 100 characters.
this.log.info("tool result", {
name,
resultLength: result.length,
preview: result.length > 100 ? `${result.slice(0, 100)}...` : result,
});
return result;
} catch (error) {
// Non-Error throwables are stringified so a message is always available.
const msg = error instanceof Error ? error.message : String(error);
this.log.error("tool execution failed", { toolName: name, args: argsJson, error: msg });
return JSON.stringify({ success: false, error: msg });
}
} }
private updateToolboxWorkspace(turn: IChatTurn): void { private updateToolboxWorkspace(turn: IChatTurn): void {

View File

@ -67,4 +67,8 @@ export class AiToolbox {
getModeSet(mode: string): ToolSet | undefined { getModeSet(mode: string): ToolSet | undefined {
return this.modeSets.get(mode); return this.modeSets.get(mode);
} }
/**
 * Collects the `name` of every tool in the set registered for `mode`.
 *
 * @param mode - Mode key to look up via `getModeSet`.
 * @returns The tool names in the set's iteration order, or an empty array
 *          when no set exists for the mode.
 */
getToolNamesForMode(mode: string): string[] {
  const names: string[] = [];
  // Fall back to an empty list when the mode has no tool set.
  for (const tool of this.getModeSet(mode) || []) {
    names.push(tool.name);
  }
  return names;
}
} }

View File

@ -175,31 +175,6 @@ export abstract class AiApi {
streamCallback?: IAiResponseStreamFn, streamCallback?: IAiResponseStreamFn,
): Promise<IAiChatResponse>; ): Promise<IAiChatResponse>;
protected shouldContinueAfterNonToolResponse(response: string): boolean {
const normalized = response.trim().toLowerCase();
if (!normalized) {
return false;
}
const futureIntentPatterns = [
/\bi\s*(?:will|'ll|am going to)\b/,
/\blet me\b/,
/\bi need to\b/,
/\bi should\b/,
/\bi(?:'m| am) going to\b/,
/\bnext,? i\b/,
/\bi(?:'ll| will) inspect\b/,
/\bi(?:'ll| will) read\b/,
/\bi(?:'ll| will) open\b/,
/\bi(?:'ll| will) check\b/,
/\bi(?:'ll| will) update\b/,
/\bi(?:'ll| will) modify\b/,
/\bi(?:'ll| will) fix\b/,
];
return futureIntentPatterns.some((pattern) => pattern.test(normalized));
}
protected assertNonEmptyChatResponse(response: IAiChatResponse): void { protected assertNonEmptyChatResponse(response: IAiChatResponse): void {
const hasResponse = response.response.trim().length > 0; const hasResponse = response.response.trim().length > 0;
const hasThinking = !!response.thinking?.trim(); const hasThinking = !!response.thinking?.trim();
@ -213,101 +188,4 @@ export abstract class AiApi {
} }
} }
protected buildContinuationPrompt(): string {
return [
"You stopped after describing future work instead of performing it.",
"Do not explain what you are about to do.",
"Call the appropriate tool now, or provide a final answer only if no tool use is needed.",
].join(" ");
}
protected shouldContinueForUserWorkRequest(
userPrompt: string | undefined,
response: string,
hasExecutedToolsThisTurn: boolean,
): boolean {
if (hasExecutedToolsThisTurn) {
return false;
}
const prompt = (userPrompt || "").toLowerCase();
const answer = response.trim().toLowerCase();
if (!prompt || !answer) {
return false;
}
const workRequestPatterns = [
/\bread\b/,
/\bfix\b/,
/\bchange\b/,
/\bupdate\b/,
/\bmodify\b/,
/\bedit\b/,
/\bwrite\b/,
/\bcreate\b/,
/\bimplement\b/,
/\bdebug\b/,
/\binspect\b/,
/\bopen\b/,
];
const directAnswerPatterns = [
/\bcompleted\b/,
/\bfixed\b/,
/\bupdated\b/,
/\bchanged\b/,
/\bimplemented\b/,
/\bno tool use (?:is|was) needed\b/,
/\bdoes not require tool use\b/,
];
return (
workRequestPatterns.some((pattern) => pattern.test(prompt)) &&
!directAnswerPatterns.some((pattern) => pattern.test(answer))
);
}
protected async executeToolCalls(
toolCalls: IToolCall[],
tools: IAiTool[],
): Promise<IToolCallResult[]> {
const results: IToolCallResult[] = [];
for (const toolCall of toolCalls) {
const tool = tools.find((t) => t.name === toolCall.function.name);
if (!tool) {
this.log.warn(`tool not found: ${toolCall.function.name}`);
results.push({
callId: toolCall.callId,
functionName: toolCall.function.name,
result: "",
error: `Tool '${toolCall.function.name}' not found`,
});
continue;
}
try {
const args = JSON.parse(toolCall.function.arguments);
const result = await tool.execute(args, this.log);
results.push({
callId: toolCall.callId,
functionName: toolCall.function.name,
result,
});
} catch (error) {
const errorMessage = (error as Error).message;
this.log.error(`tool execution failed: ${toolCall.function.name}`, {
error: errorMessage,
});
results.push({
callId: toolCall.callId,
functionName: toolCall.function.name,
result: "",
error: errorMessage,
});
}
}
return results;
}
} }

View File

@ -118,61 +118,33 @@ describe('OllamaAiApi', () => {
}); });
it('should handle tool calls', async () => { it('should handle tool calls', async () => {
// Mock streaming response with tool call const mockStream = async function* () {
let callCount = 0; yield {
mockOllamaClient.chat.mockImplementation(() => { message: {
callCount++; content: '',
return (async function* () { tool_calls: [
if (callCount === 1) { {
yield { function: {
message: { name: 'search_google',
content: '', arguments: { query: 'test query' },
tool_calls: [ },
{
function: {
name: 'search_google',
arguments: { query: 'test query' },
},
},
],
}, },
done: false, ],
};
yield {
message: { content: '' },
done: true,
done_reason: 'stop',
total_duration: 100,
prompt_eval_count: 10,
eval_count: 1,
};
} else {
yield {
message: { content: 'Done' },
done: true,
done_reason: 'stop',
total_duration: 100,
prompt_eval_count: 10,
eval_count: 1,
};
}
})();
});
const mockTool = {
name: 'search_google',
category: 'search',
definition: {
type: 'function',
function: {
name: 'search_google',
description: 'Search Google',
parameters: { type: 'object', properties: {} },
}, },
}, done: false,
execute: vi.fn().mockResolvedValue('search results'), };
yield {
message: { content: '' },
done: true,
done_reason: 'stop',
total_duration: 100,
prompt_eval_count: 10,
eval_count: 1,
};
}; };
mockOllamaClient.chat.mockResolvedValue(mockStream());
const streamCallback = vi.fn(); const streamCallback = vi.fn();
const response = await api.chat( const response = await api.chat(
{ {
@ -183,28 +155,20 @@ describe('OllamaAiApi', () => {
{ {
userPrompt: 'Test prompt', userPrompt: 'Test prompt',
context: [], context: [],
tools: [mockTool as any],
}, },
streamCallback, streamCallback,
); );
// Verify tool call was emitted via stream callback // Verify tool calls are returned, not executed
expect(streamCallback).toHaveBeenCalledWith(
expect.objectContaining({
type: 'toolCall',
toolName: 'search_google',
}),
);
// Verify tool was executed
expect(mockTool.execute).toHaveBeenCalled();
// Verify response indicates tool calls were processed
expect(response.toolCalls).toBeDefined(); expect(response.toolCalls).toBeDefined();
expect(response.toolCalls!.length).toBe(1);
expect(response.toolCalls![0].function.name).toBe('search_google');
// chat() should only be called once (no internal loop)
expect(mockOllamaClient.chat).toHaveBeenCalledTimes(1);
}); });
it('should handle thinking content when reasoning is enabled', async () => { it('should handle thinking content when reasoning is enabled', async () => {
// Mock streaming response with thinking
const mockStream = async function* () { const mockStream = async function* () {
yield { yield {
message: { message: {
@ -246,7 +210,6 @@ describe('OllamaAiApi', () => {
streamCallback, streamCallback,
); );
// Verify thinking was emitted
expect(streamCallback).toHaveBeenCalledWith({ expect(streamCallback).toHaveBeenCalledWith({
type: 'thinking', type: 'thinking',
data: 'Let me think about this...', data: 'Let me think about this...',
@ -255,19 +218,14 @@ describe('OllamaAiApi', () => {
type: 'thinking', type: 'thinking',
data: ' The answer is', data: ' The answer is',
}); });
// Verify response was emitted
expect(streamCallback).toHaveBeenCalledWith({ expect(streamCallback).toHaveBeenCalledWith({
type: 'response', type: 'response',
data: '42', data: '42',
}); });
// Verify final response includes thinking
expect(response.thinking).toBe('Let me think about this... The answer is'); expect(response.thinking).toBe('Let me think about this... The answer is');
}); });
it('should reject empty response on load failure', async () => { it('should reject empty response on load failure', async () => {
// Mock streaming response with load failure
const mockStream = async function* () { const mockStream = async function* () {
yield { yield {
message: { content: '' }, message: { content: '' },
@ -294,91 +252,6 @@ describe('OllamaAiApi', () => {
vi.fn(), vi.fn(),
)).rejects.toThrow('Provider returned an empty chat response'); )).rejects.toThrow('Provider returned an empty chat response');
}); });
it('should iterate tool calling loop when tools are present', async () => {
let callCount = 0;
// Mock streaming response that requires tool call then returns
const mockStream = async function* () {
callCount++;
if (callCount === 1) {
// First call: return tool call
yield {
message: {
content: '',
tool_calls: [
{
function: {
name: 'search_google',
arguments: { query: 'test' },
},
},
],
},
done: false,
};
yield {
message: { content: '' },
done: true,
done_reason: 'stop',
total_duration: 100,
prompt_eval_count: 10,
eval_count: 1,
};
} else {
// Second call: return final response
yield {
message: { content: 'Here are the results' },
done: true,
done_reason: 'stop',
total_duration: 100,
prompt_eval_count: 15,
eval_count: 5,
};
}
};
mockOllamaClient.chat.mockImplementation(() => mockStream());
const mockTool = {
name: 'search_google',
category: 'search',
definition: {
type: 'function',
function: {
name: 'search_google',
description: 'Search Google',
parameters: { type: 'object', properties: {} },
},
},
execute: vi.fn().mockResolvedValue('search results'),
};
const streamCallback = vi.fn();
const response = await api.chat(
{
provider: mockProvider as any,
modelId: 'test-model',
params: { reasoning: false, temperature: 0.8, topP: 0.9, topK: 40 },
},
{
userPrompt: 'Test prompt',
context: [],
tools: [mockTool as any],
},
streamCallback,
);
// Verify chat was called twice (once for tool call, once for response)
expect(mockOllamaClient.chat).toHaveBeenCalledTimes(2);
// Verify tool was executed
expect(mockTool.execute).toHaveBeenCalled();
// Verify final response
expect(response.done).toBe(true);
expect(response.response).toBe('Here are the results');
});
}); });
describe('probeModel', () => { describe('probeModel', () => {

View File

@ -11,7 +11,6 @@ import {
IAiChatOptions, IAiChatOptions,
IAiChatResponse, IAiChatResponse,
IToolCall, IToolCall,
IToolCallResult,
IAiGenerateOptions, IAiGenerateOptions,
IAiGenerateResponse, IAiGenerateResponse,
IAiLogger, IAiLogger,
@ -208,23 +207,12 @@ export class OllamaAiApi extends AiApi {
modelId: model.modelId, modelId: model.modelId,
}); });
// VALIDATE: Ensure we have at least one message with content
if (!options.userPrompt || !options.userPrompt.trim()) {
throw new Error("userPrompt is required and cannot be empty");
}
// Build messages array like OpenAI does
const messages: OllamaMessage[] = []; const messages: OllamaMessage[] = [];
// Add system prompt if present
if (options.systemPrompt) { if (options.systemPrompt) {
messages.push({ messages.push({ role: "system", content: options.systemPrompt });
role: "system",
content: options.systemPrompt,
});
} }
// Add context messages
if (options.context) { if (options.context) {
for (const msg of options.context) { for (const msg of options.context) {
if (msg.content && msg.content.trim()) { if (msg.content && msg.content.trim()) {
@ -244,217 +232,97 @@ export class OllamaAiApi extends AiApi {
} }
} }
// Add user prompt (required) if (options.userPrompt) {
messages.push({ messages.push({ role: "user", content: options.userPrompt });
role: "user", }
content: options.userPrompt,
});
// VALIDATE: Ensure messages array is not empty before calling API
if (messages.length === 0) { if (messages.length === 0) {
throw new Error( throw new Error(
"Messages array is empty - cannot call Ollama API with no messages", "Messages array is empty - cannot call Ollama API with no messages",
); );
} }
// DEBUG: Log what we're sending to Ollama const ollamaTools = options.tools
await this.log.debug("Ollama chat request", { ? options.tools.map((tool) => ({
messagesCount: messages.length, type: tool.definition.type,
messages: messages.map((m) => ({ function: {
role: m.role, name: tool.definition.function.name,
contentLength: m.content?.length || 0, description: tool.definition.function.description,
})), parameters: tool.definition.function.parameters,
userPrompt: options.userPrompt?.slice(0, 100), },
contextCount: options.context?.length || 0, }))
: undefined;
const response = await this.client.chat({
model: model.modelId,
messages,
stream: true,
think: model.params.reasoning,
tools: ollamaTools,
}); });
const allToolCallResults: IToolCallResult[] = []; let lastChunk;
const allToolCalls: IToolCall[] = []; let accumulatedThinking = "";
let totalAccumulatedResponse = ""; let accumulatedResponse = "";
let totalAccumulatedThinking = ""; const toolCalls: IToolCall[] = [];
/* for await (const chunk of response) {
* Our agents do not have iteration count limits. We have seen an agent lastChunk = chunk;
* issue 100+ legitimate calls in a single turn.
*/
while (true) {
const ollamaTools = options.tools
? options.tools.map((tool) => ({
type: tool.definition.type,
function: {
name: tool.definition.function.name,
description: tool.definition.function.description,
parameters: tool.definition.function.parameters,
},
}))
: undefined;
const response = await this.client.chat({ if (chunk.message.thinking) {
model: model.modelId, accumulatedThinking += chunk.message.thinking;
messages, if (streamCallback) {
stream: true,
think: model.params.reasoning,
tools: ollamaTools,
});
let lastChunk;
let accumulatedThinking = "";
let accumulatedResponse = "";
const streamedToolCalls: Array<{
callId: string;
function: { name: string; arguments: any };
}> = [];
for await (const chunk of response) {
lastChunk = chunk;
if (chunk.message.thinking) {
accumulatedThinking += chunk.message.thinking;
if (streamCallback) {
await streamCallback({
type: "thinking",
data: chunk.message.thinking,
});
}
}
if (chunk.message.content) {
accumulatedResponse += chunk.message.content;
if (streamCallback) {
await streamCallback({
type: "response",
data: chunk.message.content,
});
}
}
if (chunk.message.tool_calls) {
for (const [index, tc] of chunk.message.tool_calls.entries()) {
const params = JSON.stringify(tc.function.arguments);
const callId = `tool_${tc.function.name}_${Date.now()}_${index}`;
const toolCall: IToolCall = {
callId,
function: {
name: tc.function.name,
arguments: params,
},
};
streamedToolCalls.push(toolCall);
allToolCalls.push(toolCall);
}
}
}
assert(lastChunk, "no response chunks received");
// Use accumulated thinking/response for final response
const finalThinking = accumulatedThinking || lastChunk.message.thinking;
const finalResponse = accumulatedResponse || lastChunk.message.content;
// Accumulate across iterations
totalAccumulatedResponse += finalResponse || "";
totalAccumulatedThinking += finalThinking || "";
// Use accumulated tool calls from stream
const toolCalls = streamedToolCalls;
if (!toolCalls || toolCalls.length === 0) {
if (
options.tools?.length &&
(this.shouldContinueAfterNonToolResponse(finalResponse || "") ||
this.shouldContinueForUserWorkRequest(
options.userPrompt,
finalResponse || "",
allToolCallResults.length > 0,
))
) {
await this.log.warn("model produced future-intent text without tool calls; continuing AWL", {
responseLength: (finalResponse || "").length,
});
messages.push({
role: "assistant",
content: finalResponse || "",
});
messages.push({
role: "user",
content: this.buildContinuationPrompt(),
});
continue;
}
const chatResponse: IAiChatResponse = {
done: lastChunk.done,
doneReason: lastChunk.done_reason,
response: totalAccumulatedResponse,
thinking: totalAccumulatedThinking,
toolCalls: allToolCalls.length > 0 ? allToolCalls : undefined,
toolCallResults:
allToolCallResults.length > 0 ? allToolCallResults : undefined,
stats: {
duration: {
seconds: lastChunk.total_duration,
text: numeral(lastChunk.total_duration).format("hh:mm:ss"),
},
tokenCounts: {
input: lastChunk.prompt_eval_count,
response: lastChunk.eval_count,
thinking: 0,
},
},
};
this.assertNonEmptyChatResponse(chatResponse);
return chatResponse;
}
const toolCallResults = await this.executeToolCalls(
toolCalls,
options.tools || [],
);
allToolCallResults.push(...toolCallResults);
if (streamCallback) {
for (const result of toolCallResults) {
const toolCall = toolCalls.find((tc) => tc.callId === result.callId);
await streamCallback({ await streamCallback({
type: "toolCall", type: "thinking",
data: result.error || result.result, data: chunk.message.thinking,
toolCallId: result.callId,
toolName: result.functionName,
params: toolCall?.function.arguments || "{}",
}); });
} }
} }
if (chunk.message.content) {
const assistantMsg: OllamaMessage = { accumulatedResponse += chunk.message.content;
role: "assistant", if (streamCallback) {
content: accumulatedResponse || lastChunk.message.content, await streamCallback({
}; type: "response",
if (lastChunk.message.thinking) { data: chunk.message.content,
assistantMsg.thinking = lastChunk.message.thinking; });
}
} }
if (lastChunk.message.tool_calls) { if (chunk.message.tool_calls) {
assistantMsg.tool_calls = lastChunk.message.tool_calls; for (const [index, tc] of chunk.message.tool_calls.entries()) {
} const params = JSON.stringify(tc.function.arguments);
messages.push(assistantMsg); const callId = `tool_${tc.function.name}_${Date.now()}_${index}`;
for (const result of toolCallResults) { toolCalls.push({
const toolContent = result.error callId,
? `Error executing ${result.functionName}: ${result.error}` function: {
: result.result; name: tc.function.name,
arguments: params,
const toolMsg = { },
role: "tool" as const, });
content: toolContent, }
};
messages.push(toolMsg);
}
// VALIDATE: Ensure tool results are in messages
const toolMessages = messages.filter((m) => m.role === "tool");
if (toolMessages.length === 0 && toolCallResults.length > 0) {
await this.log.error("CRITICAL: tool results NOT in messages array", {
toolCallResultsCount: toolCallResults.length,
messagesCount: messages.length,
});
} }
} }
assert(lastChunk, "no response chunks received");
const chatResponse: IAiChatResponse = {
done: lastChunk.done,
doneReason: lastChunk.done_reason,
response: accumulatedResponse || lastChunk.message.content,
thinking: accumulatedThinking || lastChunk.message.thinking,
toolCalls: toolCalls.length > 0 ? toolCalls : undefined,
stats: {
duration: {
seconds: lastChunk.total_duration,
text: numeral(lastChunk.total_duration).format("hh:mm:ss"),
},
tokenCounts: {
input: lastChunk.prompt_eval_count,
response: lastChunk.eval_count,
thinking: 0,
},
},
};
this.assertNonEmptyChatResponse(chatResponse);
return chatResponse;
} }
} }

View File

@ -72,73 +72,51 @@ describe("OpenAiApi", () => {
)).rejects.toThrow("Provider returned an empty chat response"); )).rejects.toThrow("Provider returned an empty chat response");
}); });
it("assembles streamed tool-call argument fragments before executing", async () => { it("assembles streamed tool-call argument fragments and returns them", async () => {
mockCreate mockCreate.mockResolvedValueOnce(streamChunks([
.mockResolvedValueOnce(streamChunks([ {
{ choices: [{
choices: [{ delta: {
delta: { tool_calls: [{
tool_calls: [{ index: 0,
index: 0, id: "call_1",
id: "call_1", type: "function",
type: "function", function: { name: "file_read", arguments: '{"path"' },
function: { name: "file_read", arguments: '{"path"' }, }],
}], },
}, finish_reason: null,
finish_reason: null, }],
}],
},
{
choices: [{
delta: {
tool_calls: [{
index: 0,
function: { arguments: ':"index.html"}' },
}],
},
finish_reason: "tool_calls",
}],
},
]))
.mockResolvedValueOnce(streamChunks([
{ choices: [{ delta: { content: "Done" }, finish_reason: "stop" }] },
]));
const tool = {
name: "file_read",
category: "file",
definition: {
type: "function" as const,
function: {
name: "file_read",
description: "Read file",
parameters: { type: "object", properties: {} },
},
}, },
execute: vi.fn().mockResolvedValue("PATH: index.html\n---\ncontent"), {
}; choices: [{
const streamCallback = vi.fn(); delta: {
tool_calls: [{
index: 0,
function: { arguments: ':"index.html"}' },
}],
},
finish_reason: "tool_calls",
}],
},
]));
const streamCallback = vi.fn();
const response = await api.chat( const response = await api.chat(
{ {
provider: mockProvider as any, provider: mockProvider as any,
modelId: "test-model", modelId: "test-model",
params: { reasoning: false, temperature: 0.8, topP: 0.9, topK: 40 }, params: { reasoning: false, temperature: 0.8, topP: 0.9, topK: 40 },
}, },
{ userPrompt: "Read index.html", context: [], tools: [tool] }, { userPrompt: "Read index.html", context: [], tools: [] },
streamCallback, streamCallback,
); );
expect(tool.execute).toHaveBeenCalledWith({ path: "index.html" }, mockLogger); // Tool calls are returned, not executed
expect(streamCallback).toHaveBeenCalledWith(expect.objectContaining({ expect(response.toolCalls).toBeDefined();
type: "toolCall", expect(response.toolCalls!.length).toBe(1);
toolCallId: "call_1", expect(response.toolCalls![0].function.name).toBe("file_read");
toolName: "file_read", expect(response.toolCalls![0].function.arguments).toBe('{"path":"index.html"}');
data: "PATH: index.html\n---\ncontent", expect(mockCreate).toHaveBeenCalledTimes(1);
params: '{"path":"index.html"}',
}));
expect(response.response).toBe("Done");
expect(mockCreate).toHaveBeenCalledTimes(2);
}); });
it("falls back to non-streaming response when stream has no deltas", async () => { it("falls back to non-streaming response when stream has no deltas", async () => {
@ -164,4 +142,40 @@ describe("OpenAiApi", () => {
expect(response.response).toBe("Fallback answer"); expect(response.response).toBe("Fallback answer");
expect(streamCallback).toHaveBeenCalledWith({ type: "response", data: "Fallback answer" }); expect(streamCallback).toHaveBeenCalledWith({ type: "response", data: "Fallback answer" });
}); });
it("returns conversational responses without forcing another iteration", async () => {
  // The provider answers with plain text and a "stop" finish reason: no tool calls.
  const replyText = "I can talk this through.";
  mockCreate.mockResolvedValueOnce(
    streamChunks([
      {
        choices: [{ delta: { content: replyText }, finish_reason: "stop" }],
      },
    ]),
  );

  // A tool is offered to the model, but the test never expects it to be used.
  const editTool = {
    name: "file_edit",
    category: "file",
    definition: {
      type: "function" as const,
      function: {
        name: "file_edit",
        description: "Edit file",
        parameters: { type: "object", properties: {} },
      },
    },
    execute: vi.fn(),
  };
  const streamCallback = vi.fn();

  const response = await api.chat(
    {
      provider: mockProvider as any,
      modelId: "test-model",
      params: { reasoning: false, temperature: 0.8, topP: 0.9, topK: 40 },
    },
    {
      userPrompt: "Don't edit code. Just talk to me.",
      context: [],
      tools: [editTool],
    },
    streamCallback,
  );

  // The text is returned as-is and exactly one completion request was made.
  expect(response.response).toBe(replyText);
  expect(mockCreate).toHaveBeenCalledTimes(1);
});
}); });

View File

@ -9,7 +9,6 @@ import {
IAiChatOptions, IAiChatOptions,
IAiChatResponse, IAiChatResponse,
IToolCall, IToolCall,
IToolCallResult,
IAiGenerateOptions, IAiGenerateOptions,
IAiGenerateResponse, IAiGenerateResponse,
IAiLogger, IAiLogger,
@ -20,11 +19,9 @@ import {
IAiResponseStreamFn, IAiResponseStreamFn,
} from "./api.js"; } from "./api.js";
import { import {
ChatCompletionAssistantMessageParam,
ChatCompletionFunctionTool, ChatCompletionFunctionTool,
ChatCompletionMessageParam, ChatCompletionMessageParam,
ChatCompletionTool, ChatCompletionTool,
ChatCompletionToolMessageParam,
} from "openai/resources"; } from "openai/resources";
import { IAiEnvironment } from "./config/env.ts"; import { IAiEnvironment } from "./config/env.ts";
@ -67,6 +64,24 @@ interface StreamingToolCallAccumulator {
}; };
} }
/**
 * Result of a single chat-completion request, as produced by both
 * `readStreamingChatCompletion` and `readNonStreamingChatCompletion`.
 */
interface OpenAiChatIterationResult {
  /** Assistant text; for a stream this is every content delta concatenated. */
  response: string;
  /** Accumulated reasoning text; the streaming reader leaves it undefined when none arrived. */
  thinking?: string;
  /** Tool calls in the provider-neutral `IToolCall` shape (`callId` + `function`). */
  toolCalls: IToolCall[];
  /** The same tool calls keyed by `id` and tagged with `type: "function"`. */
  assistantToolCalls: {
    id: string;
    type: "function";
    function: {
      name: string;
      arguments: string;
    };
  }[];
  /** Finish reason reported for the first choice, if any. */
  finishReason?: string | null;
  /** Number of stream chunks read; the non-streaming reader reports 0. */
  chunkCount: number;
  /** Number of content deltas seen (0 or 1 for the non-streaming reader). */
  contentDeltaCount: number;
  /** Number of tool-call deltas seen (the tool-call count for the non-streaming reader). */
  toolDeltaCount: number;
}
export class OpenAiApi extends AiApi { export class OpenAiApi extends AiApi {
protected client: OpenAI; protected client: OpenAI;
@ -256,277 +271,169 @@ export class OpenAiApi extends AiApi {
}); });
const startTime = Date.now(); const startTime = Date.now();
const messages = this.buildMessages(options);
const tools = this.buildTools(options);
let iteration = await this.readStreamingChatCompletion(
model,
messages,
tools,
streamCallback,
);
await this.log.debug("OpenAI chat stream iteration finished", {
chunkCount: iteration.chunkCount,
contentDeltaCount: iteration.contentDeltaCount,
toolDeltaCount: iteration.toolDeltaCount,
responseLength: iteration.response.length,
thinkingLength: iteration.thinking?.length || 0,
toolCallCount: iteration.toolCalls.length,
finishReason: iteration.finishReason,
});
if (this.isEmptyIteration(iteration)) {
iteration = await this.readNonStreamingChatCompletion(model, messages, tools);
if (streamCallback && iteration.response) {
await streamCallback({ type: "response", data: iteration.response });
}
await this.log.warn("OpenAI stream was empty; used non-streaming fallback", {
responseLength: iteration.response.length,
thinkingLength: iteration.thinking?.length || 0,
toolCallCount: iteration.toolCalls.length,
finishReason: iteration.finishReason,
});
}
if (this.isEmptyIteration(iteration)) {
this.assertNonEmptyChatResponse({
done: true,
response: "",
thinking: undefined,
toolCalls: undefined,
toolCallResults: undefined,
stats: this.buildStats(startTime),
});
}
const finalResponse: IAiChatResponse = {
done: true,
response: iteration.response,
thinking: iteration.thinking,
toolCalls: iteration.toolCalls.length > 0 ? iteration.toolCalls : undefined,
stats: this.buildStats(startTime),
};
this.assertNonEmptyChatResponse(finalResponse);
return finalResponse;
}
private buildMessages(options: IAiChatOptions): ChatCompletionMessageParam[] {
const messages: ChatCompletionMessageParam[] = []; const messages: ChatCompletionMessageParam[] = [];
if (options.systemPrompt) { if (options.systemPrompt) {
messages.push({ role: "system", content: options.systemPrompt }); messages.push({ role: "system", content: options.systemPrompt });
} }
if (options.context) { for (const msg of options.context || []) {
for (const msg of options.context) { if (!msg.content?.trim()) continue;
if (msg.role === "tool") { if (msg.role === "tool") {
messages.push({ messages.push({
role: "tool", role: "assistant",
content: msg.content, content: `Historical tool result${msg.toolName ? ` from ${msg.toolName}` : ""}:\n${msg.content}`,
tool_call_id: msg.callId || "", });
}); continue;
} else {
messages.push({
role: msg.role as "user" | "assistant" | "system",
content: msg.content,
});
}
} }
messages.push({
role: msg.role as "user" | "assistant" | "system",
content: msg.content,
});
} }
if (options.userPrompt) { if (options.userPrompt?.trim()) {
messages.push({ role: "user", content: options.userPrompt }); messages.push({ role: "user", content: options.userPrompt });
} }
return messages;
const allToolCallResults: IToolCallResult[] = [];
const allToolCalls: IToolCall[] = [];
while (true) {
const tools: ChatCompletionTool[] = options.tools
? options.tools.map((tool) => {
const openaiTool: ChatCompletionFunctionTool = {
type: tool.definition.type,
function: {
name: tool.definition.function.name,
description: tool.definition.function.description,
parameters: tool.definition.function.parameters,
},
};
return openaiTool;
})
: [];
const response = await this.client.chat.completions.create({
model: model.modelId,
messages,
tools,
stream: true,
...(typeof model.params.reasoning === "string"
? {
reasoning_effort: model.params.reasoning as
| "low"
| "medium"
| "high",
}
: {}),
});
let accumulatedResponse = "";
let accumulatedThinking = "";
let chunkCount = 0;
let contentDeltaCount = 0;
let toolDeltaCount = 0;
let finishReason: string | null | undefined;
const toolCallMap = new Map<number, StreamingToolCallAccumulator>();
let assistantToolCallsForMessage: Array<{
id: string;
type: "function";
function: { name: string; arguments: string };
}> = [];
for await (const chunk of response) {
chunkCount++;
finishReason = chunk.choices[0]?.finish_reason ?? finishReason;
const delta = chunk.choices[0]?.delta;
if (delta) {
if (delta.content) {
contentDeltaCount++;
accumulatedResponse += delta.content;
if (streamCallback) {
await streamCallback({
type: "response",
data: delta.content,
});
}
}
if ("reasoning" in delta && delta.reasoning) {
accumulatedThinking += delta.reasoning as string;
if (streamCallback) {
await streamCallback({
type: "thinking",
data: delta.reasoning as string,
});
}
}
if (delta.tool_calls) {
toolDeltaCount += delta.tool_calls.length;
for (const tc of delta.tool_calls) {
const index = tc.index;
let accumulated = toolCallMap.get(index);
if (!accumulated) {
accumulated = {
index,
id: tc.id || `tool_${Date.now()}_${index}`,
type: "function",
function: {
name: "",
arguments: "",
},
};
toolCallMap.set(index, accumulated);
}
if (tc.id) {
accumulated.id = tc.id;
}
if (tc.function?.name) {
accumulated.function.name += tc.function.name;
}
if (tc.function?.arguments) {
accumulated.function.arguments += tc.function.arguments;
}
}
}
}
}
const finalToolCalls = Array.from(toolCallMap.values())
.sort((a, b) => a.index - b.index)
.filter((tc) => tc.function.name);
const toolCalls = finalToolCalls.map((tc) => ({
callId: tc.id,
function: {
name: tc.function.name,
arguments: tc.function.arguments,
},
}));
assistantToolCallsForMessage = finalToolCalls.map((tc) => ({
id: tc.id,
type: "function" as const,
function: {
name: tc.function.name,
arguments: tc.function.arguments,
},
}));
allToolCalls.push(...toolCalls);
await this.log.debug("OpenAI chat stream iteration finished", {
chunkCount,
contentDeltaCount,
toolDeltaCount,
responseLength: accumulatedResponse.length,
thinkingLength: accumulatedThinking.length,
toolCallCount: toolCalls.length,
finishReason,
});
if (chunkCount > 0 && !accumulatedResponse && !accumulatedThinking && toolCalls.length === 0) {
const fallback = await this.chatOnceNonStreaming(model, messages, tools);
accumulatedResponse = fallback.response;
accumulatedThinking = fallback.thinking || "";
toolCalls.push(...fallback.toolCalls);
allToolCalls.push(...fallback.toolCalls);
assistantToolCallsForMessage = fallback.assistantToolCalls;
if (streamCallback && fallback.response) {
await streamCallback({ type: "response", data: fallback.response });
}
await this.log.warn("OpenAI stream was empty; used non-streaming fallback", {
responseLength: accumulatedResponse.length,
thinkingLength: accumulatedThinking.length,
toolCallCount: fallback.toolCalls.length,
finishReason,
});
}
if (!toolCalls || toolCalls.length === 0) {
if (
options.tools?.length &&
(this.shouldContinueAfterNonToolResponse(accumulatedResponse) ||
this.shouldContinueForUserWorkRequest(
options.userPrompt,
accumulatedResponse,
allToolCallResults.length > 0,
))
) {
await this.log.warn("model produced future-intent text without tool calls; continuing AWL", {
responseLength: accumulatedResponse.length,
});
messages.push({ role: "assistant", content: accumulatedResponse });
messages.push({ role: "user", content: this.buildContinuationPrompt() });
continue;
}
const finalResponse: IAiChatResponse = {
done: true,
response: accumulatedResponse,
thinking: accumulatedThinking || undefined,
toolCalls: allToolCalls.length > 0 ? allToolCalls : undefined,
toolCallResults:
allToolCallResults.length > 0 ? allToolCallResults : undefined,
stats: {
duration: {
seconds: (Date.now() - startTime) / 1000,
text: numeral((Date.now() - startTime) / 1000).format("hh:mm:ss"),
},
tokenCounts: {
input: 0,
response: 0,
thinking: 0,
},
},
};
this.assertNonEmptyChatResponse(finalResponse);
return finalResponse;
}
const toolCallResults = await this.executeToolCalls(
toolCalls,
options.tools || [],
);
allToolCallResults.push(...toolCallResults);
if (streamCallback) {
for (const result of toolCallResults) {
const toolCall = toolCalls.find((tc) => tc.callId === result.callId);
await streamCallback({
type: "toolCall",
data: result.error || result.result,
toolCallId: result.callId,
toolName: result.functionName,
params: toolCall?.function.arguments || "{}",
});
}
}
const assistantMsg: ChatCompletionAssistantMessageParam = {
role: "assistant",
content: accumulatedResponse,
};
if (assistantToolCallsForMessage.length) {
assistantMsg.tool_calls = assistantToolCallsForMessage;
}
messages.push(assistantMsg);
for (const result of toolCallResults) {
const toolMsg: ChatCompletionToolMessageParam = {
role: "tool",
tool_call_id: result.callId,
content: result.error || result.result,
};
messages.push(toolMsg);
}
}
} }
private async chatOnceNonStreaming( private buildTools(options: IAiChatOptions): ChatCompletionTool[] {
return (options.tools || []).map((tool): ChatCompletionFunctionTool => ({
type: tool.definition.type,
function: {
name: tool.definition.function.name,
description: tool.definition.function.description,
parameters: tool.definition.function.parameters,
},
}));
}
private async readStreamingChatCompletion(
model: IAiModelConfig, model: IAiModelConfig,
messages: ChatCompletionMessageParam[], messages: ChatCompletionMessageParam[],
tools: ChatCompletionTool[], tools: ChatCompletionTool[],
): Promise<{ streamCallback?: IAiResponseStreamFn,
response: string; ): Promise<OpenAiChatIterationResult> {
thinking?: string; const response = await this.client.chat.completions.create({
toolCalls: IToolCall[]; model: model.modelId,
assistantToolCalls: Array<{ messages,
id: string; tools,
type: "function"; stream: true,
function: { ...(typeof model.params.reasoning === "string"
name: string; ? {
arguments: string; reasoning_effort: model.params.reasoning as
}; | "low"
}>; | "medium"
}> { | "high",
}
: {}),
});
let content = "";
let thinking = "";
let chunkCount = 0;
let contentDeltaCount = 0;
let toolDeltaCount = 0;
let finishReason: string | null | undefined;
const toolCallMap = new Map<number, StreamingToolCallAccumulator>();
for await (const chunk of response) {
chunkCount++;
finishReason = chunk.choices[0]?.finish_reason ?? finishReason;
const delta = chunk.choices[0]?.delta;
if (!delta) continue;
if (delta.content) {
contentDeltaCount++;
content += delta.content;
if (streamCallback) {
await streamCallback({ type: "response", data: delta.content });
}
}
if ("reasoning" in delta && delta.reasoning) {
thinking += delta.reasoning as string;
if (streamCallback) {
await streamCallback({ type: "thinking", data: delta.reasoning as string });
}
}
if (delta.tool_calls) {
toolDeltaCount += delta.tool_calls.length;
for (const toolCallDelta of delta.tool_calls) {
this.accumulateToolCallDelta(toolCallMap, toolCallDelta);
}
}
}
return {
response: content,
thinking: thinking || undefined,
...this.buildToolCallsFromMap(toolCallMap),
finishReason,
chunkCount,
contentDeltaCount,
toolDeltaCount,
};
}
private async readNonStreamingChatCompletion(
model: IAiModelConfig,
messages: ChatCompletionMessageParam[],
tools: ChatCompletionTool[],
): Promise<OpenAiChatIterationResult> {
const response = await this.client.chat.completions.create({ const response = await this.client.chat.completions.create({
model: model.modelId, model: model.modelId,
messages, messages,
@ -566,6 +473,90 @@ export class OpenAiApi extends AiApi {
response: content, response: content,
toolCalls, toolCalls,
assistantToolCalls, assistantToolCalls,
finishReason: choice?.finish_reason,
chunkCount: 0,
contentDeltaCount: content ? 1 : 0,
toolDeltaCount: assistantToolCalls.length,
};
}
/**
 * Merges one streamed tool-call fragment into the accumulator stored under
 * the fragment's `index`, creating that accumulator on first sight.
 *
 * @param toolCallMap Accumulators keyed by tool-call index; mutated in place.
 * @param delta       A single tool-call fragment taken from a stream chunk.
 */
private accumulateToolCallDelta(
  toolCallMap: Map<number, StreamingToolCallAccumulator>,
  delta: {
    index: number;
    id?: string;
    type?: string;
    function?: {
      name?: string;
      arguments?: string;
    };
  },
): void {
  const { index, id, function: fragment } = delta;

  let entry = toolCallMap.get(index);
  if (entry === undefined) {
    // First fragment for this index: start with empty name/arguments and a
    // synthetic id in case the fragment does not carry one.
    entry = {
      index,
      id: id || `tool_${Date.now()}_${index}`,
      type: "function",
      function: { name: "", arguments: "" },
    };
    toolCallMap.set(index, entry);
  }

  // An id carried by the fragment always replaces the stored one.
  if (id) {
    entry.id = id;
  }

  // Name and arguments arrive in pieces, so they are appended, not assigned.
  if (fragment?.name) {
    entry.function.name += fragment.name;
  }
  if (fragment?.arguments) {
    entry.function.arguments += fragment.arguments;
  }
}
/**
 * Converts the accumulated tool-call fragments into the two parallel lists
 * carried by an iteration result.
 *
 * Entries are emitted in ascending `index` order; entries whose function
 * name is still empty are dropped.
 *
 * @param toolCallMap Accumulators keyed by tool-call index.
 * @returns `toolCalls` and `assistantToolCalls`, both describing the same
 *          calls in the same order.
 */
private buildToolCallsFromMap(
  toolCallMap: Map<number, StreamingToolCallAccumulator>,
): Pick<OpenAiChatIterationResult, "toolCalls" | "assistantToolCalls"> {
  const ordered = Array.from(toolCallMap.values());
  ordered.sort((left, right) => left.index - right.index);

  const assistantToolCalls: OpenAiChatIterationResult["assistantToolCalls"] = [];
  const toolCalls: IToolCall[] = [];

  for (const entry of ordered) {
    const { name, arguments: args } = entry.function;
    if (!name) {
      continue;
    }
    assistantToolCalls.push({
      id: entry.id,
      type: "function" as const,
      function: { name, arguments: args },
    });
    toolCalls.push({
      callId: entry.id,
      function: { name, arguments: args },
    });
  }

  return { toolCalls, assistantToolCalls };
}
/**
 * Reports whether an iteration carries nothing usable: no non-whitespace
 * response text, no non-whitespace thinking text, and no tool calls.
 *
 * @param iteration Result of one chat-completion request.
 * @returns `true` only when all three are absent.
 */
private isEmptyIteration(iteration: OpenAiChatIterationResult): boolean {
  const hasResponseText = iteration.response.trim().length > 0;
  if (hasResponseText) {
    return false;
  }

  const hasThinkingText = (iteration.thinking?.trim() ?? "").length > 0;
  if (hasThinkingText) {
    return false;
  }

  return iteration.toolCalls.length === 0;
}
/**
 * Builds the `stats` section of a chat response.
 *
 * @param startTime Timestamp in milliseconds (same clock as `Date.now()`)
 *                  taken when the request began.
 * @returns The elapsed duration, as a number of seconds and as text, plus
 *          token counts that are always reported as 0 by this method.
 */
private buildStats(startTime: number): IAiChatResponse["stats"] {
// Elapsed time since startTime, converted from milliseconds to seconds.
const seconds = (Date.now() - startTime) / 1000;
return {
duration: {
seconds,
// Text rendering of the same value via numeral's "hh:mm:ss" format string.
text: numeral(seconds).format("hh:mm:ss"),
},
// Token usage is not measured here; every count is hard-coded to zero.
tokenCounts: {
input: 0,
response: 0,
thinking: 0,
},
}; };
} }
} }