From 61ba0e441230db2b9d0a75da2c5294b838b2ef36 Mon Sep 17 00:00:00 2001 From: Rob Colbert Date: Thu, 7 May 2026 21:36:01 -0400 Subject: [PATCH] streaming responses (see ./docs/streaming-responses.md) --- docs/streaming-responses.md | 558 +++++++++++++++--- gadget-code/frontend/package.json | 1 + .../frontend/src/components/ChatTurn.tsx | 126 ++-- gadget-code/frontend/src/lib/api.ts | 28 +- .../frontend/src/pages/ChatSessionView.tsx | 179 +++++- gadget-code/pnpm-lock.yaml | 3 + gadget-code/src/lib/code-session.ts | 6 +- gadget-code/src/lib/drone-session.ts | 115 +++- gadget-code/src/models/chat-turn.ts | 24 +- gadget-drone/src/services/agent.ts | 35 +- gadget-drone/src/services/ai.ts | 22 +- packages/ai/src/api.ts | 4 + packages/ai/src/ollama.ts | 47 ++ packages/ai/src/openai.ts | 110 +++- packages/api/src/interfaces/chat-turn.ts | 25 +- 15 files changed, 1040 insertions(+), 243 deletions(-) diff --git a/docs/streaming-responses.md b/docs/streaming-responses.md index 357a17d..b1bf9d9 100644 --- a/docs/streaming-responses.md +++ b/docs/streaming-responses.md @@ -1,117 +1,479 @@ # Gadget Code Streaming Responses -The [Architecture](./architecture.md) and [Socket Protocol](./socket-protocol.md) documents describe the Gadget Code system, and the socket protocol we use for transmitting events, respectively. +**Status:** ✅ **IMPLEMENTED** — Full end-to-end streaming responses operational +**Last Updated:** May 7, 2026 -The User selects a Project and Drone from a list of available Projects and Drones. The User then starts a chat session with the Drone to work on the Project. When the user submits a prompt: +## Quick Reference -1. gadget-code:frontend generates the `submitPrompt` event and sends it to [gadget-code:backend](../gadget-code/src/web-app.ts). -2. gadget-code:backend wraps the prompt as a Work Order, selects the [gadget-drone](../gadget-drone/src/gadget-drone.ts) process locked to the chat session (CodeSession), and forwards the Work Order to the drone for processing. -3. The drone executes the Agentic Workflow Loop, calling the specified AI SDK's `chat` endpoint to process a prompt until done. +**Streaming Path:** AI Provider → @gadget/ai → gadget-drone → gadget-code (backend) → Frontend IDE -As long as the response contains tool calls, the loop continues inserting the tool call response output into the context and calling the AI ASK again for additional processing. +**Key Concepts:** +- **IAiStreamChunk**: Unified chunk type with `type: 'thinking'|'response'|'toolCall'` +- **Backend Aggregation**: Tokens buffered in `DroneSession`, persisted at mode changes +- **Frontend In-Place Updates**: Blocks updated by index, not appended (prevents DOM flooding) +- **Blocks Array**: `ChatTurn.blocks[]` stores ordered thinking/responding/tool blocks -When there are no further tool calls received, the loop ends and communicates the final results of the turn back to gadget-code:backend, which then forwards the information back to the gadget-code:frontend (IDE) that sent the command. +**Critical Files:** +- `packages/ai/src/api.ts` — Stream chunk interface +- `gadget-code/src/lib/drone-session.ts` — Aggregation logic +- `gadget-code/frontend/src/pages/ChatSessionView.tsx` — Frontend state management +- `gadget-code/frontend/src/components/ChatTurn.tsx` — Block rendering -The system currently performs all work, receives the response, and sends back a response at the conclusion of the work. +## Overview -## Streaming Responses +Gadget Code implements real-time streaming responses from AI providers (OpenAI, Ollama) through the entire system stack. As the AI model generates tokens, they flow immediately from the provider → drone → backend → frontend, where they are displayed to the user with minimal latency. -The OpenAI and Ollama SDKs both support streaming responses. We want to convert/refactor our Agentic Workflow Loop's response - to _streaming responses_. +The system supports three types of streaming content: +- **Thinking tokens**: Reasoning/output from models with thinking capabilities +- **Response tokens**: The model's primary response content +- **Tool calls**: Function/tool invocations requested by the model -- As thinking/reasoning tokens are emitted, we want gadget-drone to emit the `thinking` event to the backend on it's DroneSession so the client IDE can update as the model is processing the work. -- As response tokens are emitted from the streaming response, we want to emit our own `response` event so the IDE can update as the model is processing the work. -- As tool calls are being requested by the agent, we want to emit our `toolCall` event so the IDE can update with information about the tool call in real-time as the model is processing the work. +## Architecture & Data Flow -### Streaming: OpenAI - -We will need to add streaming response processing to [test](../packages/ai/src/openai.ts). The `generate` and `chat` methods currently accept the `streamCallback` parameter, which is a callback that gets called with each token and tool call as it's emitted from the SDK. - -We need to verify that the full path for these events is implemented, from gadget-drone generating the event to gadget-code:frontend (IDE) receiving the event, with gadget-code:backend (web server) processing things correctly to get the message routed back to the IDE CodeSession that wants it based on the work order's information for chat session, etc. - -### Streaming: Ollama - -We will need to add streaming response processing to [test](../packages/ai/src/ollama.ts). The `generate` and `chat` methods currently accept the `streamCallback` parameter, which is a callback that gets called with each token and tool call as it's emitted from the SDK. - -We need to verify that the full path for these events is implemented, from gadget-drone generating the event to gadget-code:frontend (IDE) receiving the event, with gadget-code:backend (web server) processing things correctly to get the message routed back to the IDE CodeSession that wants it based on the work order's information for chat session, etc. - -## IDE Processing - -When the User submits a prompt for processing as a work order, the IDE creates a ChatTurn component instance in the Chat View, and uses the Turn component to display these streaming response updates as they arrive. - -The Turn component will display a spinner in it's header area while processing the work order/prompt. - -Within the Agent's portion of the Turn display, it should look like the agent is having it's own conversation. Updates should occur in modes or phases that change based on the _most recent_ content received. The agent's response, while processing a prompt, has three modes: - -- **Idle**: The agent's content display begins as Idle when the Turn is created. This allows it to 'sense' a transition to either `thinking` or `responding`. -- **Thinking**: When transitioning to the `thinking` mode, a new thinking div is added to receive the token(s). We continue streaming thinking tokens into the thinking div for as long as the tokens are still thinking tokens. When a response-mode token is received, we transition TO response mode, insert a new response div, and proceed. -- **Responding**: When transitioning to the `responding` mode, a new response div is added to receive the token(s). We continue streaming further response-mode tokens into this div for as long as the tokens are still response-mode tokens. If a thinking token is received, we transition TO the thinking mode, insert a new thinking div, and proceed. - -If the content received is in the same mode as the current agent response mode (thinking/responding), then this is an append operation. As content arrives, we append the content to the current thinking/responding content, and update the display of it. We do this as efficiently as possible, trying to avoid copies as much as possible, as these messages can arrive at a rather high rate of speed. - -### IDE Tool Call Displays - -When a `toolCall` message is received, break out of the current thinking/responding block to append a tool call div/display in the agent's response. We will use this to provide SUMMARY information about the tool call: - -- An ● indicator showing success status (green for success, red for error, yellow for warning, etc.) -- The name of the tool called (e.g., `search_google`) - -● search_google - -We don't summarize successful or erroneous responses here in the tool call indicator in the chat message flow. We will be adding a separate view to display these messages in full detail later in the SESSION panel of the Chat Session view. - -## Persistence - -The primary goal of this application is to make the agentic engineering process as **observable** as possible. If there is data available from a work order's Agentic Workflow Loop and processing, we: - -1. Normalize that data to our own interfaces as defined by: - 1. [@gadget/ai](../packages/ai/) and [@gadget/api](../packages/api/); - 1. [ChatTurn](../packages/api/src/interfaces/chat-turn.ts) interface; and - 1. [ChatTurn](../gadget-code/src/models/chat-turn.ts) model. -2. Store that normalized data into our own database (MongoDB) within the current `ChatTurn` record being managed by gadget-code:backend (web server). - -The `gadget-code:backend` (web) server will be responsible for persisting this data to our database. As events arrive FROM `gadget-drone` indicating progress within a Work Order (`thinking`, `response`, `toolCall`), we will update the `ChatTurn` record with the new data. And this is why stateful sessions exist for [DroneSession](../gadget-code/src/lib/drone-session.ts) and [CodeSession](../gadget-code/src/lib/code-session.ts). - -While receiving thinking tokens, we should be aggregating that content into the session object in memory instead of calling the database with every micro-update. At mode changes, such as when changing from thinking => responding, or responding => thinking, we perform one update to write that block's aggregated content to the `ChatTurn` as it's type of content (thinking, responding, tool). - -We need to adapt the persistence within `ChatTurn` to record this stateful/modal flow. When the User loads an existing `ChatSession` and it's associated turns for display, they should receive the _same display_ as they saw while working. - -### Changes to ChatTurn Model - -We need to add fields to our [ChatTurn](../packages/api/src/interfaces/chat-turn.ts) interface and [ChatTurn](../gadget-code/src/models/chat-turn.ts) model that will allow us to store the stateful information for display in the UI. - -Currently, there are simple string/text fields for `thinking` and `response`. This is insufficient for being able to reconstruct the actual flow that happened while processing the work order. - -We will repurpose the `response` field to become an array of objects, each object representing a `thinking`/`responding`/`toolCall` block. The `thinking` and `responding` blocks will be that which stores the aggregated content that was received while processing the work order, in order. The `toolCall` block will be that which stores the information about each tool call that happens, in order. - -Each object will have a `mode` (`thinking`, `responding`, `tool`), a `createdAt` timestamp, and a `content` field which is the content of the event for display in the UI. The `content` field can be a string, such as when storing thinking and responding text content, or an object that records tool call information and metadata for analysis later by the User and by analytics tools. - -## Chat Turn Component Updates - -The [Chat Turn Component](../gadget-code/frontend/src/components/ChatTurn.tsx) will require updates to enhance the Agent's portion of the turn display. The Agent's block will now resemble: +### Complete Streaming Path ``` -Thinking: [content streams in] - -● search_google -● search_google - -Thinking: [content streams in] - -Responding: [content streams in] - -● search_google - -Responding: [content streams in] +┌─────────────────┐ +│ AI Provider │ (OpenAI / Ollama SDK) +│ (Streaming) │ +└────────┬────────┘ + │ IAiStreamChunk + │ (type: 'thinking'|'response'|'toolCall') + ▼ +┌─────────────────┐ +│ @gadget/ai │ (packages/ai/src/openai.ts or ollama.ts) +│ streamCallback │ +└────────┬────────┘ + │ Calls streamCallback(chunk) for each token + ▼ +┌─────────────────┐ +│ gadget-drone │ (gadget-drone/src/services/agent.ts) +│ AgentService │ +└────────┬────────┘ + │ Socket.IO events + │ thinking(content) + │ response(content) + │ toolCall(callId, name, params, response) + ▼ +┌─────────────────┐ +│ gadget-code │ (gadget-code/src/lib/drone-session.ts) +│ DroneSession │ +└────────┬────────┘ + │ Aggregates tokens by mode + │ Persists to MongoDB at mode changes + │ Routes events to CodeSession + ▼ +┌─────────────────┐ +│ gadget-code │ (gadget-code/src/lib/code-session.ts) +│ CodeSession │ +└────────┬────────┘ + │ Socket.IO events to IDE + ▼ +┌─────────────────┐ +│ Frontend IDE │ (gadget-code/frontend/src/pages/ChatSessionView.tsx) +│ Chat Turn │ +└─────────────────┘ ``` -We don't label the thinking and responding blocks with a header. Instead, we use style to indicate the type of block presented: +### Event Flow Details -- Thinking is muted -- Responding is standard formatting -- Tool calls are just a one-line element that summarizes the call +1. **AI Provider → @gadget/ai** + - OpenAI: Uses `stream: true` in `chat.completions.create()`, iterates SSE chunks + - Ollama: Uses `stream: true` in `client.chat()`, iterates async chunks + - Both call `streamCallback(chunk: IAiStreamChunk)` for each token -## Markdown +2. **@gadget/ai → gadget-drone** + - `streamCallback` emits Socket.IO events based on chunk type: + ```typescript + switch (chunk.type) { + case 'thinking': socket.emit("thinking", chunk.data); break; + case 'response': socket.emit("response", chunk.data); break; + case 'toolCall': socket.emit("toolCall", callId, name, params, data); break; + } + ``` -All text content within a Chat Turn display is in Markdown format. The User uses Markdown, and the Agent responds with Markdown. We will use the [marked]() package to render the Markdown text to HTML for display. +3. **gadget-drone → gadget-code:backend** + - Events arrive at `DroneSession` via Socket.IO + - `DroneSession` aggregates tokens in memory (see Aggregation below) + - At mode changes or tool calls, flushes to MongoDB + - Routes events to corresponding `CodeSession` via `SocketService.getCodeSessionByChatSessionId()` -Caveat: The marked library sets it's `breaks` option to `false` by default. This is not the behavior what we want. When rendering Markdown content for display in the Chat Turn component for either the User or the Aget, `breaks` must be set to `true` to honor line breaks and display them appropriately. +4. **gadget-code:backend → Frontend IDE** + - `CodeSession` forwards events to IDE socket + - `ChatSessionView` receives events and updates React state + - `ChatTurn` component renders blocks with Markdown + +## IAiStreamChunk Interface + +Defined in `packages/ai/src/api.ts`: + +```typescript +interface IAiStreamChunk { + type: 'thinking' | 'response' | 'toolCall'; + data: string; + toolCallId?: string; + toolName?: string; + params?: string; +} + +type IAiResponseStreamFn = (chunk: IAiStreamChunk) => Promise; +``` + +The `type` field determines how the chunk is processed and displayed. The `data` field contains the token content. For tool calls, `toolCallId`, `toolName`, and `params` provide metadata. + +## Aggregation & Persistence + +### Where Aggregation Lives + +**Location:** `gadget-code/src/lib/drone-session.ts` +**Class:** `DroneSession` +**Private Field:** `streamingBuffers: Map` + +### IStreamingBuffer Interface + +```typescript +interface IStreamingBuffer { + currentMode: 'thinking' | 'responding' | null; + thinkingContent: string; + respondingContent: string; + lastBlockCreatedAt?: Date; +} +``` + +### How Aggregation Works + +1. **Token Arrival**: When `onThinking()` or `onResponse()` is called: + - Get or create buffer for the current `turnId` + - Check if `currentMode` matches the incoming token type + - If mode changed, flush previous buffer to database first + - Append token content to appropriate field (`thinkingContent` or `respondingContent`) + +2. **Mode Transition Detection**: + - `thinking` event while in `responding` mode → flush responding, start thinking + - `response` event while in `thinking` mode → flush thinking, start responding + - Tool call events → flush current buffer, add tool block immediately + +3. **Database Persistence**: + - Flushes occur at mode transitions (not on every token) + - `ChatTurn.blocks` array is updated with aggregated content + - Tool calls are persisted immediately as separate blocks + +4. **Work Order Completion**: + - `onWorkOrderComplete()` flushes any remaining buffered content + - Ensures no tokens are lost if streaming ends abruptly + +### Example Flow + +``` +Token Stream: [think: "Hmm"] [think: " let"] [think: " me"] [resp: "Sure"] [tool: search_google] [resp: " I'll"] + +Buffer State: +1. After "Hmm": { mode: 'thinking', thinking: "Hmm" } +2. After " let": { mode: 'thinking', thinking: "Hmm let" } +3. After " me": { mode: 'thinking', thinking: "Hmm let me" } +4. After "Sure": FLUSH thinking → DB, { mode: 'responding', responding: "Sure" } +5. After tool call: FLUSH responding → DB, ADD tool block → DB, { mode: null } +6. After " I'll": { mode: 'responding', responding: " I'll" } +``` + +## ChatTurn Data Model + +### IChatTurnBlock Interface + +Defined in `packages/api/src/interfaces/chat-turn.ts`: + +```typescript +interface IChatTurnBlockThinking { + mode: 'thinking'; + createdAt: Date; + content: string; +} + +interface IChatTurnBlockResponding { + mode: 'responding'; + createdAt: Date; + content: string; +} + +interface IChatTurnBlockTool { + mode: 'tool'; + createdAt: Date; + content: IChatToolCall; // { callId, name, parameters, response } +} + +type IChatTurnBlock = IChatTurnBlockThinking | IChatTurnBlockResponding | IChatTurnBlockTool; +``` + +### IChatTurn Changes + +The `IChatTurn` interface now uses a `blocks` array instead of flat `thinking` and `response` strings: + +```typescript +interface IChatTurn { + // ... other fields ... + blocks: IChatTurnBlock[]; // NEW: ordered blocks of thinking/responding/tool + toolCalls: IChatToolCall[]; // Still maintained for detailed tool call data + // ... other fields ... +} +``` + +**Removed:** `thinking?: string` and `response?: string` fields (replaced by blocks) + +### Mongoose Schema + +In `gadget-code/src/models/chat-turn.ts`: + +```typescript +const ChatTurnBlockSchema = new Schema({ + mode: { + type: String, + enum: ['thinking', 'responding', 'tool'], + required: true + }, + createdAt: { type: Date, default: Date.now, required: true }, + content: { type: Schema.Types.Mixed, required: true }, +}, { _id: false }); + +ChatTurnSchema = new Schema({ + // ... + blocks: { type: [ChatTurnBlockSchema], default: [], required: true }, + // ... +}); +``` + +## Frontend Event Handling & Rendering + +### Event Reception + +**Location:** `gadget-code/frontend/src/pages/ChatSessionView.tsx` + +The `ChatSessionView` component maintains streaming state: + +```typescript +interface StreamingState { + currentMode: 'thinking' | 'responding' | null; + thinkingContent: string; + respondingContent: string; + currentBlockIndex: number | null; // Tracks which block is being updated +} +``` + +### Event Handlers + +1. **handleThinking(content: string)**: + - If mode changed from responding → thinking, flush responding block + - Aggregate thinking content + - Update current block in place (or create new if mode transition) + +2. **handleResponse(content: string)**: + - If mode changed from thinking → responding, flush thinking block + - Aggregate response content + - Update current block in place (or create new if mode transition) + +3. **handleToolCall(callId, name, params, response)**: + - Flush any current streaming buffer + - Add tool block immediately (no aggregation for tools) + - Reset streaming state + +4. **handleWorkOrderComplete()**: + - Flush any remaining buffered content + - Clean up streaming state for this turn + +### In-Place Block Updates + +The key optimization: **blocks are updated in place during streaming, not appended**. + +```typescript +// In scheduleUpdate(): +if (currentBlockIndex !== null && oldTurn.blocks[currentBlockIndex]) { + // Same mode → update existing block + if (oldBlocks[currentBlockIndex].mode === updateBlock.mode) { + oldBlocks[currentBlockIndex] = updateBlock; // In-place update + newTurn.blocks = oldBlocks; + } else { + // Mode changed → append new block + newTurn.blocks = [...oldTurn.blocks, ...turnUpdates.blocks]; + state.currentBlockIndex = oldTurn.blocks.length; + } +} +``` + +This prevents the DOM from being flooded with duplicate blocks on each token. + +### ChatTurn Component Rendering + +**Location:** `gadget-code/frontend/src/components/ChatTurn.tsx` + +The component renders the `blocks` array: + +```typescript +{turn.blocks.map((block, idx) => { + if (block.mode === 'thinking') { + return ( +
+
Thinking
+
+
+ ); + } else if (block.mode === 'responding') { + return ( +
+
+
+ ); + } else if (block.mode === 'tool') { + const toolCall = block.content; + return ( +
+
+ + {toolCall.name} + {toolCall.response && } +
+
+ ); + } +})} +``` + +### Styling + +- **Thinking blocks**: Muted text (`text-text-muted`), monospace font, secondary background +- **Responding blocks**: Standard primary text, Markdown rendering +- **Tool blocks**: One-line summary with ● indicator (brand color), green checkmark if response exists + +### Markdown Rendering + +Uses the `marked` library with `breaks: true` to honor line breaks: + +```typescript +import { marked } from "marked"; + +marked.setOptions({ + breaks: true, // Critical: renders \n as
+}); +``` + +## Key Implementation Files + +| Component | File Path | Responsibility | +|-----------|-----------|----------------| +| **AI Interface** | `packages/ai/src/api.ts` | `IAiStreamChunk`, `IAiResponseStreamFn` types | +| **OpenAI Provider** | `packages/ai/src/openai.ts` | Streaming from OpenAI SDK, calls `streamCallback` | +| **Ollama Provider** | `packages/ai/src/ollama.ts` | Streaming from Ollama SDK, calls `streamCallback` | +| **Drone Agent** | `gadget-drone/src/services/agent.ts` | Routes stream chunks to Socket.IO events | +| **Backend Aggregation** | `gadget-code/src/lib/drone-session.ts` | Buffers tokens, persists at mode changes | +| **Backend Routing** | `gadget-code/src/lib/code-session.ts` | Forwards events to IDE socket | +| **Frontend State** | `gadget-code/frontend/src/pages/ChatSessionView.tsx` | Manages streaming state, in-place updates | +| **Frontend Rendering** | `gadget-code/frontend/src/components/ChatTurn.tsx` | Renders blocks with Markdown | +| **Data Model** | `packages/api/src/interfaces/chat-turn.ts` | `IChatTurnBlock` types | +| **Mongoose Schema** | `gadget-code/src/models/chat-turn.ts` | MongoDB persistence schema | + +## Design Decisions + +### Why Aggregate in Backend? + +1. **Database Efficiency**: Writing to MongoDB on every token would overwhelm the database +2. **Network Efficiency**: Fewer, larger updates instead of thousands of micro-updates +3. **Mode Awareness**: Backend can detect mode transitions and structure data appropriately + +### Why In-Place Updates in Frontend? + +1. **Performance**: Updating existing DOM nodes is cheaper than creating new ones +2. **Memory**: Prevents accumulation of hundreds of nearly-identical block objects +3. **Correctness**: Ensures the UI reflects the actual streaming state (one active block per mode) + +### Why Blocks Array Instead of Strings? + +1. **Temporal Ordering**: Preserves the exact sequence of thinking/responding/tool events +2. **Reconstruction**: Can replay the agent's "thought process" exactly as it happened +3. **Analytics**: Easy to query for patterns (e.g., "how many mode transitions per turn?") +4. **Query Efficiency**: No need for complex MongoDB aggregations to reconstruct turns + +## Testing & Verification + +### Manual Testing Steps + +1. Start backend: `cd gadget-code && pnpm dev:backend` +2. Start frontend: `cd gadget-code/frontend && pnpm dev` +3. Start drone: `cd ~/workspace && pnpm --filter gadget-drone dev` +4. Create chat session, submit prompt +5. Observe: + - Thinking content streams in (muted, monospace) + - Response content streams in (standard text) + - Tool calls appear as one-line summaries + - Mode transitions create new blocks + - Final display matches streaming sequence + +### What to Look For + +✅ **Correct behavior:** +- Content streams in real-time (not all at once at the end) +- Thinking blocks are visually distinct (muted, monospace) +- Tool calls break between thinking/responding blocks +- No duplicate blocks (each mode has one active block during streaming) +- Markdown renders correctly with line breaks + +❌ **Incorrect behavior:** +- Hundreds of blocks with duplicate content (aggregation broken) +- All content appears at once (streaming not working) +- Thinking and response mixed in same block (mode detection broken) +- Tool calls not appearing (tool call events not routed) + +## Future Enhancements + +Potential improvements not yet implemented: + +1. **Token Count Streaming**: Emit token counts with each chunk for real-time stats +2. **Thinking/Response Mode Labels**: Optional headers to explicitly label block types +3. **Block Collapse/Expand**: Persist collapsed state for thinking blocks +4. **Streaming Cursor**: Visual indicator (blinking cursor) at end of active streaming block +5. **Subagent Streaming**: Extend streaming to subagent processes + +## Troubleshooting + +### No Streaming Updates + +**Symptoms:** UI shows spinner but no content until work order completes + +**Causes:** +1. `streamCallback` not being called in AI provider +2. Socket.IO events not emitted from drone +3. Event handlers not registered in `DroneSession` or `CodeSession` + +**Debug:** Check `gadget-drone.log` for "stream chunk received" entries + +### Duplicate Blocks + +**Symptoms:** UI shows many blocks with progressively longer content + +**Causes:** +1. `currentBlockIndex` not being tracked in frontend +2. `scheduleUpdate` always appending instead of updating in place +3. Mode transitions not detected properly + +**Debug:** Inspect `turn.blocks` array in React DevTools + +### Mode Transitions Not Working + +**Symptoms:** Thinking and response content mixed in same block + +**Causes:** +1. Chunk `type` field not set correctly in AI provider +2. Mode detection logic in event handlers broken +3. Buffer flush not occurring on mode change + +**Debug:** Log `state.currentMode` in event handlers + +## Related Documentation + +- [Architecture](./architecture.md) — Overall system architecture +- [Socket Protocol](./socket-protocol.md) — Socket.IO event definitions +- [ChatTurn Interface](../packages/api/src/interfaces/chat-turn.ts) — TypeScript types +- [ChatTurn Model](../gadget-code/src/models/chat-turn.ts) — Mongoose schema diff --git a/gadget-code/frontend/package.json b/gadget-code/frontend/package.json index 9633e45..cb38a22 100644 --- a/gadget-code/frontend/package.json +++ b/gadget-code/frontend/package.json @@ -10,6 +10,7 @@ "author": "Robert Colbert ", "license": "Apache-2.0", "dependencies": { + "marked": "^16.0.0", "slug": "^11.0.1" } } \ No newline at end of file diff --git a/gadget-code/frontend/src/components/ChatTurn.tsx b/gadget-code/frontend/src/components/ChatTurn.tsx index 26433ae..f8fa732 100644 --- a/gadget-code/frontend/src/components/ChatTurn.tsx +++ b/gadget-code/frontend/src/components/ChatTurn.tsx @@ -1,14 +1,22 @@ -import { useState, memo, useCallback } from "react"; -import type { ChatTurn as ChatTurnType } from "../lib/api"; +import { useState, memo, useCallback, useEffect, useRef } from "react"; +import { marked } from "marked"; +import type { ChatTurn as ChatTurnType, ChatTurnBlock } from "../lib/api"; interface ChatTurnProps { turn: ChatTurnType; onUpdate: (turnId: string, updates: Partial) => void; } +// Configure marked with breaks enabled +marked.setOptions({ + breaks: true, +}); + const ChatTurn = memo(function ChatTurn({ turn, onUpdate }: ChatTurnProps) { const [thinkingExpanded, setThinkingExpanded] = useState(false); const [toolCallsExpanded, setToolCallsExpanded] = useState(true); + const [renderedBlocks, setRenderedBlocks] = useState>(new Map()); + const blocksEndRef = useRef(null); const handleThinkingToggle = useCallback(() => { setThinkingExpanded((prev) => !prev); @@ -101,86 +109,56 @@ const ChatTurn = memo(function ChatTurn({ turn, onUpdate }: ChatTurnProps) {
{/* Agent Response */} - {(turn.thinking || turn.response || (turn.toolCalls && turn.toolCalls.length > 0)) && ( + {(turn.blocks && turn.blocks.length > 0) && (
Gadget
- {/* Thinking Section */} - {turn.thinking && ( -
- - {thinkingExpanded && ( -
- {turn.thinking} + {/* Render blocks in order */} + {turn.blocks.map((block, idx) => { + if (block.mode === 'thinking') { + return ( +
+
+ Thinking +
+
- )} -
- )} - - {/* Response Section */} - {turn.response && ( -
- {turn.response} -
- )} - - {/* Tool Calls Section */} - {turn.toolCalls && turn.toolCalls.length > 0 && ( -
- - {toolCallsExpanded && ( -
- {turn.toolCalls.map((toolCall, idx) => ( -
-
- - {toolCall.name} - {toolCall.response && ( - - )} -
- {toolCall.parameters && ( -
- {toolCall.parameters} -
- )} - {toolCall.response && ( -
- {toolCall.response} -
- )} -
- ))} + ); + } else if (block.mode === 'responding') { + return ( +
+
- )} -
- )} + ); + } else if (block.mode === 'tool') { + const toolCall = block.content; + return ( +
+
+ + {toolCall.name} + {toolCall.response && ( + + )} +
+
+ ); + } + return null; + })}
{startedAt.toLocaleTimeString()} diff --git a/gadget-code/frontend/src/lib/api.ts b/gadget-code/frontend/src/lib/api.ts index 992da22..86a465a 100644 --- a/gadget-code/frontend/src/lib/api.ts +++ b/gadget-code/frontend/src/lib/api.ts @@ -298,6 +298,31 @@ export interface ChatTurnPrompts { system?: string; } +export interface ChatTurnBlockThinking { + mode: 'thinking'; + createdAt: string; + content: string; +} + +export interface ChatTurnBlockResponding { + mode: 'responding'; + createdAt: string; + content: string; +} + +export interface ChatTurnBlockTool { + mode: 'tool'; + createdAt: string; + content: { + callId: string; + name: string; + parameters?: string; + response?: string; + }; +} + +export type ChatTurnBlock = ChatTurnBlockThinking | ChatTurnBlockResponding | ChatTurnBlockTool; + export interface ChatTurn { _id: string; createdAt: string; @@ -309,8 +334,7 @@ export interface ChatTurn { mode: ChatSessionMode; status: "processing" | "finished" | "error"; prompts: ChatTurnPrompts; - thinking?: string; - response?: string; + blocks: ChatTurnBlock[]; errorMessage?: string; toolCalls: Array<{ callId: string; diff --git a/gadget-code/frontend/src/pages/ChatSessionView.tsx b/gadget-code/frontend/src/pages/ChatSessionView.tsx index ecf940a..ae5fe10 100644 --- a/gadget-code/frontend/src/pages/ChatSessionView.tsx +++ b/gadget-code/frontend/src/pages/ChatSessionView.tsx @@ -16,6 +16,13 @@ interface LogEntry { message: string; } +interface StreamingState { + currentMode: 'thinking' | 'responding' | null; + thinkingContent: string; + respondingContent: string; + currentBlockIndex: number | null; +} + export default function ChatSessionView() { const { projectId, sessionId } = useParams<{ projectId: string; sessionId: string }>(); const navigate = useNavigate(); @@ -46,6 +53,7 @@ export default function ChatSessionView() { const pendingUpdatesRef = useRef>>(new Map()); const updateRafRef = useRef(null); const currentTurnIdRef = useRef(null); + const streamingStateRef = useRef>(new Map()); useEffect(() => { loadSessionData(); @@ -145,11 +153,33 @@ export default function ChatSessionView() { const oldTurn = newTurns[turnIndex]; const newTurn = { ...oldTurn, ...turnUpdates }; - if (turnUpdates.thinking !== undefined) { - newTurn.thinking = (oldTurn.thinking || '') + turnUpdates.thinking; - } - if (turnUpdates.response !== undefined) { - newTurn.response = (oldTurn.response || '') + turnUpdates.response; + if (turnUpdates.blocks !== undefined) { + const state = streamingStateRef.current.get(turnId); + const currentBlockIndex = state?.currentBlockIndex ?? null; + + // If we have a current block index, update it in place + if (currentBlockIndex !== null && oldTurn.blocks && oldTurn.blocks[currentBlockIndex]) { + const oldBlocks = [...oldTurn.blocks]; + const updateBlock = turnUpdates.blocks[0]; + + // Only update if the mode matches + if (oldBlocks[currentBlockIndex].mode === updateBlock.mode) { + oldBlocks[currentBlockIndex] = updateBlock; + newTurn.blocks = oldBlocks; + } else { + // Mode changed, append new block and update index + newTurn.blocks = [...oldTurn.blocks, ...turnUpdates.blocks]; + if (state) { + state.currentBlockIndex = oldTurn.blocks.length; + } + } + } else { + // No current block, append and set index + newTurn.blocks = [...(oldTurn.blocks || []), ...turnUpdates.blocks]; + if (state && turnUpdates.blocks.length > 0) { + state.currentBlockIndex = oldTurn.blocks ? oldTurn.blocks.length : 0; + } + } } if (turnUpdates.toolCalls !== undefined) { newTurn.toolCalls = [...(oldTurn.toolCalls || []), ...turnUpdates.toolCalls]; @@ -178,7 +208,41 @@ export default function ChatSessionView() { const turnId = currentTurnIdRef.current; if (!turnId) return; - pendingUpdatesRef.current.set(turnId, { thinking: content }); + let state = streamingStateRef.current.get(turnId); + if (!state) { + state = { currentMode: null, thinkingContent: '', respondingContent: '', currentBlockIndex: null }; + streamingStateRef.current.set(turnId, state); + } + + // Check for mode transition + if (state.currentMode !== 'thinking') { + // Flush previous mode + if (state.currentMode === 'responding' && state.respondingContent) { + pendingUpdatesRef.current.set(turnId, { + blocks: [{ + mode: 'responding' as const, + createdAt: new Date().toISOString(), + content: state.respondingContent, + }], + }); + scheduleUpdate(); + state.respondingContent = ''; + state.currentBlockIndex = null; + } + state.currentMode = 'thinking'; + } + + // Aggregate content + state.thinkingContent += content; + + // Update with aggregated content + pendingUpdatesRef.current.set(turnId, { + blocks: [{ + mode: 'thinking' as const, + createdAt: new Date().toISOString(), + content: state.thinkingContent, + }], + }); scheduleUpdate(); }, [scheduleUpdate]); @@ -186,7 +250,41 @@ export default function ChatSessionView() { const turnId = currentTurnIdRef.current; if (!turnId) return; - pendingUpdatesRef.current.set(turnId, { response: content }); + let state = streamingStateRef.current.get(turnId); + if (!state) { + state = { currentMode: null, thinkingContent: '', respondingContent: '', currentBlockIndex: null }; + streamingStateRef.current.set(turnId, state); + } + + // Check for mode transition + if (state.currentMode !== 'responding') { + // Flush previous mode + if (state.currentMode === 'thinking' && state.thinkingContent) { + pendingUpdatesRef.current.set(turnId, { + blocks: [{ + mode: 'thinking' as const, + createdAt: new Date().toISOString(), + content: state.thinkingContent, + }], + }); + scheduleUpdate(); + state.thinkingContent = ''; + state.currentBlockIndex = null; + } + state.currentMode = 'responding'; + } + + // Aggregate content + state.respondingContent += content; + + // Update with aggregated content + pendingUpdatesRef.current.set(turnId, { + blocks: [{ + mode: 'responding' as const, + createdAt: new Date().toISOString(), + content: state.respondingContent, + }], + }); scheduleUpdate(); }, [scheduleUpdate]); @@ -194,13 +292,78 @@ export default function ChatSessionView() { const turnId = currentTurnIdRef.current; if (!turnId) return; + // Flush current streaming state + const state = streamingStateRef.current.get(turnId); + if (state) { + if (state.currentMode === 'thinking' && state.thinkingContent) { + pendingUpdatesRef.current.set(turnId, { + blocks: [{ + mode: 'thinking' as const, + createdAt: new Date().toISOString(), + content: state.thinkingContent, + }], + }); + state.thinkingContent = ''; + state.currentBlockIndex = null; + } + if (state.currentMode === 'responding' && state.respondingContent) { + pendingUpdatesRef.current.set(turnId, { + blocks: [{ + mode: 'responding' as const, + createdAt: new Date().toISOString(), + content: state.respondingContent, + }], + }); + state.respondingContent = ''; + state.currentBlockIndex = null; + } + state.currentMode = null; + state.currentBlockIndex = null; + } + + // Add tool block pendingUpdatesRef.current.set(turnId, { - toolCalls: [{ callId, name, parameters: params, response }] + blocks: [{ + mode: 'tool' as const, + createdAt: new Date().toISOString(), + content: { callId, name, parameters: params, response }, + }], + toolCalls: [{ callId, name, parameters: params, response }], }); scheduleUpdate(); }, [scheduleUpdate]); const handleWorkOrderComplete = useCallback((turnId: string, success: boolean, message?: string) => { + // Flush any remaining streaming state + const state = streamingStateRef.current.get(turnId); + if (state) { + const blocks: ChatTurnBlock[] = []; + if (state.currentMode === 'thinking' && state.thinkingContent) { + blocks.push({ + mode: 'thinking' as const, + createdAt: new Date().toISOString(), + content: state.thinkingContent, + }); + } + if (state.currentMode === 'responding' && state.respondingContent) { + blocks.push({ + mode: 'responding' as const, + createdAt: new Date().toISOString(), + content: state.respondingContent, + }); + } + if (blocks.length > 0) { + setTurns(prevTurns => + prevTurns.map(turn => + turn._id === turnId + ? { ...turn, blocks: [...(turn.blocks || []), ...blocks] } + : turn + ) + ); + } + streamingStateRef.current.delete(turnId); + } + setTurns(prevTurns => prevTurns.map(turn => turn._id === turnId diff --git a/gadget-code/pnpm-lock.yaml b/gadget-code/pnpm-lock.yaml index f8055a0..786f231 100644 --- a/gadget-code/pnpm-lock.yaml +++ b/gadget-code/pnpm-lock.yaml @@ -231,6 +231,9 @@ importers: frontend: dependencies: + marked: + specifier: ^16.0.0 + version: 16.0.0 slug: specifier: ^11.0.1 version: 11.0.1 diff --git a/gadget-code/src/lib/code-session.ts b/gadget-code/src/lib/code-session.ts index c258d66..6627552 100644 --- a/gadget-code/src/lib/code-session.ts +++ b/gadget-code/src/lib/code-session.ts @@ -172,7 +172,7 @@ export class CodeSession extends SocketSession { "processWorkOrder", this.selectedDrone, turn, - (success: boolean, message?: string) => { + async (success: boolean, message?: string) => { if (success) { this.log.info("work order accepted by drone", { turnId: turn._id, @@ -184,8 +184,8 @@ export class CodeSession extends SocketSession { message, }); turn.status = ChatTurnStatus.Error; - turn.response = message || "Drone rejected work order"; - turn.save(); + turn.errorMessage = message || "Drone rejected work order"; + await turn.save(); } }, ); diff --git a/gadget-code/src/lib/drone-session.ts b/gadget-code/src/lib/drone-session.ts index 6a4141b..77b8c63 100644 --- a/gadget-code/src/lib/drone-session.ts +++ b/gadget-code/src/lib/drone-session.ts @@ -17,12 +17,20 @@ import { import { SocketService } from "../services/index.js"; import { ChatTurn } from "../models/chat-turn.js"; +interface IStreamingBuffer { + currentMode: 'thinking' | 'responding' | null; + thinkingContent: string; + respondingContent: string; + lastBlockCreatedAt?: Date; +} + export class DroneSession extends SocketSession { protected type: SocketSessionType = SocketSessionType.Drone; registration: IDroneRegistration; chatSessionId: GadgetId | undefined; currentTurnId: GadgetId | undefined; workspaceMode: WorkspaceMode = WorkspaceMode.Idle; + private streamingBuffers: Map = new Map(); constructor(socket: GadgetSocket, registration: IDroneRegistration) { super(socket, registration.user as IUser); @@ -71,6 +79,7 @@ export class DroneSession extends SocketSession { /** * Called when the drone emits thinking content from the agent. + * Aggregates thinking tokens in memory and persists at mode changes. */ async onThinking(content: string): Promise { if (!this.chatSessionId) { @@ -85,9 +94,19 @@ export class DroneSession extends SocketSession { codeSession.onThinking(content); if (this.currentTurnId) { - await ChatTurn.findByIdAndUpdate(this.currentTurnId, { - thinking: content, - }); + const buffer = this.getOrCreateBuffer(this.currentTurnId); + + // Check for mode transition + if (buffer.currentMode !== 'thinking') { + // Flush previous mode if exists + await this.flushBuffer(this.currentTurnId); + buffer.currentMode = 'thinking'; + buffer.thinkingContent = ''; + buffer.lastBlockCreatedAt = new Date(); + } + + // Aggregate content + buffer.thinkingContent += content; } } catch (error) { this.log.error("failed to route thinking event", { error }); @@ -96,6 +115,7 @@ export class DroneSession extends SocketSession { /** * Called when the drone emits response content from the agent. + * Aggregates response tokens in memory and persists at mode changes. */ async onResponse(content: string): Promise { if (!this.chatSessionId) { @@ -110,9 +130,19 @@ export class DroneSession extends SocketSession { codeSession.onResponse(content); if (this.currentTurnId) { - await ChatTurn.findByIdAndUpdate(this.currentTurnId, { - response: content, - }); + const buffer = this.getOrCreateBuffer(this.currentTurnId); + + // Check for mode transition + if (buffer.currentMode !== 'responding') { + // Flush previous mode if exists + await this.flushBuffer(this.currentTurnId); + buffer.currentMode = 'responding'; + buffer.respondingContent = ''; + buffer.lastBlockCreatedAt = new Date(); + } + + // Aggregate content + buffer.respondingContent += content; } } catch (error) { this.log.error("failed to route response event", { error }); @@ -121,6 +151,7 @@ export class DroneSession extends SocketSession { /** * Called when the drone emits a tool call event from the agent. + * Flushes current buffer and adds tool block immediately. */ async onToolCall( callId: string, @@ -140,8 +171,22 @@ export class DroneSession extends SocketSession { codeSession.onToolCall(callId, name, params, response); if (this.currentTurnId) { + // Flush current buffer before adding tool block + await this.flushBuffer(this.currentTurnId); + + // Add tool block immediately const turn = await ChatTurn.findById(this.currentTurnId); if (turn) { + turn.blocks.push({ + mode: 'tool', + createdAt: new Date(), + content: { + callId, + name, + parameters: params, + response, + }, + }); turn.toolCalls.push({ callId, name, @@ -159,6 +204,7 @@ export class DroneSession extends SocketSession { /** * Called when the drone completes a work order. + * Flushes any remaining buffered content before finalizing. */ async onWorkOrderComplete( turnId: string, @@ -173,6 +219,10 @@ export class DroneSession extends SocketSession { } try { + // Flush any remaining buffered content + await this.flushBuffer(turnId); + this.streamingBuffers.delete(turnId); + const turn = await ChatTurn.findById(turnId); if (turn) { turn.status = success ? ChatTurnStatus.Finished : ChatTurnStatus.Error; @@ -198,6 +248,57 @@ export class DroneSession extends SocketSession { */ setChatSessionId(chatSessionId: GadgetId): void { this.chatSessionId = chatSessionId; + // Clear buffer for this turn if exists + this.streamingBuffers.clear(); + } + + /** + * Gets or creates a streaming buffer for a turn. + */ + private getOrCreateBuffer(turnId: string): IStreamingBuffer { + if (!this.streamingBuffers.has(turnId)) { + this.streamingBuffers.set(turnId, { + currentMode: null, + thinkingContent: '', + respondingContent: '', + }); + } + return this.streamingBuffers.get(turnId)!; + } + + /** + * Flushes the current buffer to the database. + */ + private async flushBuffer(turnId: string): Promise { + const buffer = this.streamingBuffers.get(turnId); + if (!buffer) return; + + const turn = await ChatTurn.findById(turnId); + if (!turn) return; + + // Flush thinking content + if (buffer.currentMode === 'thinking' && buffer.thinkingContent) { + turn.blocks.push({ + mode: 'thinking', + createdAt: buffer.lastBlockCreatedAt || new Date(), + content: buffer.thinkingContent, + }); + buffer.thinkingContent = ''; + } + + // Flush responding content + if (buffer.currentMode === 'responding' && buffer.respondingContent) { + turn.blocks.push({ + mode: 'responding', + createdAt: buffer.lastBlockCreatedAt || new Date(), + content: buffer.respondingContent, + }); + buffer.respondingContent = ''; + } + + if (turn.blocks.length > 0) { + await turn.save(); + } } /** @@ -265,7 +366,7 @@ export class DroneSession extends SocketSession { // Turn is still processing - mark for retry turn.status = ChatTurnStatus.Error; - turn.response = "Drone crashed during processing - retrying"; + turn.errorMessage = "Drone crashed during processing - retrying"; await turn.save(); this.socket.emit("crashRecoveryResponse", { diff --git a/gadget-code/src/models/chat-turn.ts b/gadget-code/src/models/chat-turn.ts index b614107..bb6c91a 100644 --- a/gadget-code/src/models/chat-turn.ts +++ b/gadget-code/src/models/chat-turn.ts @@ -12,6 +12,7 @@ import { IChatToolCall, IChatTurn, IChatTurnStats, + IChatTurnBlock, } from "@gadget/api"; import { nanoid } from "nanoid"; @@ -29,6 +30,16 @@ export const ChatTurnStatsSchema = new Schema({ durationLabel: { type: String, default: "pending", required: true }, }); +const ChatTurnBlockSchema = new Schema({ + mode: { + type: String, + enum: ['thinking', 'responding', 'tool'], + required: true + }, + createdAt: { type: Date, default: Date.now, required: true }, + content: { type: Schema.Types.Mixed, required: true }, +}, { _id: false }); + export const ChatToolCallSchema = new Schema({ callId: { type: String, required: true }, name: { type: String, required: true }, @@ -65,8 +76,7 @@ export const ChatTurnSchema = new Schema({ required: true, }, prompts: { type: ChatTurnPromptsSchema, required: true }, - thinking: { type: String, required: false }, - response: { type: String, required: false }, + blocks: { type: [ChatTurnBlockSchema], default: [], required: true }, errorMessage: { type: String, required: false }, toolCalls: { type: [ChatToolCallSchema], default: [], required: true }, subagents: { type: [ChatSubagentProcessSchema], default: [], required: true }, @@ -94,15 +104,13 @@ ChatTurnSchema.index( ChatTurnSchema.index( { user: 1, - prompt: "text", - thinking: "text", - response: "text", + "prompts.user": "text", + "prompts.system": "text", }, { weights: { - prompt: 10, - response: 5, - thinking: 1, + "prompts.user": 10, + "prompts.system": 5, }, name: "chat-turn-user-text-index", }, diff --git a/gadget-drone/src/services/agent.ts b/gadget-drone/src/services/agent.ts index aef5149..5c67f2b 100644 --- a/gadget-drone/src/services/agent.ts +++ b/gadget-drone/src/services/agent.ts @@ -68,6 +68,24 @@ class AgentService extends GadgetService { const onStreamChunk = async (chunk: IAiStreamChunk): Promise => { this.log.debug("stream chunk received", { chunk }); + + switch (chunk.type) { + case 'thinking': + socket.emit("thinking", chunk.data); + break; + case 'response': + socket.emit("response", chunk.data); + break; + case 'toolCall': + socket.emit( + "toolCall", + chunk.toolCallId!, + chunk.toolName!, + chunk.params || "{}", + chunk.data, + ); + break; + } }; try { @@ -186,15 +204,18 @@ class AgentService extends GadgetService { * (reasoning) output (if any). */ let content = ""; - if (turn.thinking) { - content += `${turn.thinking}`; - if (turn.response && turn.response.length) { - content += "\n"; + + // Extract thinking and response from blocks + for (const block of turn.blocks) { + if (block.mode === 'thinking' && typeof block.content === 'string') { + content += `${block.content}`; + } else if (block.mode === 'responding' && typeof block.content === 'string') { + if (content && content.length) { + content += "\n"; + } + content += block.content; } } - if (turn.response) { - content += turn.response; - } messages.push({ createdAt: turn.createdAt, diff --git a/gadget-drone/src/services/ai.ts b/gadget-drone/src/services/ai.ts index e675ab1..d265ed5 100644 --- a/gadget-drone/src/services/ai.ts +++ b/gadget-drone/src/services/ai.ts @@ -3,6 +3,17 @@ // Licensed under the Apache License, Version 2.0 import env from "../config/env.ts"; +const aiEnv: IAiEnvironment = { + NODE_ENV: env.NODE_ENV, + services: { + google: { + cse: { + apiKey: env.google.cse.apiKey, + engineId: env.google.cse.engineId, + }, + }, + }, +}; import { IAiProvider as DbAiProvider, GadgetId } from "@gadget/api"; import { GadgetService } from "../lib/service.js"; @@ -128,17 +139,6 @@ class AiService extends GadgetService { } getApi(provider: AiProviderConfig) { - const aiEnv: IAiEnvironment = { - NODE_ENV: env.NODE_ENV, - services: { - google: { - cse: { - apiKey: env.google.cse.apiKey, - engineId: env.google.cse.engineId, - }, - }, - }, - }; return createAiApi(aiEnv, provider, this.log); } } diff --git a/packages/ai/src/api.ts b/packages/ai/src/api.ts index 874d2a8..3d68c92 100644 --- a/packages/ai/src/api.ts +++ b/packages/ai/src/api.ts @@ -96,7 +96,11 @@ export interface IAiChatResponse { } export interface IAiStreamChunk { + type: 'thinking' | 'response' | 'toolCall'; data: string; + toolCallId?: string; + toolName?: string; + params?: string; } export type IAiResponseStreamFn = (chunk: IAiStreamChunk) => Promise; diff --git a/packages/ai/src/ollama.ts b/packages/ai/src/ollama.ts index f88b9f2..3accc34 100644 --- a/packages/ai/src/ollama.ts +++ b/packages/ai/src/ollama.ts @@ -165,6 +165,21 @@ export class OllamaAiApi extends AiApi { for await (const chunk of response) { await this.log.debug("stream chunk received", { chunk }); lastChunk = chunk; + + if (streamCallback) { + if (chunk.thinking) { + await streamCallback({ + type: 'thinking', + data: chunk.thinking, + }); + } + if (chunk.response) { + await streamCallback({ + type: 'response', + data: chunk.response, + }); + } + } } assert(lastChunk, "no stream response chunks received"); @@ -231,9 +246,41 @@ export class OllamaAiApi extends AiApi { }); let lastChunk; + let accumulatedThinking = ""; + let accumulatedResponse = ""; + for await (const chunk of response) { await this.log.debug("stream chunk received", { chunk }); lastChunk = chunk; + + if (streamCallback) { + if (chunk.message.thinking) { + accumulatedThinking += chunk.message.thinking; + await streamCallback({ + type: 'thinking', + data: chunk.message.thinking, + }); + } + if (chunk.message.content) { + accumulatedResponse += chunk.message.content; + await streamCallback({ + type: 'response', + data: chunk.message.content, + }); + } + if (chunk.message.tool_calls) { + for (const tc of chunk.message.tool_calls) { + const params = JSON.stringify(tc.function.arguments); + await streamCallback({ + type: 'toolCall', + data: params, + toolCallId: `tool_${tc.function.name}_${Date.now()}`, + toolName: tc.function.name, + params, + }); + } + } + } } assert(lastChunk, "no response chunks received"); diff --git a/packages/ai/src/openai.ts b/packages/ai/src/openai.ts index 095df7b..1b6a968 100644 --- a/packages/ai/src/openai.ts +++ b/packages/ai/src/openai.ts @@ -201,25 +201,51 @@ export class OpenAiApi extends AiApi { : []), { role: "user" as const, content: options.prompt }, ], - stream: false, + stream: true, }); - const choice = response.choices[0]; + let accumulatedResponse = ""; + let accumulatedThinking = ""; + + for await (const chunk of response) { + const delta = chunk.choices[0]?.delta; + if (delta) { + if (delta.content) { + accumulatedResponse += delta.content; + if (streamCallback) { + await streamCallback({ + type: 'response', + data: delta.content, + }); + } + } + if ('reasoning' in delta && delta.reasoning) { + accumulatedThinking += delta.reasoning as string; + if (streamCallback) { + await streamCallback({ + type: 'thinking', + data: delta.reasoning as string, + }); + } + } + } + } + const endTime = Date.now(); const durationMs = endTime - startTime; return { done: true, - response: choice.message.content || "", - thinking: undefined, + response: accumulatedResponse, + thinking: accumulatedThinking || undefined, stats: { duration: { seconds: durationMs / 1000, text: numeral(durationMs / 1000).format("hh:mm:ss"), }, tokenCounts: { - input: response.usage?.prompt_tokens || 0, - response: response.usage?.completion_tokens || 0, + input: 0, + response: 0, thinking: 0, }, }, @@ -279,16 +305,56 @@ export class OpenAiApi extends AiApi { model: model.modelId, messages, tools, - stream: false, + stream: true, }); - const choice = response.choices[0]; - const endTime = Date.now(); - const durationMs = endTime - startTime; + let accumulatedResponse = ""; + let accumulatedThinking = ""; + let finalToolCalls: any = undefined; - const toolCalls = choice.message.tool_calls - ?.filter((tc) => tc.type === "function") - .map((tc) => ({ + for await (const chunk of response) { + const delta = chunk.choices[0]?.delta; + if (delta) { + if (delta.content) { + accumulatedResponse += delta.content; + if (streamCallback) { + await streamCallback({ + type: 'response', + data: delta.content, + }); + } + } + if ('reasoning' in delta && delta.reasoning) { + accumulatedThinking += delta.reasoning as string; + if (streamCallback) { + await streamCallback({ + type: 'thinking', + data: delta.reasoning as string, + }); + } + } + if (delta.tool_calls) { + finalToolCalls = delta.tool_calls; + for (const tc of delta.tool_calls) { + if (tc.function) { + if (streamCallback) { + await streamCallback({ + type: 'toolCall', + data: tc.function.arguments || "", + toolCallId: tc.id, + toolName: tc.function.name, + params: tc.function.arguments, + }); + } + } + } + } + } + } + + const toolCalls = finalToolCalls + ?.filter((tc: any) => tc.type === "function") + .map((tc: any) => ({ callId: tc.id, function: { name: tc.function.name, @@ -299,18 +365,18 @@ export class OpenAiApi extends AiApi { if (!toolCalls || toolCalls.length === 0) { return { done: true, - response: choice.message.content || "", - thinking: undefined, + response: accumulatedResponse, + thinking: accumulatedThinking || undefined, toolCalls: undefined, toolCallResults: allToolCallResults.length > 0 ? allToolCallResults : undefined, stats: { duration: { - seconds: durationMs / 1000, - text: numeral(durationMs / 1000).format("hh:mm:ss"), + seconds: (Date.now() - startTime) / 1000, + text: numeral((Date.now() - startTime) / 1000).format("hh:mm:ss"), }, tokenCounts: { - input: response.usage?.prompt_tokens || 0, - response: response.usage?.completion_tokens || 0, + input: 0, + response: 0, thinking: 0, }, }, @@ -325,10 +391,10 @@ export class OpenAiApi extends AiApi { const assistantMsg: ChatCompletionAssistantMessageParam = { role: "assistant", - content: choice.message.content, + content: accumulatedResponse, }; - if (choice.message.tool_calls) { - assistantMsg.tool_calls = choice.message.tool_calls; + if (finalToolCalls) { + assistantMsg.tool_calls = finalToolCalls; } messages.push(assistantMsg); diff --git a/packages/api/src/interfaces/chat-turn.ts b/packages/api/src/interfaces/chat-turn.ts index b7fb8a9..7360487 100644 --- a/packages/api/src/interfaces/chat-turn.ts +++ b/packages/api/src/interfaces/chat-turn.ts @@ -23,6 +23,26 @@ export interface IChatTurnPrompts { system?: string; } +export interface IChatTurnBlockThinking { + mode: 'thinking'; + createdAt: Date; + content: string; +} + +export interface IChatTurnBlockResponding { + mode: 'responding'; + createdAt: Date; + content: string; +} + +export interface IChatTurnBlockTool { + mode: 'tool'; + createdAt: Date; + content: IChatToolCall; +} + +export type IChatTurnBlock = IChatTurnBlockThinking | IChatTurnBlockResponding | IChatTurnBlockTool; + export interface IChatTurnStats { toolCallCount: number; // total number of tool functions called this turn inputTokens: number; // total number of input tokens processed this turn @@ -60,11 +80,10 @@ export interface IChatTurn { session: IChatSession | GadgetId; provider: IAiProvider | GadgetId; llm: string; // id/name of the model used to process the prompt - mode: ChatSessionMode; // session mode for this turn/prompt + mode: ChatSessionMode; status: ChatTurnStatus; prompts: IChatTurnPrompts; - thinking?: string; - response?: string; + blocks: IChatTurnBlock[]; errorMessage?: string; toolCalls: IChatToolCall[]; subagents: IChatSubagentProcess[]; // subagents used while processing this turn