From a873b6ad46396d85fada1917fd942bfa73749940 Mon Sep 17 00:00:00 2001 From: Manmohan Sharma Date: Thu, 16 Apr 2026 15:08:46 -0700 Subject: [PATCH] fix: stream directly from chat-api, bypass Next.js proxy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaced the double-proxy (browser→Next.js→chat-api→Modal) with direct streaming (browser→nginx→chat-api→Modal). Added nginx route for /api/conversations → chat-api. Inlined SSE parsing in ChatWindow instead of useSSE hook going through /api/chat/stream. Co-Authored-By: Claude Opus 4.6 (1M context) --- nginx/nginx.conf | 17 +++ .../frontend/components/chat/ChatWindow.tsx | 116 ++++++++++++------ 2 files changed, 97 insertions(+), 36 deletions(-) diff --git a/nginx/nginx.conf b/nginx/nginx.conf index 93b3b632..ab820831 100644 --- a/nginx/nginx.conf +++ b/nginx/nginx.conf @@ -32,6 +32,23 @@ http { proxy_set_header X-Forwarded-Proto $scheme; } + # Chat API — direct to backend for conversations + streaming + location /api/conversations { + limit_req zone=api burst=20 nodelay; + proxy_pass http://chat-api:8002/api/conversations; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + # SSE streaming support + proxy_buffering off; + proxy_cache off; + proxy_read_timeout 300s; + proxy_set_header Connection ''; + chunked_transfer_encoding off; + } + # Grafana (optional) location /grafana/ { proxy_pass http://grafana:3000/; diff --git a/services/frontend/components/chat/ChatWindow.tsx b/services/frontend/components/chat/ChatWindow.tsx index 5dfe5105..f9bc48bc 100644 --- a/services/frontend/components/chat/ChatWindow.tsx +++ b/services/frontend/components/chat/ChatWindow.tsx @@ -6,7 +6,6 @@ import MessageBubble from './MessageBubble'; import EmptyState from './EmptyState'; import ChatInput from './ChatInput'; import { useChatStore } from '@/store/chatStore'; -import { useSSE } from '@/hooks/useSSE'; import { useAuth } from '@/hooks/useAuth'; import { authHeaders } from '@/lib/auth-client'; import { parseSlashCommand } from '@/lib/slashCommands'; @@ -52,30 +51,89 @@ export default function ChatWindow() { scrollToBottom(); }, [messages.length, streamingMsgId, scrollToBottom]); - const { start, stop, isStreaming } = useSSE('/api/chat/stream', { - onToken: (token) => { - streamingBufferRef.current += token; - if (streamingMsgId && currentConversationId) { - updateMessage(currentConversationId, streamingMsgId, streamingBufferRef.current); + const [isStreaming, setIsStreaming] = useState(false); + const abortRef = useRef(null); + + const stop = useCallback(() => { + abortRef.current?.abort(); + abortRef.current = null; + setIsStreaming(false); + }, []); + + const streamFromApi = useCallback(async (convId: string, content: string, temp?: number, topk?: number) => { + stop(); + const ac = new AbortController(); + abortRef.current = ac; + setIsStreaming(true); + + try { + const headers: Record = { + 'Content-Type': 'application/json', + ...authHeaders(), + }; + + // Call chat-api directly via nginx — no Next.js proxy + const res = await fetch(`/api/conversations/${convId}/messages`, { + method: 'POST', + headers, + body: JSON.stringify({ content, temperature: temp, max_tokens: 512, top_k: topk }), + signal: ac.signal, + }); + + if (!res.ok || !res.body) { + throw new Error(`HTTP ${res.status}`); } - }, - onDone: () => { + + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + let nl: number; + while ((nl = buffer.indexOf('\n')) !== -1) { + const line = buffer.slice(0, nl).trim(); + buffer = buffer.slice(nl + 1); + if (!line.startsWith('data:')) continue; + const payload = line.slice(5).trim(); + if (!payload) continue; + try { + const data = JSON.parse(payload); + if (data.done) { + setStreamingMsgId(null); + streamingBufferRef.current = ''; + setIsStreaming(false); + return; + } + if (typeof data.token === 'string') { + streamingBufferRef.current += data.token; + if (streamingMsgId && convId) { + updateMessage(convId, streamingMsgId!, streamingBufferRef.current); + } + } + } catch { /* skip malformed */ } + } + } + setStreamingMsgId(null); streamingBufferRef.current = ''; - }, - onError: (err) => { - console.error('[chat] stream error:', err); - if (streamingMsgId && currentConversationId) { - updateMessage( - currentConversationId, - streamingMsgId, - `Error: ${err.message}. Using mock responses requires only the frontend; cloud streaming requires CHAT_API_URL.`, - ); + } catch (err) { + if ((err as Error).name !== 'AbortError') { + console.error('[chat] stream error:', err); + if (streamingMsgId && currentConversationId) { + updateMessage(currentConversationId, streamingMsgId!, `Error: ${(err as Error).message}`); + } } setStreamingMsgId(null); streamingBufferRef.current = ''; - }, - }); + } finally { + setIsStreaming(false); + abortRef.current = null; + } + }, [stop, streamingMsgId, currentConversationId, updateMessage]); const ensureConversation = useCallback(async () => { if (currentConversationId) return currentConversationId; @@ -116,21 +174,7 @@ export default function ChatWindow() { setStreamingMsgId(assistantId); streamingBufferRef.current = ''; - const history = [ - ...(useChatStore.getState().conversations.find((c) => c.id === convId)?.messages ?? []), - ] - .filter((m) => m.role === 'user' || m.role === 'assistant') - .slice(0, -1) - .map((m) => ({ role: m.role, content: m.content })); - - await start({ - messages: history, - model, - temperature, - topK, - conversationId: convId, - auth: authHeaders(), - }); + await streamFromApi(convId, text, temperature, topK); }, [ draft, @@ -139,12 +183,12 @@ export default function ChatWindow() { temperature, topK, appendMessage, - model, + streamFromApi, setTemperature, setTopK, createConversation, newConversation, - start, + // streamFromApi in deps via earlier line ], );