streaming responses (see ./docs/streaming-responses.md)

Commit 61ba0e4412 (parent b481183c99)
Author: Rob Colbert — 2026-05-07 21:36:01 -04:00
15 changed files with 1040 additions and 243 deletions


@@ -1,117 +1,479 @@
# Gadget Code Streaming Responses
**Status:** ✅ **IMPLEMENTED** — Full end-to-end streaming responses operational
**Last Updated:** May 7, 2026
## Quick Reference
**Streaming Path:** AI Provider → @gadget/ai → gadget-drone → gadget-code (backend) → Frontend IDE
**Key Concepts:**
- **IAiStreamChunk**: Unified chunk type with `type: 'thinking'|'response'|'toolCall'`
- **Backend Aggregation**: Tokens buffered in `DroneSession`, persisted at mode changes
- **Frontend In-Place Updates**: Blocks updated by index, not appended (prevents DOM flooding)
- **Blocks Array**: `ChatTurn.blocks[]` stores ordered thinking/responding/tool blocks
**Critical Files:**
- `packages/ai/src/api.ts` — Stream chunk interface
- `gadget-code/src/lib/drone-session.ts` — Aggregation logic
- `gadget-code/frontend/src/pages/ChatSessionView.tsx` — Frontend state management
- `gadget-code/frontend/src/components/ChatTurn.tsx` — Block rendering
## Overview
Gadget Code implements real-time streaming responses from AI providers (OpenAI, Ollama) through the entire system stack. As the AI model generates tokens, they flow immediately from the provider → drone → backend → frontend, where they are displayed to the user with minimal latency.
The system supports three types of streaming content:
- **Thinking tokens**: Reasoning/output from models with thinking capabilities
- **Response tokens**: The model's primary response content
- **Tool calls**: Function/tool invocations requested by the model
## Architecture & Data Flow
### Complete Streaming Path
```
┌─────────────────┐
│   AI Provider   │  (OpenAI / Ollama SDK)
│   (Streaming)   │
└────────┬────────┘
         │ IAiStreamChunk
         │ (type: 'thinking'|'response'|'toolCall')
         ▼
┌─────────────────┐
│   @gadget/ai    │  (packages/ai/src/openai.ts or ollama.ts)
│ streamCallback  │
└────────┬────────┘
         │ Calls streamCallback(chunk) for each token
         ▼
┌─────────────────┐
│  gadget-drone   │  (gadget-drone/src/services/agent.ts)
│  AgentService   │
└────────┬────────┘
         │ Socket.IO events
         │   thinking(content)
         │   response(content)
         │   toolCall(callId, name, params, response)
         ▼
┌─────────────────┐
│  gadget-code    │  (gadget-code/src/lib/drone-session.ts)
│  DroneSession   │
└────────┬────────┘
         │ Aggregates tokens by mode
         │ Persists to MongoDB at mode changes
         │ Routes events to CodeSession
         ▼
┌─────────────────┐
│  gadget-code    │  (gadget-code/src/lib/code-session.ts)
│  CodeSession    │
└────────┬────────┘
         │ Socket.IO events to IDE
         ▼
┌─────────────────┐
│  Frontend IDE   │  (gadget-code/frontend/src/pages/ChatSessionView.tsx)
│   Chat Turn     │
└─────────────────┘
```
### Event Flow Details
1. **AI Provider → @gadget/ai**
- OpenAI: Uses `stream: true` in `chat.completions.create()`, iterates SSE chunks
- Ollama: Uses `stream: true` in `client.chat()`, iterates async chunks
- Both call `streamCallback(chunk: IAiStreamChunk)` for each token
2. **@gadget/ai → gadget-drone**
- `streamCallback` emits Socket.IO events based on chunk type:
```typescript
switch (chunk.type) {
case 'thinking': socket.emit("thinking", chunk.data); break;
case 'response': socket.emit("response", chunk.data); break;
case 'toolCall': socket.emit("toolCall", callId, name, params, data); break;
}
```
3. **gadget-drone → gadget-code:backend**
- Events arrive at `DroneSession` via Socket.IO
- `DroneSession` aggregates tokens in memory (see Aggregation below)
- At mode changes or tool calls, flushes to MongoDB
- Routes events to corresponding `CodeSession` via `SocketService.getCodeSessionByChatSessionId()`
4. **gadget-code:backend → Frontend IDE**
- `CodeSession` forwards events to IDE socket
- `ChatSessionView` receives events and updates React state
- `ChatTurn` component renders blocks with Markdown
## IAiStreamChunk Interface
Defined in `packages/ai/src/api.ts`:
```typescript
interface IAiStreamChunk {
type: 'thinking' | 'response' | 'toolCall';
data: string;
toolCallId?: string;
toolName?: string;
params?: string;
}
type IAiResponseStreamFn = (chunk: IAiStreamChunk) => Promise<void>;
```
The `type` field determines how the chunk is processed and displayed. The `data` field contains the token content. For tool calls, `toolCallId`, `toolName`, and `params` provide metadata.
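For illustration, a consumer of this interface can dispatch on the `type` discriminant. This is a minimal sketch; `dispatchChunk` and its `log` sink are hypothetical and exist only for the example — only `IAiStreamChunk` itself comes from `packages/ai/src/api.ts`:

```typescript
// Chunk shape from packages/ai/src/api.ts (reproduced so the example is self-contained)
interface IAiStreamChunk {
  type: 'thinking' | 'response' | 'toolCall';
  data: string;
  toolCallId?: string;
  toolName?: string;
  params?: string;
}

// Hypothetical sink that records which branch each chunk reached
function dispatchChunk(chunk: IAiStreamChunk, log: string[]): void {
  switch (chunk.type) {
    case 'thinking':
      log.push(`thinking:${chunk.data}`);
      break;
    case 'response':
      log.push(`response:${chunk.data}`);
      break;
    case 'toolCall':
      log.push(`toolCall:${chunk.toolName ?? 'unknown'}`);
      break;
  }
}

const log: string[] = [];
dispatchChunk({ type: 'thinking', data: 'Hmm' }, log);
dispatchChunk({ type: 'toolCall', data: '', toolCallId: '1', toolName: 'search_google' }, log);
console.log(log); // ['thinking:Hmm', 'toolCall:search_google']
```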
## Aggregation & Persistence
### Where Aggregation Lives
**Location:** `gadget-code/src/lib/drone-session.ts`
**Class:** `DroneSession`
**Private Field:** `streamingBuffers: Map<string, IStreamingBuffer>`
### IStreamingBuffer Interface
```typescript
interface IStreamingBuffer {
currentMode: 'thinking' | 'responding' | null;
thinkingContent: string;
respondingContent: string;
lastBlockCreatedAt?: Date;
}
```
### How Aggregation Works
1. **Token Arrival**: When `onThinking()` or `onResponse()` is called:
- Get or create buffer for the current `turnId`
- Check if `currentMode` matches the incoming token type
- If mode changed, flush previous buffer to database first
- Append token content to appropriate field (`thinkingContent` or `respondingContent`)
2. **Mode Transition Detection**:
- `thinking` event while in `responding` mode → flush responding, start thinking
- `response` event while in `thinking` mode → flush thinking, start responding
- Tool call events → flush current buffer, add tool block immediately
3. **Database Persistence**:
- Flushes occur at mode transitions (not on every token)
- `ChatTurn.blocks` array is updated with aggregated content
- Tool calls are persisted immediately as separate blocks
4. **Work Order Completion**:
- `onWorkOrderComplete()` flushes any remaining buffered content
- Ensures no tokens are lost if streaming ends abruptly
### Example Flow
```
Token Stream: [think: "Hmm"] [think: " let"] [think: " me"] [resp: "Sure"] [tool: search_google] [resp: " I'll"]
Buffer State:
1. After "Hmm": { mode: 'thinking', thinking: "Hmm" }
2. After " let": { mode: 'thinking', thinking: "Hmm let" }
3. After " me": { mode: 'thinking', thinking: "Hmm let me" }
4. After "Sure": FLUSH thinking → DB, { mode: 'responding', responding: "Sure" }
5. After tool call: FLUSH responding → DB, ADD tool block → DB, { mode: null }
6. After " I'll": { mode: 'responding', responding: " I'll" }
```
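The buffering strategy above can be sketched as a small state machine. This is illustrative only: the real `DroneSession` flushes blocks to MongoDB rather than an in-memory array, and the class and method names here are invented for the example:

```typescript
type Mode = 'thinking' | 'responding';

interface Block { mode: Mode | 'tool'; content: string; }

// Minimal sketch of the aggregation described above.
class StreamBuffer {
  private mode: Mode | null = null;
  private content = '';
  readonly flushed: Block[] = []; // stands in for ChatTurn.blocks in MongoDB

  private flush(): void {
    if (this.mode !== null && this.content.length > 0) {
      this.flushed.push({ mode: this.mode, content: this.content });
    }
    this.mode = null;
    this.content = '';
  }

  onToken(mode: Mode, token: string): void {
    if (this.mode !== mode) this.flush(); // mode transition → persist previous block
    this.mode = mode;
    this.content += token;               // same mode → cheap in-memory append
  }

  onToolCall(name: string): void {
    this.flush();                        // flush current buffer, then record tool block
    this.flushed.push({ mode: 'tool', content: name });
  }

  onComplete(): void {
    this.flush();                        // final flush so no trailing tokens are lost
  }
}

// Replaying the token stream from the example flow above:
const buf = new StreamBuffer();
buf.onToken('thinking', 'Hmm');
buf.onToken('thinking', ' let');
buf.onToken('thinking', ' me');
buf.onToken('responding', 'Sure');
buf.onToolCall('search_google');
buf.onToken('responding', " I'll");
buf.onComplete();
// buf.flushed: thinking "Hmm let me", responding "Sure", tool "search_google", responding " I'll"
```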
## ChatTurn Data Model
### IChatTurnBlock Interface
Defined in `packages/api/src/interfaces/chat-turn.ts`:
```typescript
interface IChatTurnBlockThinking {
mode: 'thinking';
createdAt: Date;
content: string;
}
interface IChatTurnBlockResponding {
mode: 'responding';
createdAt: Date;
content: string;
}
interface IChatTurnBlockTool {
mode: 'tool';
createdAt: Date;
content: IChatToolCall; // { callId, name, parameters, response }
}
type IChatTurnBlock = IChatTurnBlockThinking | IChatTurnBlockResponding | IChatTurnBlockTool;
```
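Because `mode` is a shared discriminant, TypeScript narrows `content` automatically when code switches on it. A self-contained sketch (the `summarize` helper is hypothetical; the interfaces are reproduced from above, with `IChatToolCall` fields assumed from the comment):

```typescript
interface IChatToolCall { callId: string; name: string; parameters?: string; response?: string; }

interface IChatTurnBlockThinking   { mode: 'thinking';   createdAt: Date; content: string; }
interface IChatTurnBlockResponding { mode: 'responding'; createdAt: Date; content: string; }
interface IChatTurnBlockTool       { mode: 'tool';       createdAt: Date; content: IChatToolCall; }

type IChatTurnBlock = IChatTurnBlockThinking | IChatTurnBlockResponding | IChatTurnBlockTool;

// Switching on the discriminant narrows `content` per branch:
function summarize(block: IChatTurnBlock): string {
  switch (block.mode) {
    case 'thinking':
    case 'responding':
      return `${block.mode}: ${block.content.slice(0, 20)}`; // content is string here
    case 'tool':
      return `tool: ${block.content.name}`;                  // content is IChatToolCall here
  }
}

console.log(summarize({ mode: 'tool', createdAt: new Date(), content: { callId: '1', name: 'search_google' } }));
// → "tool: search_google"
```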
### IChatTurn Changes
The `IChatTurn` interface now uses a `blocks` array instead of flat `thinking` and `response` strings:
```typescript
interface IChatTurn {
// ... other fields ...
blocks: IChatTurnBlock[]; // NEW: ordered blocks of thinking/responding/tool
toolCalls: IChatToolCall[]; // Still maintained for detailed tool call data
// ... other fields ...
}
```
**Removed:** `thinking?: string` and `response?: string` fields (replaced by blocks)
### Mongoose Schema
In `gadget-code/src/models/chat-turn.ts`:
```typescript
const ChatTurnBlockSchema = new Schema<IChatTurnBlock>({
mode: {
type: String,
enum: ['thinking', 'responding', 'tool'],
required: true
},
createdAt: { type: Date, default: Date.now, required: true },
content: { type: Schema.Types.Mixed, required: true },
}, { _id: false });
const ChatTurnSchema = new Schema({
// ...
blocks: { type: [ChatTurnBlockSchema], default: [], required: true },
// ...
});
```
## Frontend Event Handling & Rendering
### Event Reception
**Location:** `gadget-code/frontend/src/pages/ChatSessionView.tsx`
The `ChatSessionView` component maintains streaming state:
```typescript
interface StreamingState {
currentMode: 'thinking' | 'responding' | null;
thinkingContent: string;
respondingContent: string;
currentBlockIndex: number | null; // Tracks which block is being updated
}
```
### Event Handlers
1. **handleThinking(content: string)**:
- If mode changed from responding → thinking, flush responding block
- Aggregate thinking content
- Update current block in place (or create new if mode transition)
2. **handleResponse(content: string)**:
- If mode changed from thinking → responding, flush thinking block
- Aggregate response content
- Update current block in place (or create new if mode transition)
3. **handleToolCall(callId, name, params, response)**:
- Flush any current streaming buffer
- Add tool block immediately (no aggregation for tools)
- Reset streaming state
4. **handleWorkOrderComplete()**:
- Flush any remaining buffered content
- Clean up streaming state for this turn
### In-Place Block Updates
The key optimization: **blocks are updated in place during streaming, not appended**.
```typescript
// In scheduleUpdate():
if (currentBlockIndex !== null && oldTurn.blocks[currentBlockIndex]) {
  const oldBlocks = [...oldTurn.blocks];
  const updateBlock = turnUpdates.blocks[0];
  // Same mode → update existing block in place
  if (oldBlocks[currentBlockIndex].mode === updateBlock.mode) {
    oldBlocks[currentBlockIndex] = updateBlock;
    newTurn.blocks = oldBlocks;
  } else {
    // Mode changed → append new block and advance the index
    newTurn.blocks = [...oldTurn.blocks, ...turnUpdates.blocks];
    state.currentBlockIndex = oldTurn.blocks.length;
  }
}
```
This prevents the DOM from being flooded with duplicate blocks on each token.
### ChatTurn Component Rendering
**Location:** `gadget-code/frontend/src/components/ChatTurn.tsx`
The component renders the `blocks` array:
```typescript
{turn.blocks.map((block, idx) => {
  if (block.mode === 'thinking') {
    return (
      <div key={idx} className="mb-3">
        <div className="text-xs text-text-muted mb-1 font-mono">Thinking</div>
        <div
          className="p-3 bg-bg-secondary rounded text-sm text-text-muted whitespace-pre-wrap font-mono text-xs"
          dangerouslySetInnerHTML={{ __html: marked.parse(block.content) }}
        />
      </div>
    );
  } else if (block.mode === 'responding') {
    return (
      <div key={idx} className="mb-3">
        <div
          className="text-text-primary"
          dangerouslySetInnerHTML={{ __html: marked.parse(block.content) }}
        />
      </div>
    );
  } else if (block.mode === 'tool') {
    const toolCall = block.content;
    return (
      <div key={idx} className="mb-3">
        <div className="flex items-center gap-2 text-xs font-mono text-text-secondary">
          <span className="text-brand">●</span>
          <span>{toolCall.name}</span>
          {toolCall.response && <span className="text-green-500">✓</span>}
        </div>
      </div>
    );
  }
  return null;
})}
```
### Styling
- **Thinking blocks**: Muted text (`text-text-muted`), monospace font, secondary background
- **Responding blocks**: Standard primary text, Markdown rendering
- **Tool blocks**: One-line summary with ● indicator (brand color), green checkmark if response exists
### Markdown Rendering
Uses the `marked` library with `breaks: true` to honor line breaks:
```typescript
import { marked } from "marked";
marked.setOptions({
breaks: true, // Critical: renders \n as <br>
});
```
## Key Implementation Files
| Component | File Path | Responsibility |
|-----------|-----------|----------------|
| **AI Interface** | `packages/ai/src/api.ts` | `IAiStreamChunk`, `IAiResponseStreamFn` types |
| **OpenAI Provider** | `packages/ai/src/openai.ts` | Streaming from OpenAI SDK, calls `streamCallback` |
| **Ollama Provider** | `packages/ai/src/ollama.ts` | Streaming from Ollama SDK, calls `streamCallback` |
| **Drone Agent** | `gadget-drone/src/services/agent.ts` | Routes stream chunks to Socket.IO events |
| **Backend Aggregation** | `gadget-code/src/lib/drone-session.ts` | Buffers tokens, persists at mode changes |
| **Backend Routing** | `gadget-code/src/lib/code-session.ts` | Forwards events to IDE socket |
| **Frontend State** | `gadget-code/frontend/src/pages/ChatSessionView.tsx` | Manages streaming state, in-place updates |
| **Frontend Rendering** | `gadget-code/frontend/src/components/ChatTurn.tsx` | Renders blocks with Markdown |
| **Data Model** | `packages/api/src/interfaces/chat-turn.ts` | `IChatTurnBlock` types |
| **Mongoose Schema** | `gadget-code/src/models/chat-turn.ts` | MongoDB persistence schema |
## Design Decisions
### Why Aggregate in Backend?
1. **Database Efficiency**: Writing to MongoDB on every token would overwhelm the database
2. **Network Efficiency**: Fewer, larger updates instead of thousands of micro-updates
3. **Mode Awareness**: Backend can detect mode transitions and structure data appropriately
### Why In-Place Updates in Frontend?
1. **Performance**: Updating existing DOM nodes is cheaper than creating new ones
2. **Memory**: Prevents accumulation of hundreds of nearly-identical block objects
3. **Correctness**: Ensures the UI reflects the actual streaming state (one active block per mode)
### Why Blocks Array Instead of Strings?
1. **Temporal Ordering**: Preserves the exact sequence of thinking/responding/tool events
2. **Reconstruction**: Can replay the agent's "thought process" exactly as it happened
3. **Analytics**: Easy to query for patterns (e.g., "how many mode transitions per turn?")
4. **Query Efficiency**: No need for complex MongoDB aggregations to reconstruct turns
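As a sketch of the analytics point above, counting mode transitions becomes a simple walk over `blocks` — information that would be unrecoverable from flat `thinking`/`response` strings. `countModeTransitions` is a hypothetical helper, not part of the codebase:

```typescript
type BlockMode = 'thinking' | 'responding' | 'tool';
interface TurnBlock { mode: BlockMode; }

// Count how many times the agent switched modes within one turn.
function countModeTransitions(blocks: TurnBlock[]): number {
  let transitions = 0;
  for (let i = 1; i < blocks.length; i++) {
    if (blocks[i].mode !== blocks[i - 1].mode) transitions++;
  }
  return transitions;
}

const blocks: TurnBlock[] = [
  { mode: 'thinking' },
  { mode: 'responding' },
  { mode: 'tool' },
  { mode: 'responding' },
];
console.log(countModeTransitions(blocks)); // → 3
```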
## Testing & Verification
### Manual Testing Steps
1. Start backend: `cd gadget-code && pnpm dev:backend`
2. Start frontend: `cd gadget-code/frontend && pnpm dev`
3. Start drone: `cd ~/workspace && pnpm --filter gadget-drone dev`
4. Create chat session, submit prompt
5. Observe:
- Thinking content streams in (muted, monospace)
- Response content streams in (standard text)
- Tool calls appear as one-line summaries
- Mode transitions create new blocks
- Final display matches streaming sequence
### What to Look For
✅ **Correct behavior:**
- Content streams in real-time (not all at once at the end)
- Thinking blocks are visually distinct (muted, monospace)
- Tool calls break between thinking/responding blocks
- No duplicate blocks (each mode has one active block during streaming)
- Markdown renders correctly with line breaks
❌ **Incorrect behavior:**
- Hundreds of blocks with duplicate content (aggregation broken)
- All content appears at once (streaming not working)
- Thinking and response mixed in same block (mode detection broken)
- Tool calls not appearing (tool call events not routed)
## Future Enhancements
Potential improvements not yet implemented:
1. **Token Count Streaming**: Emit token counts with each chunk for real-time stats
2. **Thinking/Response Mode Labels**: Optional headers to explicitly label block types
3. **Block Collapse/Expand**: Persist collapsed state for thinking blocks
4. **Streaming Cursor**: Visual indicator (blinking cursor) at end of active streaming block
5. **Subagent Streaming**: Extend streaming to subagent processes
## Troubleshooting
### No Streaming Updates
**Symptoms:** UI shows spinner but no content until work order completes
**Causes:**
1. `streamCallback` not being called in AI provider
2. Socket.IO events not emitted from drone
3. Event handlers not registered in `DroneSession` or `CodeSession`
**Debug:** Check `gadget-drone.log` for "stream chunk received" entries
### Duplicate Blocks
**Symptoms:** UI shows many blocks with progressively longer content
**Causes:**
1. `currentBlockIndex` not being tracked in frontend
2. `scheduleUpdate` always appending instead of updating in place
3. Mode transitions not detected properly
**Debug:** Inspect `turn.blocks` array in React DevTools
### Mode Transitions Not Working
**Symptoms:** Thinking and response content mixed in same block
**Causes:**
1. Chunk `type` field not set correctly in AI provider
2. Mode detection logic in event handlers broken
3. Buffer flush not occurring on mode change
**Debug:** Log `state.currentMode` in event handlers
## Related Documentation
- [Architecture](./architecture.md) — Overall system architecture
- [Socket Protocol](./socket-protocol.md) — Socket.IO event definitions
- [ChatTurn Interface](../packages/api/src/interfaces/chat-turn.ts) — TypeScript types
- [ChatTurn Model](../gadget-code/src/models/chat-turn.ts) — Mongoose schema


@@ -10,6 +10,7 @@
"author": "Robert Colbert <rob.colbert@openplatform.us>",
"license": "Apache-2.0",
"dependencies": {
"marked": "^16.0.0",
"slug": "^11.0.1"
}
}


@@ -1,14 +1,22 @@
import { useState, memo, useCallback } from "react";
import type { ChatTurn as ChatTurnType } from "../lib/api";
import { useState, memo, useCallback, useEffect, useRef } from "react";
import { marked } from "marked";
import type { ChatTurn as ChatTurnType, ChatTurnBlock } from "../lib/api";
interface ChatTurnProps {
turn: ChatTurnType;
onUpdate: (turnId: string, updates: Partial<ChatTurnType>) => void;
}
// Configure marked with breaks enabled
marked.setOptions({
breaks: true,
});
const ChatTurn = memo(function ChatTurn({ turn, onUpdate }: ChatTurnProps) {
const [thinkingExpanded, setThinkingExpanded] = useState(false);
const [toolCallsExpanded, setToolCallsExpanded] = useState(true);
const [renderedBlocks, setRenderedBlocks] = useState<Map<number, string>>(new Map());
const blocksEndRef = useRef<HTMLDivElement>(null);
const handleThinkingToggle = useCallback(() => {
setThinkingExpanded((prev) => !prev);
@@ -101,86 +109,56 @@ const ChatTurn = memo(function ChatTurn({ turn, onUpdate }: ChatTurnProps) {
</div>
{/* Agent Response */}
{(turn.thinking || turn.response || (turn.toolCalls && turn.toolCalls.length > 0)) && (
{(turn.blocks && turn.blocks.length > 0) && (
<div className="max-w-[80%] ml-auto mb-4">
<div className="bg-bg-tertiary border border-border-default rounded p-4">
<div className="text-sm font-semibold mb-3 text-text-secondary">
Gadget
</div>
{/* Thinking Section */}
{turn.thinking && (
<div className="mb-3">
<button
onClick={handleThinkingToggle}
className="text-xs text-text-muted hover:text-text-secondary flex items-center gap-1 transition-colors"
>
<span
className={`transform transition-transform ${thinkingExpanded ? "rotate-90" : ""}`}
>
</span>
Thinking ({formatTokenCount(turn.thinking.length)} chars)
</button>
{thinkingExpanded && (
<div className="mt-2 p-3 bg-bg-secondary rounded text-sm text-text-secondary whitespace-pre-wrap font-mono text-xs">
{turn.thinking}
{/* Render blocks in order */}
{turn.blocks.map((block, idx) => {
if (block.mode === 'thinking') {
return (
<div key={idx} className="mb-3">
<div className="text-xs text-text-muted mb-1 font-mono">
Thinking
</div>
)}
</div>
)}
{/* Response Section */}
{turn.response && (
<div className="mb-3 whitespace-pre-wrap text-text-primary">
{turn.response}
</div>
)}
{/* Tool Calls Section */}
{turn.toolCalls && turn.toolCalls.length > 0 && (
<div className="mb-3">
<button
onClick={handleToolCallsToggle}
className="text-xs text-text-muted hover:text-text-secondary flex items-center gap-1 transition-colors"
>
<span
className={`transform transition-transform ${toolCallsExpanded ? "rotate-90" : ""}`}
>
</span>
Tool Calls ({turn.toolCalls.length})
</button>
{toolCallsExpanded && (
<div className="mt-2 space-y-2">
{turn.toolCalls.map((toolCall, idx) => (
<div
key={toolCall.callId || idx}
className="p-2 bg-bg-secondary rounded border border-border-default"
>
className="p-3 bg-bg-secondary rounded text-sm text-text-muted whitespace-pre-wrap font-mono text-xs"
dangerouslySetInnerHTML={{
__html: marked.parse(block.content) as string
}}
/>
</div>
);
} else if (block.mode === 'responding') {
return (
<div key={idx} className="mb-3">
<div
className="text-text-primary"
dangerouslySetInnerHTML={{
__html: marked.parse(block.content) as string
}}
/>
</div>
);
} else if (block.mode === 'tool') {
const toolCall = block.content;
return (
<div key={idx} className="mb-3">
<div className="flex items-center gap-2 text-xs font-mono text-text-secondary">
<span className="text-brand"></span>
<span className="text-brand"></span>
<span>{toolCall.name}</span>
{toolCall.response && (
<span className="text-green-500"></span>
)}
</div>
{toolCall.parameters && (
<div className="mt-1 text-xs font-mono text-text-muted whitespace-pre-wrap break-all">
{toolCall.parameters}
</div>
)}
{toolCall.response && (
<div className="mt-1 text-xs font-mono text-green-500 whitespace-pre-wrap break-all">
{toolCall.response}
</div>
)}
</div>
))}
</div>
)}
</div>
)}
);
}
return null;
})}
<div className="mt-2 text-xs text-text-muted">
{startedAt.toLocaleTimeString()}


@@ -298,6 +298,31 @@ export interface ChatTurnPrompts {
system?: string;
}
export interface ChatTurnBlockThinking {
mode: 'thinking';
createdAt: string;
content: string;
}
export interface ChatTurnBlockResponding {
mode: 'responding';
createdAt: string;
content: string;
}
export interface ChatTurnBlockTool {
mode: 'tool';
createdAt: string;
content: {
callId: string;
name: string;
parameters?: string;
response?: string;
};
}
export type ChatTurnBlock = ChatTurnBlockThinking | ChatTurnBlockResponding | ChatTurnBlockTool;
export interface ChatTurn {
_id: string;
createdAt: string;
@@ -309,8 +334,7 @@ export interface ChatTurn {
mode: ChatSessionMode;
status: "processing" | "finished" | "error";
prompts: ChatTurnPrompts;
thinking?: string;
response?: string;
blocks: ChatTurnBlock[];
errorMessage?: string;
toolCalls: Array<{
callId: string;


@@ -16,6 +16,13 @@ interface LogEntry {
message: string;
}
interface StreamingState {
currentMode: 'thinking' | 'responding' | null;
thinkingContent: string;
respondingContent: string;
currentBlockIndex: number | null;
}
export default function ChatSessionView() {
const { projectId, sessionId } = useParams<{ projectId: string; sessionId: string }>();
const navigate = useNavigate();
@@ -46,6 +53,7 @@ export default function ChatSessionView() {
const pendingUpdatesRef = useRef<Map<string, Partial<ChatTurn>>>(new Map());
const updateRafRef = useRef<number | null>(null);
const currentTurnIdRef = useRef<string | null>(null);
const streamingStateRef = useRef<Map<string, StreamingState>>(new Map());
useEffect(() => {
loadSessionData();
@ -145,11 +153,33 @@ export default function ChatSessionView() {
const oldTurn = newTurns[turnIndex];
const newTurn = { ...oldTurn, ...turnUpdates };
if (turnUpdates.thinking !== undefined) {
newTurn.thinking = (oldTurn.thinking || '') + turnUpdates.thinking;
if (turnUpdates.blocks !== undefined) {
const state = streamingStateRef.current.get(turnId);
const currentBlockIndex = state?.currentBlockIndex ?? null;
// If we have a current block index, update it in place
if (currentBlockIndex !== null && oldTurn.blocks && oldTurn.blocks[currentBlockIndex]) {
const oldBlocks = [...oldTurn.blocks];
const updateBlock = turnUpdates.blocks[0];
// Only update if the mode matches
if (oldBlocks[currentBlockIndex].mode === updateBlock.mode) {
oldBlocks[currentBlockIndex] = updateBlock;
newTurn.blocks = oldBlocks;
} else {
// Mode changed, append new block and update index
newTurn.blocks = [...oldTurn.blocks, ...turnUpdates.blocks];
if (state) {
state.currentBlockIndex = oldTurn.blocks.length;
}
}
} else {
// No current block, append and set index
newTurn.blocks = [...(oldTurn.blocks || []), ...turnUpdates.blocks];
if (state && turnUpdates.blocks.length > 0) {
state.currentBlockIndex = oldTurn.blocks ? oldTurn.blocks.length : 0;
}
}
if (turnUpdates.response !== undefined) {
newTurn.response = (oldTurn.response || '') + turnUpdates.response;
}
if (turnUpdates.toolCalls !== undefined) {
newTurn.toolCalls = [...(oldTurn.toolCalls || []), ...turnUpdates.toolCalls];
@ -178,7 +208,41 @@ export default function ChatSessionView() {
const turnId = currentTurnIdRef.current;
if (!turnId) return;
pendingUpdatesRef.current.set(turnId, { thinking: content });
let state = streamingStateRef.current.get(turnId);
if (!state) {
state = { currentMode: null, thinkingContent: '', respondingContent: '', currentBlockIndex: null };
streamingStateRef.current.set(turnId, state);
}
// Check for mode transition
if (state.currentMode !== 'thinking') {
// Flush previous mode
if (state.currentMode === 'responding' && state.respondingContent) {
pendingUpdatesRef.current.set(turnId, {
blocks: [{
mode: 'responding' as const,
createdAt: new Date().toISOString(),
content: state.respondingContent,
}],
});
scheduleUpdate();
state.respondingContent = '';
state.currentBlockIndex = null;
}
state.currentMode = 'thinking';
}
// Aggregate content
state.thinkingContent += content;
// Update with aggregated content
pendingUpdatesRef.current.set(turnId, {
blocks: [{
mode: 'thinking' as const,
createdAt: new Date().toISOString(),
content: state.thinkingContent,
}],
});
scheduleUpdate();
}, [scheduleUpdate]);
@ -186,7 +250,41 @@ export default function ChatSessionView() {
const turnId = currentTurnIdRef.current;
if (!turnId) return;
pendingUpdatesRef.current.set(turnId, { response: content });
let state = streamingStateRef.current.get(turnId);
if (!state) {
state = { currentMode: null, thinkingContent: '', respondingContent: '', currentBlockIndex: null };
streamingStateRef.current.set(turnId, state);
}
// Check for mode transition
if (state.currentMode !== 'responding') {
// Flush previous mode
if (state.currentMode === 'thinking' && state.thinkingContent) {
pendingUpdatesRef.current.set(turnId, {
blocks: [{
mode: 'thinking' as const,
createdAt: new Date().toISOString(),
content: state.thinkingContent,
}],
});
scheduleUpdate();
state.thinkingContent = '';
state.currentBlockIndex = null;
}
state.currentMode = 'responding';
}
// Aggregate content
state.respondingContent += content;
// Update with aggregated content
pendingUpdatesRef.current.set(turnId, {
blocks: [{
mode: 'responding' as const,
createdAt: new Date().toISOString(),
content: state.respondingContent,
}],
});
scheduleUpdate();
}, [scheduleUpdate]);
@ -194,13 +292,78 @@ export default function ChatSessionView() {
const turnId = currentTurnIdRef.current;
if (!turnId) return;
// Flush current streaming state
const state = streamingStateRef.current.get(turnId);
if (state) {
if (state.currentMode === 'thinking' && state.thinkingContent) {
pendingUpdatesRef.current.set(turnId, {
toolCalls: [{ callId, name, parameters: params, response }]
blocks: [{
mode: 'thinking' as const,
createdAt: new Date().toISOString(),
content: state.thinkingContent,
}],
});
state.thinkingContent = '';
state.currentBlockIndex = null;
}
if (state.currentMode === 'responding' && state.respondingContent) {
pendingUpdatesRef.current.set(turnId, {
blocks: [{
mode: 'responding' as const,
createdAt: new Date().toISOString(),
content: state.respondingContent,
}],
});
state.respondingContent = '';
state.currentBlockIndex = null;
}
state.currentMode = null;
state.currentBlockIndex = null;
}
// Add tool block
pendingUpdatesRef.current.set(turnId, {
blocks: [{
mode: 'tool' as const,
createdAt: new Date().toISOString(),
content: { callId, name, parameters: params, response },
}],
toolCalls: [{ callId, name, parameters: params, response }],
});
scheduleUpdate();
}, [scheduleUpdate]);
const handleWorkOrderComplete = useCallback((turnId: string, success: boolean, message?: string) => {
// Flush any remaining streaming state
const state = streamingStateRef.current.get(turnId);
if (state) {
const blocks: ChatTurnBlock[] = [];
if (state.currentMode === 'thinking' && state.thinkingContent) {
blocks.push({
mode: 'thinking' as const,
createdAt: new Date().toISOString(),
content: state.thinkingContent,
});
}
if (state.currentMode === 'responding' && state.respondingContent) {
blocks.push({
mode: 'responding' as const,
createdAt: new Date().toISOString(),
content: state.respondingContent,
});
}
if (blocks.length > 0) {
setTurns(prevTurns =>
prevTurns.map(turn =>
turn._id === turnId
? { ...turn, blocks: [...(turn.blocks || []), ...blocks] }
: turn
)
);
}
streamingStateRef.current.delete(turnId);
}
setTurns(prevTurns =>
prevTurns.map(turn =>
turn._id === turnId

View File

@ -231,6 +231,9 @@ importers:
frontend:
dependencies:
marked:
specifier: ^16.0.0
version: 16.0.0
slug:
specifier: ^11.0.1
version: 11.0.1

View File

@ -172,7 +172,7 @@ export class CodeSession extends SocketSession {
"processWorkOrder",
this.selectedDrone,
turn,
(success: boolean, message?: string) => {
async (success: boolean, message?: string) => {
if (success) {
this.log.info("work order accepted by drone", {
turnId: turn._id,
@ -184,8 +184,8 @@ export class CodeSession extends SocketSession {
message,
});
turn.status = ChatTurnStatus.Error;
turn.response = message || "Drone rejected work order";
turn.save();
turn.errorMessage = message || "Drone rejected work order";
await turn.save();
}
},
);

View File

@ -17,12 +17,20 @@ import {
import { SocketService } from "../services/index.js";
import { ChatTurn } from "../models/chat-turn.js";
interface IStreamingBuffer {
currentMode: 'thinking' | 'responding' | null;
thinkingContent: string;
respondingContent: string;
lastBlockCreatedAt?: Date;
}
export class DroneSession extends SocketSession {
protected type: SocketSessionType = SocketSessionType.Drone;
registration: IDroneRegistration;
chatSessionId: GadgetId | undefined;
currentTurnId: GadgetId | undefined;
workspaceMode: WorkspaceMode = WorkspaceMode.Idle;
private streamingBuffers: Map<string, IStreamingBuffer> = new Map();
constructor(socket: GadgetSocket, registration: IDroneRegistration) {
super(socket, registration.user as IUser);
@ -71,6 +79,7 @@ export class DroneSession extends SocketSession {
/**
* Called when the drone emits thinking content from the agent.
* Aggregates thinking tokens in memory and persists at mode changes.
*/
async onThinking(content: string): Promise<void> {
if (!this.chatSessionId) {
@ -85,9 +94,19 @@ export class DroneSession extends SocketSession {
codeSession.onThinking(content);
if (this.currentTurnId) {
await ChatTurn.findByIdAndUpdate(this.currentTurnId, {
thinking: content,
});
const buffer = this.getOrCreateBuffer(this.currentTurnId);
// Check for mode transition
if (buffer.currentMode !== 'thinking') {
// Flush previous mode if exists
await this.flushBuffer(this.currentTurnId);
buffer.currentMode = 'thinking';
buffer.thinkingContent = '';
buffer.lastBlockCreatedAt = new Date();
}
// Aggregate content
buffer.thinkingContent += content;
}
} catch (error) {
this.log.error("failed to route thinking event", { error });
@ -96,6 +115,7 @@ export class DroneSession extends SocketSession {
/**
* Called when the drone emits response content from the agent.
* Aggregates response tokens in memory and persists at mode changes.
*/
async onResponse(content: string): Promise<void> {
if (!this.chatSessionId) {
@ -110,9 +130,19 @@ export class DroneSession extends SocketSession {
codeSession.onResponse(content);
if (this.currentTurnId) {
await ChatTurn.findByIdAndUpdate(this.currentTurnId, {
response: content,
});
const buffer = this.getOrCreateBuffer(this.currentTurnId);
// Check for mode transition
if (buffer.currentMode !== 'responding') {
// Flush previous mode if exists
await this.flushBuffer(this.currentTurnId);
buffer.currentMode = 'responding';
buffer.respondingContent = '';
buffer.lastBlockCreatedAt = new Date();
}
// Aggregate content
buffer.respondingContent += content;
}
} catch (error) {
this.log.error("failed to route response event", { error });
@ -121,6 +151,7 @@ export class DroneSession extends SocketSession {
/**
* Called when the drone emits a tool call event from the agent.
* Flushes current buffer and adds tool block immediately.
*/
async onToolCall(
callId: string,
@ -140,8 +171,22 @@ export class DroneSession extends SocketSession {
codeSession.onToolCall(callId, name, params, response);
if (this.currentTurnId) {
// Flush current buffer before adding tool block
await this.flushBuffer(this.currentTurnId);
// Add tool block immediately
const turn = await ChatTurn.findById(this.currentTurnId);
if (turn) {
turn.blocks.push({
mode: 'tool',
createdAt: new Date(),
content: {
callId,
name,
parameters: params,
response,
},
});
turn.toolCalls.push({
callId,
name,
@ -159,6 +204,7 @@ export class DroneSession extends SocketSession {
/**
* Called when the drone completes a work order.
* Flushes any remaining buffered content before finalizing.
*/
async onWorkOrderComplete(
turnId: string,
@ -173,6 +219,10 @@ export class DroneSession extends SocketSession {
}
try {
// Flush any remaining buffered content
await this.flushBuffer(turnId);
this.streamingBuffers.delete(turnId);
const turn = await ChatTurn.findById(turnId);
if (turn) {
turn.status = success ? ChatTurnStatus.Finished : ChatTurnStatus.Error;
@ -198,6 +248,57 @@ export class DroneSession extends SocketSession {
*/
setChatSessionId(chatSessionId: GadgetId): void {
this.chatSessionId = chatSessionId;
// Clear buffer for this turn if exists
this.streamingBuffers.clear();
}
/**
* Gets or creates a streaming buffer for a turn.
*/
private getOrCreateBuffer(turnId: string): IStreamingBuffer {
if (!this.streamingBuffers.has(turnId)) {
this.streamingBuffers.set(turnId, {
currentMode: null,
thinkingContent: '',
respondingContent: '',
});
}
return this.streamingBuffers.get(turnId)!;
}
/**
* Flushes the current buffer to the database.
*/
private async flushBuffer(turnId: string): Promise<void> {
const buffer = this.streamingBuffers.get(turnId);
if (!buffer) return;
const turn = await ChatTurn.findById(turnId);
if (!turn) return;
// Flush thinking content
if (buffer.currentMode === 'thinking' && buffer.thinkingContent) {
turn.blocks.push({
mode: 'thinking',
createdAt: buffer.lastBlockCreatedAt || new Date(),
content: buffer.thinkingContent,
});
buffer.thinkingContent = '';
}
// Flush responding content
if (buffer.currentMode === 'responding' && buffer.respondingContent) {
turn.blocks.push({
mode: 'responding',
createdAt: buffer.lastBlockCreatedAt || new Date(),
content: buffer.respondingContent,
});
buffer.respondingContent = '';
}
if (turn.blocks.length > 0) {
await turn.save();
}
}
/**
@ -265,7 +366,7 @@ export class DroneSession extends SocketSession {
// Turn is still processing - mark for retry
turn.status = ChatTurnStatus.Error;
turn.response = "Drone crashed during processing - retrying";
turn.errorMessage = "Drone crashed during processing - retrying";
await turn.save();
this.socket.emit("crashRecoveryResponse", {

View File

@ -12,6 +12,7 @@ import {
IChatToolCall,
IChatTurn,
IChatTurnStats,
IChatTurnBlock,
} from "@gadget/api";
import { nanoid } from "nanoid";
@ -29,6 +30,16 @@ export const ChatTurnStatsSchema = new Schema<IChatTurnStats>({
durationLabel: { type: String, default: "pending", required: true },
});
const ChatTurnBlockSchema = new Schema<IChatTurnBlock>({
mode: {
type: String,
enum: ['thinking', 'responding', 'tool'],
required: true
},
createdAt: { type: Date, default: Date.now, required: true },
content: { type: Schema.Types.Mixed, required: true },
}, { _id: false });
export const ChatToolCallSchema = new Schema<IChatToolCall>({
callId: { type: String, required: true },
name: { type: String, required: true },
@ -65,8 +76,7 @@ export const ChatTurnSchema = new Schema<IChatTurn>({
required: true,
},
prompts: { type: ChatTurnPromptsSchema, required: true },
thinking: { type: String, required: false },
response: { type: String, required: false },
blocks: { type: [ChatTurnBlockSchema], default: [], required: true },
errorMessage: { type: String, required: false },
toolCalls: { type: [ChatToolCallSchema], default: [], required: true },
subagents: { type: [ChatSubagentProcessSchema], default: [], required: true },
@ -94,15 +104,13 @@ ChatTurnSchema.index(
ChatTurnSchema.index(
{
user: 1,
prompt: "text",
thinking: "text",
response: "text",
"prompts.user": "text",
"prompts.system": "text",
},
{
weights: {
prompt: 10,
response: 5,
thinking: 1,
"prompts.user": 10,
"prompts.system": 5,
},
name: "chat-turn-user-text-index",
},

View File

@ -68,6 +68,24 @@ class AgentService extends GadgetService {
const onStreamChunk = async (chunk: IAiStreamChunk): Promise<void> => {
this.log.debug("stream chunk received", { chunk });
switch (chunk.type) {
case 'thinking':
socket.emit("thinking", chunk.data);
break;
case 'response':
socket.emit("response", chunk.data);
break;
case 'toolCall':
socket.emit(
"toolCall",
chunk.toolCallId!,
chunk.toolName!,
chunk.params || "{}",
chunk.data,
);
break;
}
};
try {
@ -186,14 +204,17 @@ class AgentService extends GadgetService {
* (reasoning) output (if any).
*/
let content = "";
if (turn.thinking) {
content += `<thinking>${turn.thinking}</thinking>`;
if (turn.response && turn.response.length) {
// Extract thinking and response from blocks
for (const block of turn.blocks) {
if (block.mode === 'thinking' && typeof block.content === 'string') {
content += `<thinking>${block.content}</thinking>`;
} else if (block.mode === 'responding' && typeof block.content === 'string') {
if (content && content.length) {
content += "\n";
}
content += block.content;
}
if (turn.response) {
content += turn.response;
}
messages.push({

View File

@ -3,6 +3,17 @@
// Licensed under the Apache License, Version 2.0
import env from "../config/env.ts";
const aiEnv: IAiEnvironment = {
NODE_ENV: env.NODE_ENV,
services: {
google: {
cse: {
apiKey: env.google.cse.apiKey,
engineId: env.google.cse.engineId,
},
},
},
};
import { IAiProvider as DbAiProvider, GadgetId } from "@gadget/api";
import { GadgetService } from "../lib/service.js";
@ -128,17 +139,6 @@ class AiService extends GadgetService {
}
getApi(provider: AiProviderConfig) {
const aiEnv: IAiEnvironment = {
NODE_ENV: env.NODE_ENV,
services: {
google: {
cse: {
apiKey: env.google.cse.apiKey,
engineId: env.google.cse.engineId,
},
},
},
};
return createAiApi(aiEnv, provider, this.log);
}
}

View File

@ -96,7 +96,11 @@ export interface IAiChatResponse {
}
export interface IAiStreamChunk {
type: 'thinking' | 'response' | 'toolCall';
data: string;
toolCallId?: string;
toolName?: string;
params?: string;
}
export type IAiResponseStreamFn = (chunk: IAiStreamChunk) => Promise<void>;

View File

@ -165,6 +165,21 @@ export class OllamaAiApi extends AiApi {
for await (const chunk of response) {
await this.log.debug("stream chunk received", { chunk });
lastChunk = chunk;
if (streamCallback) {
if (chunk.thinking) {
await streamCallback({
type: 'thinking',
data: chunk.thinking,
});
}
if (chunk.response) {
await streamCallback({
type: 'response',
data: chunk.response,
});
}
}
}
assert(lastChunk, "no stream response chunks received");
@ -231,9 +246,41 @@ export class OllamaAiApi extends AiApi {
});
let lastChunk;
let accumulatedThinking = "";
let accumulatedResponse = "";
for await (const chunk of response) {
await this.log.debug("stream chunk received", { chunk });
lastChunk = chunk;
if (streamCallback) {
if (chunk.message.thinking) {
accumulatedThinking += chunk.message.thinking;
await streamCallback({
type: 'thinking',
data: chunk.message.thinking,
});
}
if (chunk.message.content) {
accumulatedResponse += chunk.message.content;
await streamCallback({
type: 'response',
data: chunk.message.content,
});
}
if (chunk.message.tool_calls) {
for (const tc of chunk.message.tool_calls) {
const params = JSON.stringify(tc.function.arguments);
await streamCallback({
type: 'toolCall',
data: params,
toolCallId: `tool_${tc.function.name}_${Date.now()}`,
toolName: tc.function.name,
params,
});
}
}
}
}
assert(lastChunk, "no response chunks received");

View File

@ -201,25 +201,51 @@ export class OpenAiApi extends AiApi {
: []),
{ role: "user" as const, content: options.prompt },
],
stream: false,
stream: true,
});
const choice = response.choices[0];
let accumulatedResponse = "";
let accumulatedThinking = "";
for await (const chunk of response) {
const delta = chunk.choices[0]?.delta;
if (delta) {
if (delta.content) {
accumulatedResponse += delta.content;
if (streamCallback) {
await streamCallback({
type: 'response',
data: delta.content,
});
}
}
if ('reasoning' in delta && delta.reasoning) {
accumulatedThinking += delta.reasoning as string;
if (streamCallback) {
await streamCallback({
type: 'thinking',
data: delta.reasoning as string,
});
}
}
}
}
const endTime = Date.now();
const durationMs = endTime - startTime;
return {
done: true,
response: choice.message.content || "",
thinking: undefined,
response: accumulatedResponse,
thinking: accumulatedThinking || undefined,
stats: {
duration: {
seconds: durationMs / 1000,
text: numeral(durationMs / 1000).format("hh:mm:ss"),
},
tokenCounts: {
input: response.usage?.prompt_tokens || 0,
response: response.usage?.completion_tokens || 0,
input: 0,
response: 0,
thinking: 0,
},
},
@ -279,16 +305,56 @@ export class OpenAiApi extends AiApi {
model: model.modelId,
messages,
tools,
stream: false,
stream: true,
});
const choice = response.choices[0];
const endTime = Date.now();
const durationMs = endTime - startTime;
let accumulatedResponse = "";
let accumulatedThinking = "";
let finalToolCalls: any = undefined;
const toolCalls = choice.message.tool_calls
?.filter((tc) => tc.type === "function")
.map((tc) => ({
for await (const chunk of response) {
const delta = chunk.choices[0]?.delta;
if (delta) {
if (delta.content) {
accumulatedResponse += delta.content;
if (streamCallback) {
await streamCallback({
type: 'response',
data: delta.content,
});
}
}
if ('reasoning' in delta && delta.reasoning) {
accumulatedThinking += delta.reasoning as string;
if (streamCallback) {
await streamCallback({
type: 'thinking',
data: delta.reasoning as string,
});
}
}
if (delta.tool_calls) {
finalToolCalls = delta.tool_calls;
for (const tc of delta.tool_calls) {
if (tc.function) {
if (streamCallback) {
await streamCallback({
type: 'toolCall',
data: tc.function.arguments || "",
toolCallId: tc.id,
toolName: tc.function.name,
params: tc.function.arguments,
});
}
}
}
}
}
}
const toolCalls = finalToolCalls
?.filter((tc: any) => tc.type === "function")
.map((tc: any) => ({
callId: tc.id,
function: {
name: tc.function.name,
@ -299,18 +365,18 @@ export class OpenAiApi extends AiApi {
if (!toolCalls || toolCalls.length === 0) {
return {
done: true,
response: choice.message.content || "",
thinking: undefined,
response: accumulatedResponse,
thinking: accumulatedThinking || undefined,
toolCalls: undefined,
toolCallResults: allToolCallResults.length > 0 ? allToolCallResults : undefined,
stats: {
duration: {
seconds: durationMs / 1000,
text: numeral(durationMs / 1000).format("hh:mm:ss"),
seconds: (Date.now() - startTime) / 1000,
text: numeral((Date.now() - startTime) / 1000).format("hh:mm:ss"),
},
tokenCounts: {
input: response.usage?.prompt_tokens || 0,
response: response.usage?.completion_tokens || 0,
input: 0,
response: 0,
thinking: 0,
},
},
@ -325,10 +391,10 @@ export class OpenAiApi extends AiApi {
const assistantMsg: ChatCompletionAssistantMessageParam = {
role: "assistant",
content: choice.message.content,
content: accumulatedResponse,
};
if (choice.message.tool_calls) {
assistantMsg.tool_calls = choice.message.tool_calls;
if (finalToolCalls) {
assistantMsg.tool_calls = finalToolCalls;
}
messages.push(assistantMsg);

View File

@ -23,6 +23,26 @@ export interface IChatTurnPrompts {
system?: string;
}
export interface IChatTurnBlockThinking {
mode: 'thinking';
createdAt: Date;
content: string;
}
export interface IChatTurnBlockResponding {
mode: 'responding';
createdAt: Date;
content: string;
}
export interface IChatTurnBlockTool {
mode: 'tool';
createdAt: Date;
content: IChatToolCall;
}
export type IChatTurnBlock = IChatTurnBlockThinking | IChatTurnBlockResponding | IChatTurnBlockTool;
export interface IChatTurnStats {
toolCallCount: number; // total number of tool functions called this turn
inputTokens: number; // total number of input tokens processed this turn
@ -60,11 +80,10 @@ export interface IChatTurn {
session: IChatSession | GadgetId;
provider: IAiProvider | GadgetId;
llm: string; // id/name of the model used to process the prompt
mode: ChatSessionMode; // session mode for this turn/prompt
mode: ChatSessionMode;
status: ChatTurnStatus;
prompts: IChatTurnPrompts;
thinking?: string;
response?: string;
blocks: IChatTurnBlock[];
errorMessage?: string;
toolCalls: IChatToolCall[];
subagents: IChatSubagentProcess[]; // subagents used while processing this turn