fix: stream directly from chat-api, bypass Next.js proxy

Replaced the double proxy (browser→Next.js→chat-api→Modal) with
direct streaming (browser→nginx→chat-api→Modal). Added an nginx route
that forwards /api/conversations to chat-api. Inlined SSE parsing in
ChatWindow, replacing the useSSE hook that went through /api/chat/stream.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Manmohan Sharma 2026-04-16 15:08:46 -07:00
parent df0584b861
commit a873b6ad46
No known key found for this signature in database
2 changed files with 97 additions and 36 deletions

View File

@ -32,6 +32,23 @@ http {
proxy_set_header X-Forwarded-Proto $scheme;
}
# Chat API: conversation endpoints and SSE streaming are proxied straight to
# the chat-api backend instead of going through the Next.js proxy.
location /api/conversations {
    # Rate limiting uses the `api` zone, which is declared outside this block.
    limit_req zone=api burst=20 nodelay;
    proxy_pass http://chat-api:8002/api/conversations;
    # Speak HTTP/1.1 to the upstream. Clearing the Connection header below is
    # only meaningful together with this directive; on nginx builds where the
    # upstream protocol defaults to HTTP/1.0 it would otherwise do nothing useful.
    proxy_http_version 1.1;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
    # SSE streaming support
    # Forward upstream bytes to the client as they arrive instead of buffering.
    proxy_buffering off;
    proxy_cache off;
    # Maximum gap between two successive reads from the upstream; a stream that
    # stays silent for longer than this is closed by nginx.
    proxy_read_timeout 300s;
    proxy_set_header Connection '';
    # Do not re-chunk the response sent to the client.
    chunked_transfer_encoding off;
}
# Grafana (optional)
location /grafana/ {
proxy_pass http://grafana:3000/;

View File

@ -6,7 +6,6 @@ import MessageBubble from './MessageBubble';
import EmptyState from './EmptyState';
import ChatInput from './ChatInput';
import { useChatStore } from '@/store/chatStore';
import { useSSE } from '@/hooks/useSSE';
import { useAuth } from '@/hooks/useAuth';
import { authHeaders } from '@/lib/auth-client';
import { parseSlashCommand } from '@/lib/slashCommands';
@ -52,30 +51,89 @@ export default function ChatWindow() {
scrollToBottom();
}, [messages.length, streamingMsgId, scrollToBottom]);
const { start, stop, isStreaming } = useSSE('/api/chat/stream', {
onToken: (token) => {
streamingBufferRef.current += token;
if (streamingMsgId && currentConversationId) {
updateMessage(currentConversationId, streamingMsgId, streamingBufferRef.current);
// Streaming state: a boolean flag plus a ref to the AbortController of the
// request currently in flight (null when there is none).
const [isStreaming, setIsStreaming] = useState(false);
const abortRef = useRef<AbortController | null>(null);

/**
 * Cancels the in-flight stream, if any: aborts its controller, forgets it,
 * and clears the streaming flag. Safe to call when nothing is streaming.
 */
const stop = useCallback((): void => {
  const inFlight = abortRef.current;
  if (inFlight) {
    inFlight.abort();
  }
  abortRef.current = null;
  setIsStreaming(false);
}, []);
// Streams one assistant reply for `convId`: POSTs `content` (plus the optional
// sampling parameters) to /api/conversations/{convId}/messages, then reads the
// response body as newline-delimited `data:` lines and appends each `token`
// to streamingBufferRef, pushing the accumulated text through updateMessage.
//
// NOTE(review): this span is rendered from a diff without +/- markers. Pieces of
// the replaced useSSE options object (`},`, `onDone: () => {`, `onError: (err) => {`,
// `});`) appear interleaved below; they are presumably removed-side lines and not
// part of this function.
//
// NOTE(review): `streamingMsgId` and `currentConversationId` are read from the
// closure captured when this callback was created. The send handler calls
// setStreamingMsgId(assistantId) immediately before awaiting this function, so,
// if streamingMsgId is render-scoped state (its declaration is not visible here),
// the guard in the token branch may still see the previous value and skip
// updateMessage. Confirm; passing the message id in or reading it from a ref
// would avoid this.
const streamFromApi = useCallback(async (convId: string, content: string, temp?: number, topk?: number) => {
// Abort any stream that is still in flight before starting a new one.
stop();
const ac = new AbortController();
abortRef.current = ac;
setIsStreaming(true);
try {
const headers: Record<string, string> = {
'Content-Type': 'application/json',
...authHeaders(),
};
// Call chat-api directly via nginx — no Next.js proxy
const res = await fetch(`/api/conversations/${convId}/messages`, {
method: 'POST',
headers,
// max_tokens is fixed at 512; temperature/top_k are omitted from the JSON when undefined.
body: JSON.stringify({ content, temperature: temp, max_tokens: 512, top_k: topk }),
signal: ac.signal,
});
// A non-2xx status or a missing body is surfaced as an Error handled by the catch below.
if (!res.ok || !res.body) {
throw new Error(`HTTP ${res.status}`);
}
},
onDone: () => {
const reader = res.body.getReader();
const decoder = new TextDecoder();
// Holds decoded text that has not yet been terminated by '\n'.
let buffer = '';
while (true) {
const { done, value } = await reader.read();
// NOTE(review): on end-of-stream, whatever is left in `buffer` (a final line with
// no trailing '\n') is never parsed, and the decoder is not flushed.
if (done) break;
buffer += decoder.decode(value, { stream: true });
let nl: number;
// Consume every complete line currently in the buffer.
while ((nl = buffer.indexOf('\n')) !== -1) {
const line = buffer.slice(0, nl).trim();
buffer = buffer.slice(nl + 1);
// Only `data:` lines are handled; any other line is ignored.
if (!line.startsWith('data:')) continue;
// 5 === 'data:'.length
const payload = line.slice(5).trim();
if (!payload) continue;
try {
// Event shapes as used below: a truthy `done` ends the stream; a string `token` appends text.
const data = JSON.parse(payload);
if (data.done) {
setStreamingMsgId(null);
streamingBufferRef.current = '';
setIsStreaming(false);
return;
}
if (typeof data.token === 'string') {
streamingBufferRef.current += data.token;
if (streamingMsgId && convId) {
updateMessage(convId, streamingMsgId!, streamingBufferRef.current);
}
}
// NOTE(review): this catch covers the whole try body, so an exception thrown by
// updateMessage or the state setters is swallowed along with JSON parse errors.
} catch { /* skip malformed */ }
}
}
setStreamingMsgId(null);
streamingBufferRef.current = '';
},
onError: (err) => {
console.error('[chat] stream error:', err);
if (streamingMsgId && currentConversationId) {
updateMessage(
currentConversationId,
streamingMsgId,
`Error: ${err.message}. Using mock responses requires only the frontend; cloud streaming requires CHAT_API_URL.`,
);
} catch (err) {
// Errors named 'AbortError' are treated as intentional cancellation and not reported.
if ((err as Error).name !== 'AbortError') {
console.error('[chat] stream error:', err);
// NOTE(review): the error is written against currentConversationId, whereas tokens
// are written against convId; confirm the two cannot differ for a new conversation.
if (streamingMsgId && currentConversationId) {
updateMessage(currentConversationId, streamingMsgId!, `Error: ${(err as Error).message}`);
}
}
setStreamingMsgId(null);
streamingBufferRef.current = '';
},
});
} finally {
// NOTE(review): this also runs for an older invocation that a newer call aborted via
// stop(). The newer call assigns its controller synchronously before the older one
// unwinds, so this presumably nulls the newer controller and clears isStreaming while
// the newer stream is still running (the catch above likewise resets streamingMsgId
// and the buffer). Verify; guarding on `abortRef.current === ac` would avoid it.
setIsStreaming(false);
abortRef.current = null;
}
}, [stop, streamingMsgId, currentConversationId, updateMessage]);
const ensureConversation = useCallback(async () => {
if (currentConversationId) return currentConversationId;
@ -116,21 +174,7 @@ export default function ChatWindow() {
setStreamingMsgId(assistantId);
streamingBufferRef.current = '';
const history = [
...(useChatStore.getState().conversations.find((c) => c.id === convId)?.messages ?? []),
]
.filter((m) => m.role === 'user' || m.role === 'assistant')
.slice(0, -1)
.map((m) => ({ role: m.role, content: m.content }));
await start({
messages: history,
model,
temperature,
topK,
conversationId: convId,
auth: authHeaders(),
});
await streamFromApi(convId, text, temperature, topK);
},
[
draft,
@ -139,12 +183,12 @@ export default function ChatWindow() {
temperature,
topK,
appendMessage,
model,
streamFromApi,
setTemperature,
setTopK,
createConversation,
newConversation,
start,
// streamFromApi in deps via earlier line
],
);