diff --git a/gadget-code/frontend/src/pages/ChatSessionView.tsx b/gadget-code/frontend/src/pages/ChatSessionView.tsx index 5e7182b..10026f7 100644 --- a/gadget-code/frontend/src/pages/ChatSessionView.tsx +++ b/gadget-code/frontend/src/pages/ChatSessionView.tsx @@ -1,7 +1,7 @@ import { useState, useEffect, useRef, useContext, useCallback } from 'react'; import { useParams, useNavigate } from 'react-router-dom'; import { socketClient } from '../lib/socket'; -import { chatSessionApi, projectApi, providerApi, type ChatSession, type ChatTurn, ChatSessionMode, type AiProvider } from '../lib/api'; +import { chatSessionApi, projectApi, providerApi, type ChatSession, type ChatTurn, type ChatTurnBlock, ChatSessionMode, type AiProvider, type Project } from '../lib/api'; import { WorkspaceMode } from '../lib/types'; import WorkspaceModeIndicator from '../components/WorkspaceModeIndicator'; import FilesPanel from '../components/FilesPanel'; @@ -238,31 +238,34 @@ export default function ChatSessionView() { if (turnUpdates.blocks !== undefined) { const state = streamingStateRef.current.get(turnId); - const currentBlockIndex = state?.currentBlockIndex ?? null; - - // If we have a current block index, update it in place - if (currentBlockIndex !== null && oldTurn.blocks && oldTurn.blocks[currentBlockIndex]) { - const oldBlocks = [...oldTurn.blocks]; - const updateBlock = turnUpdates.blocks[0]; - - // Only update if the mode matches - if (oldBlocks[currentBlockIndex].mode === updateBlock.mode) { - oldBlocks[currentBlockIndex] = updateBlock; - newTurn.blocks = oldBlocks; - } else { - // Mode changed, append new block and update index - newTurn.blocks = [...oldTurn.blocks, ...turnUpdates.blocks]; - if (state) { - state.currentBlockIndex = newTurn.blocks.length - 1; - } + const updatedBlocks = [...(oldTurn.blocks || [])]; + + for (const updateBlock of turnUpdates.blocks) { + let blockIndex = state?.currentBlockIndex ?? null; + + if ( + blockIndex === null || + updatedBlocks[blockIndex]?.mode !== updateBlock.mode + ) { + const lastIndex = updatedBlocks.length - 1; + blockIndex = updatedBlocks[lastIndex]?.mode === updateBlock.mode + ? lastIndex + : null; } - } else { - // No current block, append and set index - newTurn.blocks = [...(oldTurn.blocks || []), ...turnUpdates.blocks]; - if (state && turnUpdates.blocks.length > 0) { - state.currentBlockIndex = newTurn.blocks.length - 1; + + if (blockIndex !== null) { + updatedBlocks[blockIndex] = updateBlock; + } else { + updatedBlocks.push(updateBlock); + blockIndex = updatedBlocks.length - 1; + } + + if (state) { + state.currentBlockIndex = blockIndex; } } + + newTurn.blocks = updatedBlocks; } if (turnUpdates.toolCalls !== undefined) { newTurn.toolCalls = [...(oldTurn.toolCalls || []), ...turnUpdates.toolCalls]; @@ -287,6 +290,30 @@ export default function ChatSessionView() { }); }, []); + const mergePendingUpdate = useCallback((turnId: string, updates: Partial) => { + const existing = pendingUpdatesRef.current.get(turnId); + const merged: Partial = { + ...existing, + ...updates, + }; + + if (existing?.blocks || updates.blocks) { + merged.blocks = [ + ...(existing?.blocks || []), + ...(updates.blocks || []), + ]; + } + + if (existing?.toolCalls || updates.toolCalls) { + merged.toolCalls = [ + ...(existing?.toolCalls || []), + ...(updates.toolCalls || []), + ]; + } + + pendingUpdatesRef.current.set(turnId, merged); + }, []); + const handleThinking = useCallback((content: string) => { const turnId = currentTurnIdRef.current; if (!turnId) return; @@ -301,7 +328,7 @@ export default function ChatSessionView() { if (state.currentMode !== 'thinking') { // Flush previous mode if (state.currentMode === 'responding' && state.respondingContent) { - pendingUpdatesRef.current.set(turnId, { + mergePendingUpdate(turnId, { blocks: [{ mode: 'responding' as const, createdAt: new Date().toISOString(), @@ -318,7 +345,7 @@ export default function ChatSessionView() { state.thinkingContent += content; // Update with aggregated content - pendingUpdatesRef.current.set(turnId, { + mergePendingUpdate(turnId, { blocks: [{ mode: 'thinking' as const, createdAt: new Date().toISOString(), @@ -326,7 +353,7 @@ export default function ChatSessionView() { }], }); scheduleUpdate(); - }, [scheduleUpdate]); + }, [mergePendingUpdate, scheduleUpdate]); const handleResponse = useCallback((content: string) => { const turnId = currentTurnIdRef.current; @@ -342,7 +369,7 @@ export default function ChatSessionView() { if (state.currentMode !== 'responding') { // Flush previous mode if (state.currentMode === 'thinking' && state.thinkingContent) { - pendingUpdatesRef.current.set(turnId, { + mergePendingUpdate(turnId, { blocks: [{ mode: 'thinking' as const, createdAt: new Date().toISOString(), @@ -359,7 +386,7 @@ export default function ChatSessionView() { state.respondingContent += content; // Update with aggregated content - pendingUpdatesRef.current.set(turnId, { + mergePendingUpdate(turnId, { blocks: [{ mode: 'responding' as const, createdAt: new Date().toISOString(), @@ -367,7 +394,7 @@ export default function ChatSessionView() { }], }); scheduleUpdate(); - }, [scheduleUpdate]); + }, [mergePendingUpdate, scheduleUpdate]); const handleToolCall = useCallback((callId: string, name: string, params: string, response: string) => { const turnId = currentTurnIdRef.current; @@ -396,7 +423,7 @@ export default function ChatSessionView() { } if (blocksToFlush.length > 0) { - pendingUpdatesRef.current.set(turnId, { + mergePendingUpdate(turnId, { blocks: blocksToFlush, }); scheduleUpdate(); @@ -407,7 +434,7 @@ export default function ChatSessionView() { } // Add tool block - pendingUpdatesRef.current.set(turnId, { + mergePendingUpdate(turnId, { blocks: [{ mode: 'tool' as const, createdAt: new Date().toISOString(), @@ -416,14 +443,17 @@ export default function ChatSessionView() { toolCalls: [{ callId, name, parameters: params, response }], }); scheduleUpdate(); - }, [scheduleUpdate]); + }, [mergePendingUpdate, scheduleUpdate]); const handleWorkOrderComplete = useCallback((turnId: string, success: boolean, message?: string) => { // Backend has already flushed and persisted all streaming content // Just clean up frontend streaming state and update status - const state = streamingStateRef.current.get(turnId); - if (state) { - streamingStateRef.current.delete(turnId); + if (streamingStateRef.current.has(turnId)) { + if (pendingUpdatesRef.current.has(turnId) || updateRafRef.current) { + requestAnimationFrame(() => streamingStateRef.current.delete(turnId)); + } else { + streamingStateRef.current.delete(turnId); + } } setTurns(prevTurns => diff --git a/gadget-drone/src/services/agent.ts b/gadget-drone/src/services/agent.ts index cb28273..d1ba605 100644 --- a/gadget-drone/src/services/agent.ts +++ b/gadget-drone/src/services/agent.ts @@ -88,18 +88,24 @@ class AgentService extends GadgetService { chatOptions: {}, context: [], }; + let streamedThinking = false; + let streamedResponse = false; + let streamedToolCall = false; const onStreamChunk = async (chunk: IAiStreamChunk): Promise => { // this.log.debug("stream chunk received", { chunk }); switch (chunk.type) { case "thinking": + streamedThinking = true; socket.emit("thinking", chunk.data); break; case "response": + streamedResponse = true; socket.emit("response", chunk.data); break; case "toolCall": + streamedToolCall = true; socket.emit( "toolCall", chunk.toolCallId!, @@ -160,18 +166,17 @@ class AgentService extends GadgetService { throw new Error("Model failed to respond (still loading or error)"); } - // Emit thinking content if present - if (response.thinking) { + // Providers return accumulated final content; only emit it here when it + // was not already delivered through the stream callback. + if (response.thinking && !streamedThinking) { socket.emit("thinking", response.thinking); } - // Emit response content if present - if (response.response) { + if (response.response && !streamedResponse) { socket.emit("response", response.response); } - // Emit tool calls if present - if (response.toolCalls && response.toolCalls.length > 0) { + if (response.toolCalls && response.toolCalls.length > 0 && !streamedToolCall) { for (const toolCall of response.toolCalls) { socket.emit( "toolCall",