stream response handling and correctness

This commit is contained in:
Rob Colbert 2026-05-09 11:51:09 -04:00
parent d26624ab93
commit 931359b674
2 changed files with 76 additions and 41 deletions

View File

@ -1,7 +1,7 @@
import { useState, useEffect, useRef, useContext, useCallback } from 'react';
import { useParams, useNavigate } from 'react-router-dom';
import { socketClient } from '../lib/socket';
import { chatSessionApi, projectApi, providerApi, type ChatSession, type ChatTurn, ChatSessionMode, type AiProvider } from '../lib/api';
import { chatSessionApi, projectApi, providerApi, type ChatSession, type ChatTurn, type ChatTurnBlock, ChatSessionMode, type AiProvider, type Project } from '../lib/api';
import { WorkspaceMode } from '../lib/types';
import WorkspaceModeIndicator from '../components/WorkspaceModeIndicator';
import FilesPanel from '../components/FilesPanel';
@ -238,31 +238,34 @@ export default function ChatSessionView() {
if (turnUpdates.blocks !== undefined) {
const state = streamingStateRef.current.get(turnId);
const currentBlockIndex = state?.currentBlockIndex ?? null;
const updatedBlocks = [...(oldTurn.blocks || [])];
// If we have a current block index, update it in place
if (currentBlockIndex !== null && oldTurn.blocks && oldTurn.blocks[currentBlockIndex]) {
const oldBlocks = [...oldTurn.blocks];
const updateBlock = turnUpdates.blocks[0];
for (const updateBlock of turnUpdates.blocks) {
let blockIndex = state?.currentBlockIndex ?? null;
// Only update if the mode matches
if (oldBlocks[currentBlockIndex].mode === updateBlock.mode) {
oldBlocks[currentBlockIndex] = updateBlock;
newTurn.blocks = oldBlocks;
} else {
// Mode changed, append new block and update index
newTurn.blocks = [...oldTurn.blocks, ...turnUpdates.blocks];
if (state) {
state.currentBlockIndex = newTurn.blocks.length - 1;
}
if (
blockIndex === null ||
updatedBlocks[blockIndex]?.mode !== updateBlock.mode
) {
const lastIndex = updatedBlocks.length - 1;
blockIndex = updatedBlocks[lastIndex]?.mode === updateBlock.mode
? lastIndex
: null;
}
} else {
// No current block, append and set index
newTurn.blocks = [...(oldTurn.blocks || []), ...turnUpdates.blocks];
if (state && turnUpdates.blocks.length > 0) {
state.currentBlockIndex = newTurn.blocks.length - 1;
if (blockIndex !== null) {
updatedBlocks[blockIndex] = updateBlock;
} else {
updatedBlocks.push(updateBlock);
blockIndex = updatedBlocks.length - 1;
}
if (state) {
state.currentBlockIndex = blockIndex;
}
}
newTurn.blocks = updatedBlocks;
}
if (turnUpdates.toolCalls !== undefined) {
newTurn.toolCalls = [...(oldTurn.toolCalls || []), ...turnUpdates.toolCalls];
@ -287,6 +290,30 @@ export default function ChatSessionView() {
});
}, []);
const mergePendingUpdate = useCallback((turnId: string, updates: Partial<ChatTurn>) => {
const existing = pendingUpdatesRef.current.get(turnId);
const merged: Partial<ChatTurn> = {
...existing,
...updates,
};
if (existing?.blocks || updates.blocks) {
merged.blocks = [
...(existing?.blocks || []),
...(updates.blocks || []),
];
}
if (existing?.toolCalls || updates.toolCalls) {
merged.toolCalls = [
...(existing?.toolCalls || []),
...(updates.toolCalls || []),
];
}
pendingUpdatesRef.current.set(turnId, merged);
}, []);
const handleThinking = useCallback((content: string) => {
const turnId = currentTurnIdRef.current;
if (!turnId) return;
@ -301,7 +328,7 @@ export default function ChatSessionView() {
if (state.currentMode !== 'thinking') {
// Flush previous mode
if (state.currentMode === 'responding' && state.respondingContent) {
pendingUpdatesRef.current.set(turnId, {
mergePendingUpdate(turnId, {
blocks: [{
mode: 'responding' as const,
createdAt: new Date().toISOString(),
@ -318,7 +345,7 @@ export default function ChatSessionView() {
state.thinkingContent += content;
// Update with aggregated content
pendingUpdatesRef.current.set(turnId, {
mergePendingUpdate(turnId, {
blocks: [{
mode: 'thinking' as const,
createdAt: new Date().toISOString(),
@ -326,7 +353,7 @@ export default function ChatSessionView() {
}],
});
scheduleUpdate();
}, [scheduleUpdate]);
}, [mergePendingUpdate, scheduleUpdate]);
const handleResponse = useCallback((content: string) => {
const turnId = currentTurnIdRef.current;
@ -342,7 +369,7 @@ export default function ChatSessionView() {
if (state.currentMode !== 'responding') {
// Flush previous mode
if (state.currentMode === 'thinking' && state.thinkingContent) {
pendingUpdatesRef.current.set(turnId, {
mergePendingUpdate(turnId, {
blocks: [{
mode: 'thinking' as const,
createdAt: new Date().toISOString(),
@ -359,7 +386,7 @@ export default function ChatSessionView() {
state.respondingContent += content;
// Update with aggregated content
pendingUpdatesRef.current.set(turnId, {
mergePendingUpdate(turnId, {
blocks: [{
mode: 'responding' as const,
createdAt: new Date().toISOString(),
@ -367,7 +394,7 @@ export default function ChatSessionView() {
}],
});
scheduleUpdate();
}, [scheduleUpdate]);
}, [mergePendingUpdate, scheduleUpdate]);
const handleToolCall = useCallback((callId: string, name: string, params: string, response: string) => {
const turnId = currentTurnIdRef.current;
@ -396,7 +423,7 @@ export default function ChatSessionView() {
}
if (blocksToFlush.length > 0) {
pendingUpdatesRef.current.set(turnId, {
mergePendingUpdate(turnId, {
blocks: blocksToFlush,
});
scheduleUpdate();
@ -407,7 +434,7 @@ export default function ChatSessionView() {
}
// Add tool block
pendingUpdatesRef.current.set(turnId, {
mergePendingUpdate(turnId, {
blocks: [{
mode: 'tool' as const,
createdAt: new Date().toISOString(),
@ -416,14 +443,17 @@ export default function ChatSessionView() {
toolCalls: [{ callId, name, parameters: params, response }],
});
scheduleUpdate();
}, [scheduleUpdate]);
}, [mergePendingUpdate, scheduleUpdate]);
const handleWorkOrderComplete = useCallback((turnId: string, success: boolean, message?: string) => {
// Backend has already flushed and persisted all streaming content
// Just clean up frontend streaming state and update status
const state = streamingStateRef.current.get(turnId);
if (state) {
streamingStateRef.current.delete(turnId);
if (streamingStateRef.current.has(turnId)) {
if (pendingUpdatesRef.current.has(turnId) || updateRafRef.current) {
requestAnimationFrame(() => streamingStateRef.current.delete(turnId));
} else {
streamingStateRef.current.delete(turnId);
}
}
setTurns(prevTurns =>

View File

@ -88,18 +88,24 @@ class AgentService extends GadgetService {
chatOptions: {},
context: [],
};
let streamedThinking = false;
let streamedResponse = false;
let streamedToolCall = false;
const onStreamChunk = async (chunk: IAiStreamChunk): Promise<void> => {
// this.log.debug("stream chunk received", { chunk });
switch (chunk.type) {
case "thinking":
streamedThinking = true;
socket.emit("thinking", chunk.data);
break;
case "response":
streamedResponse = true;
socket.emit("response", chunk.data);
break;
case "toolCall":
streamedToolCall = true;
socket.emit(
"toolCall",
chunk.toolCallId!,
@ -160,18 +166,17 @@ class AgentService extends GadgetService {
throw new Error("Model failed to respond (still loading or error)");
}
// Emit thinking content if present
if (response.thinking) {
// Providers return accumulated final content; only emit it here when it
// was not already delivered through the stream callback.
if (response.thinking && !streamedThinking) {
socket.emit("thinking", response.thinking);
}
// Emit response content if present
if (response.response) {
if (response.response && !streamedResponse) {
socket.emit("response", response.response);
}
// Emit tool calls if present
if (response.toolCalls && response.toolCalls.length > 0) {
if (response.toolCalls && response.toolCalls.length > 0 && !streamedToolCall) {
for (const toolCall of response.toolCalls) {
socket.emit(
"toolCall",