fix: use SSE streaming for image playground and add edits to distributor group check
Docker Build / Build and Push Docker Image (push) Successful in 3m59s
Docker Build / Build and Push Docker Image (push) Successful in 3m59s
This commit is contained in:
@@ -83,8 +83,8 @@ func Distribute() func(c *gin.Context) {
|
||||
}
|
||||
var selectGroup string
|
||||
usingGroup := common.GetContextKeyString(c, constant.ContextKeyUsingGroup)
|
||||
// check path is /pg/chat/completions or /pg/images/generations
|
||||
if strings.HasPrefix(c.Request.URL.Path, "/pg/chat/completions") || strings.HasPrefix(c.Request.URL.Path, "/pg/images/generations") {
|
||||
// check path is /pg/chat/completions or /pg/images/generations or /pg/images/edits
|
||||
if strings.HasPrefix(c.Request.URL.Path, "/pg/chat/completions") || strings.HasPrefix(c.Request.URL.Path, "/pg/images/generations") || strings.HasPrefix(c.Request.URL.Path, "/pg/images/edits") {
|
||||
playgroundRequest := &dto.PlayGroundRequest{}
|
||||
err = common.UnmarshalBodyReusable(c, playgroundRequest)
|
||||
if err != nil {
|
||||
|
||||
+116
-14
@@ -1,4 +1,4 @@
|
||||
import { api } from '@/lib/api'
|
||||
import { api, getCommonHeaders } from '@/lib/api'
|
||||
import { API_ENDPOINTS } from './constants'
|
||||
import type {
|
||||
ImageGenerationRequest,
|
||||
@@ -9,14 +9,120 @@ import type {
|
||||
ReferenceImage,
|
||||
} from './types'
|
||||
|
||||
export async function generateImages(
|
||||
request: ImageGenerationRequest
|
||||
/**
|
||||
* Parse an SSE stream from the image generation endpoint.
|
||||
* The backend may return SSE events (when stream=true or upstream returns SSE).
|
||||
* We collect all image_generation.completed events and the final [DONE].
|
||||
*/
|
||||
async function parseImageSSEResponse(response: Response): Promise<ImageGenerationResponse> {
|
||||
const reader = response.body?.getReader()
|
||||
if (!reader) {
|
||||
throw new Error('No response body')
|
||||
}
|
||||
|
||||
const decoder = new TextDecoder()
|
||||
let buffer = ''
|
||||
const images: Array<{ url?: string; b64_json?: string; revised_prompt?: string }> = []
|
||||
let created = 0
|
||||
|
||||
while (true) {
|
||||
const { done, value } = await reader.read()
|
||||
if (done) break
|
||||
|
||||
buffer += decoder.decode(value, { stream: true })
|
||||
const lines = buffer.split('\n')
|
||||
buffer = lines.pop() || ''
|
||||
|
||||
for (const line of lines) {
|
||||
const trimmed = line.trim()
|
||||
if (!trimmed.startsWith('data:')) continue
|
||||
|
||||
const data = trimmed.slice(5).trim()
|
||||
if (data === '[DONE]') continue
|
||||
if (!data) continue
|
||||
|
||||
try {
|
||||
const parsed = JSON.parse(data)
|
||||
if (parsed.url || parsed.b64_json) {
|
||||
images.push({
|
||||
url: parsed.url,
|
||||
b64_json: parsed.b64_json,
|
||||
revised_prompt: parsed.revised_prompt,
|
||||
})
|
||||
}
|
||||
if (parsed.created && !created) {
|
||||
created = parsed.created
|
||||
}
|
||||
} catch {
|
||||
// Skip unparseable lines
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return { created, data: images }
|
||||
}
|
||||
|
||||
/**
|
||||
* Send an image generation request using fetch with SSE streaming.
|
||||
* This avoids axios timeout issues and correctly handles SSE responses
|
||||
* from the backend (which may convert upstream JSON to SSE format).
|
||||
*/
|
||||
async function fetchImageSSE(
|
||||
url: string,
|
||||
body: Record<string, unknown> | FormData,
|
||||
): Promise<ImageGenerationResponse> {
|
||||
const res = await api.post(API_ENDPOINTS.IMAGE_GENERATIONS, request, {
|
||||
skipErrorHandler: true,
|
||||
timeout: 5 * 60 * 1000,
|
||||
} as Record<string, unknown>)
|
||||
return res.data
|
||||
const headers = getCommonHeaders()
|
||||
|
||||
let init: RequestInit
|
||||
if (body instanceof FormData) {
|
||||
// Let browser set Content-Type with boundary for multipart
|
||||
delete headers['Content-Type']
|
||||
init = { method: 'POST', headers, body, credentials: 'include' }
|
||||
} else {
|
||||
init = { method: 'POST', headers, body: JSON.stringify(body), credentials: 'include' }
|
||||
}
|
||||
|
||||
const response = await fetch(url, init)
|
||||
|
||||
if (!response.ok) {
|
||||
const contentType = response.headers.get('content-type') || ''
|
||||
if (contentType.includes('application/json')) {
|
||||
const errorData = await response.json()
|
||||
throw Object.assign(new Error(errorData?.error?.message || errorData?.message || `HTTP ${response.status}`), {
|
||||
response: { status: response.status, data: errorData },
|
||||
})
|
||||
}
|
||||
// For SSE or other content types on error, try to read as text
|
||||
const text = await response.text()
|
||||
let errorMessage = `HTTP ${response.status}`
|
||||
try {
|
||||
const parsed = JSON.parse(text)
|
||||
errorMessage = parsed?.error?.message || parsed?.message || errorMessage
|
||||
} catch {
|
||||
// Use default error message
|
||||
}
|
||||
throw Object.assign(new Error(errorMessage), {
|
||||
response: { status: response.status },
|
||||
})
|
||||
}
|
||||
|
||||
const contentType = response.headers.get('content-type') || ''
|
||||
if (contentType.includes('text/event-stream')) {
|
||||
return parseImageSSEResponse(response)
|
||||
}
|
||||
|
||||
// Non-SSE JSON response
|
||||
const data = await response.json()
|
||||
return data as ImageGenerationResponse
|
||||
}
|
||||
|
||||
export async function generateImages(
|
||||
request: ImageGenerationRequest,
|
||||
): Promise<ImageGenerationResponse> {
|
||||
return fetchImageSSE(
|
||||
API_ENDPOINTS.IMAGE_GENERATIONS,
|
||||
{ ...request, stream: true },
|
||||
)
|
||||
}
|
||||
|
||||
export async function editImages(
|
||||
@@ -34,6 +140,7 @@ export async function editImages(
|
||||
const formData = new FormData()
|
||||
formData.set('model', model)
|
||||
formData.set('prompt', prompt)
|
||||
formData.set('stream', 'true')
|
||||
if (group) formData.set('group', group)
|
||||
if (options?.n) formData.set('n', String(options.n))
|
||||
if (options?.size) formData.set('size', options.size)
|
||||
@@ -47,12 +154,7 @@ export async function editImages(
|
||||
}
|
||||
}
|
||||
|
||||
const res = await api.post(API_ENDPOINTS.IMAGE_EDITS, formData, {
|
||||
skipErrorHandler: true,
|
||||
timeout: 5 * 60 * 1000,
|
||||
headers: { 'Content-Type': 'multipart/form-data' },
|
||||
} as Record<string, unknown>)
|
||||
return res.data
|
||||
return fetchImageSSE(API_ENDPOINTS.IMAGE_EDITS, formData)
|
||||
}
|
||||
|
||||
export async function getUserGroups(): Promise<GroupOption[]> {
|
||||
|
||||
@@ -84,6 +84,26 @@ function getImageGenerationErrorMessage(error: unknown, t: (key: string) => stri
|
||||
}
|
||||
}
|
||||
|
||||
// Handle errors from fetch-based requests (non-axios)
|
||||
const err = error as Record<string, unknown> | undefined
|
||||
const response = err?.response as Record<string, unknown> | undefined
|
||||
|
||||
// Check for upstream error message in fetch-style errors
|
||||
const responseData = response?.data as Record<string, unknown> | undefined
|
||||
const nestedError = responseData?.error as Record<string, unknown> | undefined
|
||||
const upstreamMsg = nestedError?.message || responseData?.message
|
||||
if (typeof upstreamMsg === 'string' && upstreamMsg.trim()) {
|
||||
return upstreamMsg
|
||||
}
|
||||
|
||||
const status = response?.status
|
||||
if (status === 502 || status === 503) {
|
||||
return t('Image generation service is temporarily unavailable. Please try again later.')
|
||||
}
|
||||
if (status === 504) {
|
||||
return t('Image generation timed out. The upstream server took too long to respond.')
|
||||
}
|
||||
|
||||
const message = error instanceof Error ? error.message : ''
|
||||
if (message.includes('bad response status code 502') || message.includes('bad response status code 503')) {
|
||||
return t('Image generation service is temporarily unavailable. Please try again later.')
|
||||
|
||||
Reference in New Issue
Block a user