Skip to content
11 changes: 8 additions & 3 deletions service/src/chatgpt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,18 @@ search result: <search_result>${searchResultContent}</search_result>`,

const finish_reason = chunk.choices[0]?.finish_reason

// Build response object similar to the original implementation
// Build incremental response object
const responseChunk = {
id: chunk.id,
reasoning: responseReasoning,
text: responseText,
reasoning: responseReasoning, // Accumulated reasoning content
text: responseText, // Accumulated text content
role: 'assistant',
finish_reason,
// Incremental data
delta: {
reasoning: reasoningContent, // reasoning content in this chunk
text: content, // text content in this chunk
},
}

// Call the process callback if provided
Expand Down
5 changes: 5 additions & 0 deletions service/src/chatgpt/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ export interface ResponseChunk {
reasoning?: string
role?: string
finish_reason?: string
// 支持增量响应
delta?: {
reasoning?: string
text?: string
}
}

export interface RequestOptions {
Expand Down
74 changes: 64 additions & 10 deletions service/src/routes/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,12 @@ router.post('/chat-clear', auth, async (req, res) => {
})

router.post('/chat-process', [auth, limiter], async (req, res) => {
res.setHeader('Content-type', 'application/octet-stream')
// set headers for SSE
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.setHeader('Access-Control-Allow-Origin', '*')
res.setHeader('Access-Control-Allow-Headers', 'Cache-Control')

let { roomId, uuid, regenerate, prompt, uploadFileKeys, options = {}, systemMessage, temperature, top_p } = req.body as RequestProps
const userId = req.headers.userId.toString()
Expand All @@ -241,6 +246,22 @@ router.post('/chat-process', [auth, limiter], async (req, res) => {
let result
let message: ChatInfo
let user = await getUserById(userId)

// SSE helper functions
const sendSSEData = (eventType: string, data: any) => {
res.write(`event: ${eventType}\n`)
res.write(`data: ${JSON.stringify(data)}\n\n`)
}

const sendSSEError = (error: string) => {
sendSSEData('error', JSON.stringify({ message: error }))
}

const sendSSEEnd = () => {
res.write('event: end\n')
res.write('data: [DONE]\n\n')
}

try {
// If use the fixed fakeuserid(some probability of duplicated with real ones), redefine user which is send to chatReplyProcess
if (userId === '6406d8c50aedd633885fa16f') {
Expand All @@ -251,29 +272,56 @@ router.post('/chat-process', [auth, limiter], async (req, res) => {
if (config.siteConfig?.usageCountLimit) {
const useAmount = user ? (user.useAmount ?? 0) : 0
if (Number(useAmount) <= 0 && user.limit_switch) {
res.send({ status: 'Fail', message: '提问次数用完啦 | Question limit reached', data: null })
sendSSEError('提问次数用完啦 | Question limit reached')
sendSSEEnd()
res.end()
return
}
}
}

if (config.auditConfig.enabled || config.auditConfig.customizeEnabled) {
if (!user.roles.includes(UserRole.Admin) && await containsSensitiveWords(config.auditConfig, prompt)) {
res.send({ status: 'Fail', message: '含有敏感词 | Contains sensitive words', data: null })
sendSSEError('含有敏感词 | Contains sensitive words')
sendSSEEnd()
res.end()
return
}
}

message = regenerate ? await getChat(roomId, uuid) : await insertChat(uuid, prompt, uploadFileKeys, roomId, model, options as ChatOptions)
let firstChunk = true

result = await chatReplyProcess({
message: prompt,
uploadFileKeys,
parentMessageId: options?.parentMessageId,
process: (chunk: ResponseChunk) => {
lastResponse = chunk

res.write(firstChunk ? JSON.stringify(chunk) : `\n${JSON.stringify(chunk)}`)
firstChunk = false
// set sse event by different data type
if (chunk.searchQuery) {
sendSSEData('search_query', { searchQuery: chunk.searchQuery })
}
if (chunk.searchResults) {
sendSSEData('search_results', {
searchResults: chunk.searchResults,
searchUsageTime: chunk.searchUsageTime,
})
}
if (chunk.delta) {
// send SSE event with delta type
sendSSEData('delta', { m: chunk.delta })
}
else {
// send all data
sendSSEData('message', {
id: chunk.id,
reasoning: chunk.reasoning,
text: chunk.text,
role: chunk.role,
finish_reason: chunk.finish_reason,
})
}
},
systemMessage,
temperature,
Expand All @@ -283,11 +331,17 @@ router.post('/chat-process', [auth, limiter], async (req, res) => {
room,
chatUuid: uuid,
})
// return the whole response including usage
res.write(`\n${JSON.stringify(result.data)}`)

// 发送最终完成数据
if (result && result.status === 'Success') {
sendSSEData('complete', result.data)
}

sendSSEEnd()
}
catch (error) {
res.write(JSON.stringify({ message: error?.message }))
sendSSEError(error?.message || 'Unknown error')
sendSSEEnd()
}
finally {
res.end()
Expand All @@ -299,7 +353,7 @@ router.post('/chat-process', [auth, limiter], async (req, res) => {
}

if (result.data === undefined)
// eslint-disable-next-line no-unsafe-finally
// eslint-disable-next-line no-unsafe-finally
return

if (regenerate && message.options.messageId) {
Expand Down
92 changes: 80 additions & 12 deletions src/api/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { AxiosProgressEvent, GenericAbortSignal } from 'axios'
import type { AnnounceConfig, AuditConfig, ConfigState, GiftCard, KeyConfig, MailConfig, SearchConfig, SiteConfig, Status, UserInfo, UserPassword, UserPrompt } from '@/components/common/Setting/model'
import type { SettingsState } from '@/store/modules/user/helper'
import { useUserStore } from '@/store'
import { get, post } from '@/utils/request'
import fetchService from '@/utils/request/fetchService'

export function fetchAnnouncement<T = any>() {
return post<T>({
Expand All @@ -16,18 +16,30 @@ export function fetchChatConfig<T = any>() {
})
}

export function fetchChatAPIProcess<T = any>(
// SSE event handler interface
interface SSEEventHandlers {
onMessage?: (data: any) => void
onDelta?: (delta: { reasoning?: string, text?: string }) => void
onSearchQuery?: (data: { searchQuery: string }) => void
onSearchResults?: (data: { searchResults: any[], searchUsageTime: number }) => void
onComplete?: (data: any) => void
onError?: (error: string) => void
onEnd?: () => void
}

// SSE chat processing function using custom fetch service
export function fetchChatAPIProcessSSE(
params: {
roomId: number
uuid: number
regenerate?: boolean
prompt: string
uploadFileKeys?: string[]
options?: { conversationId?: string, parentMessageId?: string }
signal?: GenericAbortSignal
onDownloadProgress?: (progressEvent: AxiosProgressEvent) => void
signal?: AbortSignal
},
) {
handlers: SSEEventHandlers,
): Promise<void> {
const userStore = useUserStore()

const data: Record<string, any> = {
Expand All @@ -42,11 +54,67 @@ export function fetchChatAPIProcess<T = any>(
top_p: userStore.userInfo.advanced.top_p,
}

return post<T>({
url: '/chat-process',
data,
signal: params.signal,
onDownloadProgress: params.onDownloadProgress,
return new Promise((resolve, reject) => {
fetchService.postStream(
{
url: '/chat-process',
body: data,
signal: params.signal,
},
{
onChunk: (line: string) => {
if (line.trim() === '')
return

if (line.startsWith('event: ')) {
// const _eventType = line.substring(7).trim()
return
}

if (line.startsWith('data: ')) {
const data = line.substring(6).trim()

if (data === '[DONE]') {
handlers.onEnd?.()
resolve()
return
}

try {
const jsonData = JSON.parse(data)

// Dispatch to different handlers based on data type
if (jsonData.message) {
handlers.onError?.(jsonData.message)
}
else if (jsonData.searchQuery) {
handlers.onSearchQuery?.(jsonData)
}
else if (jsonData.searchResults) {
handlers.onSearchResults?.(jsonData)
}
else if (jsonData.m) {
handlers.onDelta?.(jsonData.m)
}
else {
handlers.onMessage?.(jsonData)
}
}
catch (e) {
console.error('Failed to parse SSE data:', data, e)
}
}
},
onError: (error: Error) => {
handlers.onError?.(error.message)
reject(error)
},
onComplete: () => {
handlers.onEnd?.()
resolve()
},
},
)
})
}

Expand Down Expand Up @@ -94,7 +162,7 @@ export function fetchLogin<T = any>(username: string, password: string, token?:
export function fetchLogout<T = any>() {
return post<T>({
url: '/user-logout',
data: { },
data: {},
})
}

Expand Down Expand Up @@ -309,7 +377,7 @@ export function fetchGetChatHistory<T = any>(roomId: number, lastId?: number, al
export function fetchClearAllChat<T = any>() {
return post<T>({
url: '/chat-clear-all',
data: { },
data: {},
})
}

Expand Down
Loading