# Gadget Code Streaming Responses

**Status:** ✅ **IMPLEMENTED** — Full end-to-end streaming responses operational

**Last Updated:** May 7, 2026

## Quick Reference

**Streaming Path:** AI Provider → @gadget/ai → gadget-drone → gadget-code (backend) → Frontend IDE

**Key Concepts:**

- **IAiStreamChunk**: Unified chunk type with `type: 'thinking' | 'response' | 'toolCall'`
- **Backend Aggregation**: Tokens buffered in `DroneSession`, persisted at mode changes
- **Frontend In-Place Updates**: Blocks updated by index, not appended (prevents DOM flooding)
- **Blocks Array**: `ChatTurn.blocks[]` stores ordered thinking/responding/tool blocks

**Critical Files:**

- `packages/ai/src/api.ts` — Stream chunk interface
- `gadget-code/src/lib/drone-session.ts` — Aggregation logic
- `gadget-code/frontend/src/pages/ChatSessionView.tsx` — Frontend state management
- `gadget-code/frontend/src/components/ChatTurn.tsx` — Block rendering

## Overview

Gadget Code implements real-time streaming responses from AI providers (OpenAI, Ollama) through the entire system stack. As the AI model generates tokens, they flow immediately from the provider → drone → backend → frontend, where they are displayed to the user with minimal latency.

The system supports three types of streaming content:

- **Thinking tokens**: Reasoning output from models with thinking capabilities
- **Response tokens**: The model's primary response content
- **Tool calls**: Function/tool invocations requested by the model

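
For orientation, one illustrative chunk of each type might look like this (all values here are invented; the actual `IAiStreamChunk` shape is defined later in this document):

```typescript
// Illustrative stream chunks, one per type (all values invented).
const exampleChunks = [
  { type: 'thinking', data: 'Hmm, let me check the weather API…' },
  { type: 'response', data: 'Sure — ' },
  {
    type: 'toolCall',
    data: '',                   // tool result payload
    toolCallId: 'call_1',       // provider-assigned call id
    toolName: 'search_google',  // tool being invoked
    params: '{"q":"weather"}',  // JSON-encoded arguments
  },
] as const;

console.log(exampleChunks.map((c) => c.type).join(',')); // → thinking,response,toolCall
```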
## Architecture & Data Flow

### Complete Streaming Path

```
┌─────────────────┐
│  AI Provider    │  (OpenAI / Ollama SDK)
│  (Streaming)    │
└────────┬────────┘
         │ IAiStreamChunk
         │ (type: 'thinking' | 'response' | 'toolCall')
         ▼
┌─────────────────┐
│  @gadget/ai     │  (packages/ai/src/openai.ts or ollama.ts)
│  streamCallback │
└────────┬────────┘
         │ Calls streamCallback(chunk) for each token
         ▼
┌─────────────────┐
│  gadget-drone   │  (gadget-drone/src/services/agent.ts)
│  AgentService   │
└────────┬────────┘
         │ Socket.IO events:
         │   thinking(content)
         │   response(content)
         │   toolCall(callId, name, params, response)
         ▼
┌─────────────────┐
│  gadget-code    │  (gadget-code/src/lib/drone-session.ts)
│  DroneSession   │
└────────┬────────┘
         │ Aggregates tokens by mode
         │ Persists to MongoDB at mode changes
         │ Routes events to CodeSession
         ▼
┌─────────────────┐
│  gadget-code    │  (gadget-code/src/lib/code-session.ts)
│  CodeSession    │
└────────┬────────┘
         │ Socket.IO events to IDE
         ▼
┌─────────────────┐
│  Frontend IDE   │  (gadget-code/frontend/src/pages/ChatSessionView.tsx)
│  Chat Turn      │
└─────────────────┘
```


### Event Flow Details

1. **AI Provider → @gadget/ai**
   - OpenAI: Uses `stream: true` in `chat.completions.create()`, iterates SSE chunks
   - Ollama: Uses `stream: true` in `client.chat()`, iterates async chunks
   - Both call `streamCallback(chunk: IAiStreamChunk)` for each token

2. **@gadget/ai → gadget-drone**
   - `streamCallback` emits Socket.IO events based on chunk type:

   ```typescript
   switch (chunk.type) {
     case 'thinking': socket.emit("thinking", chunk.data); break;
     case 'response': socket.emit("response", chunk.data); break;
     case 'toolCall': socket.emit("toolCall", callId, name, params, data); break;
   }
   ```

3. **gadget-drone → gadget-code backend**
   - Events arrive at `DroneSession` via Socket.IO
   - `DroneSession` aggregates tokens in memory (see Aggregation & Persistence below)
   - At mode changes or tool calls, flushes to MongoDB
   - Routes events to the corresponding `CodeSession` via `SocketService.getCodeSessionByChatSessionId()`

4. **gadget-code backend → Frontend IDE**
   - `CodeSession` forwards events to the IDE socket
   - `ChatSessionView` receives events and updates React state
   - The `ChatTurn` component renders blocks with Markdown

## IAiStreamChunk Interface

Defined in `packages/ai/src/api.ts`:

```typescript
interface IAiStreamChunk {
  type: 'thinking' | 'response' | 'toolCall';
  data: string;
  toolCallId?: string;
  toolName?: string;
  params?: string;
}

type IAiResponseStreamFn = (chunk: IAiStreamChunk) => Promise<void>;
```

The `type` field determines how the chunk is processed and displayed. The `data` field contains the token content. For tool calls, `toolCallId`, `toolName`, and `params` provide metadata.

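
A consumer fans these chunks out by switching on `type`. A minimal sketch (the `dispatch` helper and its tuple return are invented for illustration — the real drone emits Socket.IO events rather than returning values; the interface is repeated so the snippet is self-contained):

```typescript
interface IAiStreamChunk {
  type: 'thinking' | 'response' | 'toolCall';
  data: string;
  toolCallId?: string;
  toolName?: string;
  params?: string;
}

// Map a chunk to the event name and arguments it would be emitted with.
function dispatch(chunk: IAiStreamChunk): [string, ...string[]] {
  switch (chunk.type) {
    case 'thinking':
      return ['thinking', chunk.data];
    case 'response':
      return ['response', chunk.data];
    case 'toolCall':
      // Tool calls carry extra metadata alongside the data payload.
      return ['toolCall', chunk.toolCallId ?? '', chunk.toolName ?? '', chunk.params ?? '', chunk.data];
  }
}

console.log(dispatch({ type: 'response', data: 'Hello' })[0]); // → response
```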
## Aggregation & Persistence

### Where Aggregation Lives

**Location:** `gadget-code/src/lib/drone-session.ts`  
**Class:** `DroneSession`  
**Private Field:** `streamingBuffers: Map<string, IStreamingBuffer>`

### IStreamingBuffer Interface

```typescript
interface IStreamingBuffer {
  currentMode: 'thinking' | 'responding' | null;
  thinkingContent: string;
  respondingContent: string;
  lastBlockCreatedAt?: Date;
}
```

### How Aggregation Works

1. **Token Arrival**: When `onThinking()` or `onResponse()` is called:
   - Get or create the buffer for the current `turnId`
   - Check whether `currentMode` matches the incoming token type
   - If the mode changed, flush the previous buffer to the database first
   - Append the token content to the appropriate field (`thinkingContent` or `respondingContent`)

2. **Mode Transition Detection**:
   - `thinking` event while in `responding` mode → flush responding, start thinking
   - `response` event while in `thinking` mode → flush thinking, start responding
   - Tool call events → flush the current buffer, add a tool block immediately

3. **Database Persistence**:
   - Flushes occur at mode transitions (not on every token)
   - The `ChatTurn.blocks` array is updated with aggregated content
   - Tool calls are persisted immediately as separate blocks

4. **Work Order Completion**:
   - `onWorkOrderComplete()` flushes any remaining buffered content
   - Ensures no tokens are lost if streaming ends abruptly

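
The steps above amount to a small state machine. A simplified model, not the actual `DroneSession` code: flushed blocks are collected into an array where the real implementation writes `ChatTurn.blocks` to MongoDB, and the `feed`/`flush` names and trimmed-down shapes are invented:

```typescript
type Mode = 'thinking' | 'responding';

interface StreamBuffer {
  currentMode: Mode | null;
  thinkingContent: string;
  respondingContent: string;
}

interface FlushedBlock {
  mode: Mode;
  content: string;
}

// Persist the current buffer contents as a block, then reset the buffer.
function flush(buf: StreamBuffer, flushed: FlushedBlock[]): void {
  if (buf.currentMode === 'thinking' && buf.thinkingContent) {
    flushed.push({ mode: 'thinking', content: buf.thinkingContent });
    buf.thinkingContent = '';
  } else if (buf.currentMode === 'responding' && buf.respondingContent) {
    flushed.push({ mode: 'responding', content: buf.respondingContent });
    buf.respondingContent = '';
  }
  buf.currentMode = null;
}

// Append one token; on a mode transition, flush the previous buffer first.
function feed(buf: StreamBuffer, mode: Mode, token: string, flushed: FlushedBlock[]): void {
  if (buf.currentMode !== null && buf.currentMode !== mode) {
    flush(buf, flushed);
  }
  buf.currentMode = mode;
  if (mode === 'thinking') buf.thinkingContent += token;
  else buf.respondingContent += token;
}

const buf: StreamBuffer = { currentMode: null, thinkingContent: '', respondingContent: '' };
const flushed: FlushedBlock[] = [];
feed(buf, 'thinking', 'Hmm', flushed);
feed(buf, 'thinking', ' let', flushed);
feed(buf, 'responding', 'Sure', flushed);
// flushed now holds one block { mode: 'thinking', content: 'Hmm let' },
// and the buffer is accumulating the responding content "Sure".
```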
### Example Flow

```
Token Stream: [think: "Hmm"] [think: " let"] [think: " me"] [resp: "Sure"] [tool: search_google] [resp: " I'll"]

Buffer State:
1. After "Hmm":     { mode: 'thinking', thinking: "Hmm" }
2. After " let":    { mode: 'thinking', thinking: "Hmm let" }
3. After " me":     { mode: 'thinking', thinking: "Hmm let me" }
4. After "Sure":    FLUSH thinking → DB, { mode: 'responding', responding: "Sure" }
5. After tool call: FLUSH responding → DB, ADD tool block → DB, { mode: null }
6. After " I'll":   { mode: 'responding', responding: " I'll" }
```

## ChatTurn Data Model

### IChatTurnBlock Interface

Defined in `packages/api/src/interfaces/chat-turn.ts`:

```typescript
interface IChatTurnBlockThinking {
  mode: 'thinking';
  createdAt: Date;
  content: string;
}

interface IChatTurnBlockResponding {
  mode: 'responding';
  createdAt: Date;
  content: string;
}

interface IChatTurnBlockTool {
  mode: 'tool';
  createdAt: Date;
  content: IChatToolCall; // { callId, name, parameters, response }
}

type IChatTurnBlock = IChatTurnBlockThinking | IChatTurnBlockResponding | IChatTurnBlockTool;
```

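
Because `mode` acts as a discriminant, TypeScript narrows the `content` type automatically in each branch. A small sketch (the `summarize` helper is invented for illustration, and the field types assumed for `IChatToolCall` are not spelled out in the source; the interfaces are repeated so the snippet is self-contained):

```typescript
interface IChatToolCall { callId: string; name: string; parameters: string; response?: string; } // field types assumed
interface IChatTurnBlockThinking { mode: 'thinking'; createdAt: Date; content: string; }
interface IChatTurnBlockResponding { mode: 'responding'; createdAt: Date; content: string; }
interface IChatTurnBlockTool { mode: 'tool'; createdAt: Date; content: IChatToolCall; }
type IChatTurnBlock = IChatTurnBlockThinking | IChatTurnBlockResponding | IChatTurnBlockTool;

// One-line summary per block; in each branch `content` has the narrowed type.
function summarize(block: IChatTurnBlock): string {
  switch (block.mode) {
    case 'tool':
      return `tool:${block.content.name}`; // content is IChatToolCall here
    case 'thinking':
      return `thinking(${block.content.length} chars)`; // content is string here
    case 'responding':
      return `responding(${block.content.length} chars)`;
  }
}
```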
### IChatTurn Changes

The `IChatTurn` interface now uses a `blocks` array instead of flat `thinking` and `response` strings:

```typescript
interface IChatTurn {
  // ... other fields ...
  blocks: IChatTurnBlock[];   // NEW: ordered blocks of thinking/responding/tool
  toolCalls: IChatToolCall[]; // Still maintained for detailed tool call data
  // ... other fields ...
}
```

**Removed:** `thinking?: string` and `response?: string` fields (replaced by `blocks`)

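
If older turns with the removed flat fields still exist, a one-off migration could fold them into `blocks`. A hypothetical sketch — the source does not describe a migration, and the `LegacyTurn` shape and `migrateTurn` name are invented:

```typescript
type BlockMode = 'thinking' | 'responding';

// Hypothetical shape of a pre-blocks turn document.
interface LegacyTurn {
  thinking?: string;
  response?: string;
  createdAt: Date;
}

interface MigratedBlock {
  mode: BlockMode;
  createdAt: Date;
  content: string;
}

// Fold legacy flat fields into an ordered blocks array (thinking first).
function migrateTurn(turn: LegacyTurn): MigratedBlock[] {
  const blocks: MigratedBlock[] = [];
  if (turn.thinking) blocks.push({ mode: 'thinking', createdAt: turn.createdAt, content: turn.thinking });
  if (turn.response) blocks.push({ mode: 'responding', createdAt: turn.createdAt, content: turn.response });
  return blocks;
}
```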
### Mongoose Schema

In `gadget-code/src/models/chat-turn.ts`:

```typescript
const ChatTurnBlockSchema = new Schema<IChatTurnBlock>({
  mode: {
    type: String,
    enum: ['thinking', 'responding', 'tool'],
    required: true
  },
  createdAt: { type: Date, default: Date.now, required: true },
  content: { type: Schema.Types.Mixed, required: true },
}, { _id: false });

const ChatTurnSchema = new Schema({
  // ...
  blocks: { type: [ChatTurnBlockSchema], default: [], required: true },
  // ...
});
```

## Frontend Event Handling & Rendering

### Event Reception

**Location:** `gadget-code/frontend/src/pages/ChatSessionView.tsx`

The `ChatSessionView` component maintains streaming state:

```typescript
interface StreamingState {
  currentMode: 'thinking' | 'responding' | null;
  thinkingContent: string;
  respondingContent: string;
  currentBlockIndex: number | null; // Tracks which block is being updated
}
```

### Event Handlers

1. **handleThinking(content: string)**:
   - If the mode changed from responding → thinking, flush the responding block
   - Aggregate thinking content
   - Update the current block in place (or create a new one on a mode transition)

2. **handleResponse(content: string)**:
   - If the mode changed from thinking → responding, flush the thinking block
   - Aggregate response content
   - Update the current block in place (or create a new one on a mode transition)

3. **handleToolCall(callId, name, params, response)**:
   - Flush any current streaming buffer
   - Add a tool block immediately (no aggregation for tools)
   - Reset the streaming state

4. **handleWorkOrderComplete()**:
   - Flush any remaining buffered content
   - Clean up the streaming state for this turn

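
Handlers 1 and 2 can be modeled as a pure update over the blocks array. A simplified sketch — `applyToken` and the trimmed-down `StreamBlock`/`HandlerState` shapes are invented for illustration, not the actual `ChatSessionView` code:

```typescript
type StreamMode = 'thinking' | 'responding';

interface StreamBlock { mode: StreamMode; content: string; }
interface HandlerState { currentMode: StreamMode | null; currentBlockIndex: number | null; }

// Apply one streamed token: extend the active block while the mode is
// unchanged, or start a new block on a mode transition.
function applyToken(blocks: StreamBlock[], state: HandlerState, mode: StreamMode, token: string): StreamBlock[] {
  if (state.currentMode === mode && state.currentBlockIndex !== null) {
    const next = blocks.slice();
    next[state.currentBlockIndex] = { mode, content: next[state.currentBlockIndex].content + token };
    return next;
  }
  state.currentMode = mode;
  state.currentBlockIndex = blocks.length;
  return [...blocks, { mode, content: token }];
}

let blocks: StreamBlock[] = [];
const state: HandlerState = { currentMode: null, currentBlockIndex: null };
for (const [mode, token] of [['thinking', 'Hmm'], ['thinking', ' let'], ['responding', 'Sure']] as const) {
  blocks = applyToken(blocks, state, mode, token);
}
console.log(blocks.length); // → 2 (one thinking block "Hmm let", one responding block "Sure")
```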
### In-Place Block Updates

The key optimization: **blocks are updated in place during streaming, not appended**.

```typescript
// In scheduleUpdate():
const oldBlocks = [...oldTurn.blocks];
if (currentBlockIndex !== null && oldBlocks[currentBlockIndex]) {
  if (oldBlocks[currentBlockIndex].mode === updateBlock.mode) {
    // Same mode → update the existing block in place
    oldBlocks[currentBlockIndex] = updateBlock;
    newTurn.blocks = oldBlocks;
  } else {
    // Mode changed → append a new block
    newTurn.blocks = [...oldTurn.blocks, ...turnUpdates.blocks];
    state.currentBlockIndex = oldTurn.blocks.length;
  }
}
```

This prevents the DOM from being flooded with duplicate blocks on each token.

### ChatTurn Component Rendering

**Location:** `gadget-code/frontend/src/components/ChatTurn.tsx`

The component renders the `blocks` array:

```typescript
{turn.blocks.map((block, idx) => {
  if (block.mode === 'thinking') {
    return (
      <div key={idx} className="mb-3">
        <div className="text-xs text-text-muted mb-1 font-mono">Thinking</div>
        <div
          className="p-3 bg-bg-secondary rounded text-text-muted whitespace-pre-wrap font-mono text-xs"
          dangerouslySetInnerHTML={{ __html: marked.parse(block.content) }}
        />
      </div>
    );
  } else if (block.mode === 'responding') {
    return (
      <div key={idx} className="mb-3">
        <div
          className="text-text-primary"
          dangerouslySetInnerHTML={{ __html: marked.parse(block.content) }}
        />
      </div>
    );
  } else if (block.mode === 'tool') {
    const toolCall = block.content;
    return (
      <div key={idx} className="mb-3">
        <div className="flex items-center gap-2 text-xs font-mono text-text-secondary">
          <span className="text-brand">●</span>
          <span>{toolCall.name}</span>
          {toolCall.response && <span className="text-green-500">✓</span>}
        </div>
      </div>
    );
  }
  return null;
})}
```

### Styling

- **Thinking blocks**: Muted text (`text-text-muted`), monospace font, secondary background
- **Responding blocks**: Standard primary text, Markdown rendering
- **Tool blocks**: One-line summary with a ● indicator (brand color), green checkmark if a response exists

### Markdown Rendering

Uses the `marked` library with `breaks: true` to honor line breaks:

```typescript
import { marked } from "marked";

marked.setOptions({
  breaks: true, // Critical: renders \n as <br>
});
```

## Key Implementation Files

| Component | File Path | Responsibility |
|-----------|-----------|----------------|
| **AI Interface** | `packages/ai/src/api.ts` | `IAiStreamChunk`, `IAiResponseStreamFn` types |
| **OpenAI Provider** | `packages/ai/src/openai.ts` | Streaming from the OpenAI SDK, calls `streamCallback` |
| **Ollama Provider** | `packages/ai/src/ollama.ts` | Streaming from the Ollama SDK, calls `streamCallback` |
| **Drone Agent** | `gadget-drone/src/services/agent.ts` | Routes stream chunks to Socket.IO events |
| **Backend Aggregation** | `gadget-code/src/lib/drone-session.ts` | Buffers tokens, persists at mode changes |
| **Backend Routing** | `gadget-code/src/lib/code-session.ts` | Forwards events to the IDE socket |
| **Frontend State** | `gadget-code/frontend/src/pages/ChatSessionView.tsx` | Manages streaming state, in-place updates |
| **Frontend Rendering** | `gadget-code/frontend/src/components/ChatTurn.tsx` | Renders blocks with Markdown |
| **Data Model** | `packages/api/src/interfaces/chat-turn.ts` | `IChatTurnBlock` types |
| **Mongoose Schema** | `gadget-code/src/models/chat-turn.ts` | MongoDB persistence schema |

## Design Decisions

### Why Aggregate in the Backend?

1. **Database Efficiency**: Writing to MongoDB on every token would overwhelm the database
2. **Network Efficiency**: Fewer, larger updates instead of thousands of micro-updates
3. **Mode Awareness**: The backend can detect mode transitions and structure the data appropriately

### Why In-Place Updates in the Frontend?

1. **Performance**: Updating existing DOM nodes is cheaper than creating new ones
2. **Memory**: Prevents accumulation of hundreds of nearly identical block objects
3. **Correctness**: Ensures the UI reflects the actual streaming state (one active block per mode)

### Why a Blocks Array Instead of Strings?

1. **Temporal Ordering**: Preserves the exact sequence of thinking/responding/tool events
2. **Reconstruction**: The agent's "thought process" can be replayed exactly as it happened
3. **Analytics**: Easy to query for patterns (e.g., "how many mode transitions per turn?")
4. **Query Efficiency**: No complex MongoDB aggregations needed to reconstruct turns

## Testing & Verification

### Manual Testing Steps

1. Start the backend: `cd gadget-code && pnpm dev:backend`
2. Start the frontend: `cd gadget-code/frontend && pnpm dev`
3. Start the drone: `cd ~/workspace && pnpm --filter gadget-drone dev`
4. Create a chat session and submit a prompt
5. Observe:
   - Thinking content streams in (muted, monospace)
   - Response content streams in (standard text)
   - Tool calls appear as one-line summaries
   - Mode transitions create new blocks
   - The final display matches the streaming sequence

### What to Look For

✅ **Correct behavior:**

- Content streams in real-time (not all at once at the end)
- Thinking blocks are visually distinct (muted, monospace)
- Tool calls break between thinking/responding blocks
- No duplicate blocks (each mode has one active block during streaming)
- Markdown renders correctly with line breaks

❌ **Incorrect behavior:**

- Hundreds of blocks with duplicate content (aggregation broken)
- All content appears at once (streaming not working)
- Thinking and response mixed in the same block (mode detection broken)
- Tool calls not appearing (tool call events not routed)

## Future Enhancements

Potential improvements not yet implemented:

1. **Token Count Streaming**: Emit token counts with each chunk for real-time stats
2. **Thinking/Response Mode Labels**: Optional headers to explicitly label block types
3. **Block Collapse/Expand**: Persist collapsed state for thinking blocks
4. **Streaming Cursor**: Visual indicator (blinking cursor) at the end of the active streaming block
5. **Subagent Streaming**: Extend streaming to subagent processes

## Troubleshooting

### No Streaming Updates

**Symptoms:** UI shows a spinner but no content until the work order completes

**Causes:**

1. `streamCallback` not being called in the AI provider
2. Socket.IO events not emitted from the drone
3. Event handlers not registered in `DroneSession` or `CodeSession`

**Debug:** Check `gadget-drone.log` for "stream chunk received" entries

### Duplicate Blocks

**Symptoms:** UI shows many blocks with progressively longer content

**Causes:**

1. `currentBlockIndex` not being tracked in the frontend
2. `scheduleUpdate` always appending instead of updating in place
3. Mode transitions not detected properly

**Debug:** Inspect the `turn.blocks` array in React DevTools

### Mode Transitions Not Working

**Symptoms:** Thinking and response content mixed in the same block

**Causes:**

1. Chunk `type` field not set correctly in the AI provider
2. Mode detection logic in the event handlers broken
3. Buffer flush not occurring on mode change

**Debug:** Log `state.currentMode` in the event handlers

## Related Documentation

- [Architecture](./architecture.md) — Overall system architecture
- [Socket Protocol](./socket-protocol.md) — Socket.IO event definitions
- [ChatTurn Interface](../packages/api/src/interfaces/chat-turn.ts) — TypeScript types
- [ChatTurn Model](../gadget-code/src/models/chat-turn.ts) — Mongoose schema