新增redis+bug修复

This commit is contained in:
537yaha
2026-04-01 13:40:42 +08:00
parent b5407a28c7
commit e84d8be21f
31 changed files with 894 additions and 205 deletions
+13
View File
@@ -45,6 +45,19 @@ ADMIN_PASSWORD=please_change_admin_password
# 生成示例:openssl rand -hex 32
ENCRYPTION_KEY=please_generate_64_hex_chars
# =========================
# Redis(按需配置,多实例推荐)
# 说明:
# - 单实例可留空
# - 多实例可配置 REDIS_URL,或使用 REDIS_ADDR + REDIS_PASSWORD + REDIS_DB
# =========================
REDIS_URL=
REDIS_ADDR=
REDIS_PASSWORD=
REDIS_DB=0
# WebSocket 分布式事件频道(一般无需修改)
REDIS_WS_CHANNEL=ai_cs:ws_events
# =========================
# 端口映射(非必须)
# =========================
+29 -1
View File
@@ -3,6 +3,22 @@
> 开源的 AI 客服系统:**AI + 人工一体**、可私有化部署、可配置、可观测。
> 适合把“官网右下角客服小窗”与“客服工作台”一起落地的团队。
## 界面预览
> 以下为当前版本关键界面截图(本地预览路径)。
**官网首页(核心能力模块)**
![官网首页(核心能力模块)](file:///C:/Users/29301/.cursor/projects/d-tools-AI-CS/assets/c__Users_29301_AppData_Roaming_Cursor_User_workspaceStorage_9cb4ac4ea85a91ea0567ed8202874002_images_image-c4852687-51e5-47ec-b85c-35413a8b19b6.png)
**客服小窗(人工客服模式)**
![客服小窗(人工客服模式)](file:///C:/Users/29301/.cursor/projects/d-tools-AI-CS/assets/c__Users_29301_AppData_Roaming_Cursor_User_workspaceStorage_9cb4ac4ea85a91ea0567ed8202874002_images_image-81a593c2-6820-40da-907d-118032ed5e54.png)
**客服小窗(AI 客服模式)**
![客服小窗(AI 客服模式)](file:///C:/Users/29301/.cursor/projects/d-tools-AI-CS/assets/c__Users_29301_AppData_Roaming_Cursor_User_workspaceStorage_9cb4ac4ea85a91ea0567ed8202874002_images_image-65d7a08f-e812-4d40-95ed-24f10df81516.png)
## 在线演示
- **官网首页(产品介绍 + SEO)**:[demo.cscorp.top](https://demo.cscorp.top)
@@ -79,6 +95,7 @@ docker-compose -f docker-compose.prod.yml up -d
- 出于演示环境安全,前端默认**不允许**:
- 修改 `admin` 账号密码
- 删除任意 `admin` 账号
- 删除 `agent` 用户时,系统会自动把其名下 AI 配置转移给当前管理员,避免配置丢失或无人维护。
- 若需维护管理员账号,请直接通过数据库操作(例如重置密码、删除异常管理员)。
#### 端口修改(重要说明)
@@ -140,6 +157,11 @@ npm run dev
| `ADMIN_USERNAME` | 默认管理员用户名 | 否 | `admin` | `admin` |
| `ADMIN_PASSWORD` | 默认管理员密码 | 是 | 无 | `AdminPwd` |
| `ENCRYPTION_KEY` | 后端加密密钥(64位 hex) | 是 | 无 | `openssl rand -hex 32` |
| `REDIS_URL` | Redis 连接串(启用跨实例 WS 广播) | 可选(多实例推荐) | 空 | `redis://:pwd@redis:6379/0` |
| `REDIS_ADDR` | Redis 地址(与 `REDIS_URL` 二选一) | 可选 | 空 | `redis:6379` |
| `REDIS_PASSWORD` | Redis 密码(使用 `REDIS_ADDR` 时) | 可选 | 空 | `StrongRedisPwd` |
| `REDIS_DB` | Redis DB(使用 `REDIS_ADDR` 时) | 可选 | `0` | `0` |
| `REDIS_WS_CHANNEL` | 分布式 WS 事件频道名 | 可选 | `ai_cs:ws_events` | `ai_cs:ws_events` |
| `BACKEND_PORT` | 后端映射到宿主机端口 | 否 | `18080` | `28080` |
| `FRONTEND_PORT` | 前端映射到宿主机端口 | 否 | `3000` | `13000` |
| `MILVUS_HOST` | 向量库地址 | 可选(启用 RAG) | `milvus-standalone` | `localhost` |
@@ -166,6 +188,12 @@ npm run dev
- **你必须依赖知识库**(生产强约束):把 `.env``MILVUS_REQUIRED=true`
- 此时如果 Milvus 不可用,会落库一条错误日志后退出,避免“半残服务上线”
## 多实例实时消息一致性(Redis)
- 单实例可不配置 Redis,系统维持当前行为。
- 多实例/多副本部署建议配置 `REDIS_URL`(或 `REDIS_ADDR` + `REDIS_PASSWORD` + `REDIS_DB`),用于 WebSocket 事件跨实例同步。
- 可通过 `REDIS_WS_CHANNEL` 自定义事件频道(默认 `ai_cs:ws_events`)。
## 集成访客小窗到你的网站(iframe)
把下面代码放到你网站的 `</body>` 前,核心是把 `src` 指向你自己的部署域名的 `/chat`
@@ -204,7 +232,7 @@ npm run dev
- **提示音听不到**:浏览器通常需要“用户一次交互”才能解锁音频;请先点一下页面任意按钮/再打开喇叭开关测试
- **向量库连不上导致启动失败**:检查 `.env``MILVUS_REQUIRED` 是否误开;不需要知识库时建议 `MILVUS_DISABLED=true`
- **搜不到站点/分享卡片不正确**:设置 `NEXT_PUBLIC_SITE_URL=https://你的域名`,用于 canonical / OG / sitemap 生成
## 贡献
欢迎提交 Issue 和 Pull Request。
+23 -19
View File
@@ -58,9 +58,9 @@ func (a *AdminController) checkAdminPermission(c *gin.Context) (uint, bool) {
}
type createAgentRequest struct {
Username string `json:"username"`
Password string `json:"password"`
Role string `json:"role"`
Username string `json:"username"`
Password string `json:"password"`
Role string `json:"role"`
Permissions []string `json:"permissions"`
}
@@ -155,12 +155,12 @@ func (a *AdminController) CreateUser(c *gin.Context) {
_ = currentUserID
var req struct {
Username string `json:"username"`
Password string `json:"password"`
Role string `json:"role"`
Username string `json:"username"`
Password string `json:"password"`
Role string `json:"role"`
Permissions []string `json:"permissions"`
Nickname *string `json:"nickname"`
Email *string `json:"email"`
Nickname *string `json:"nickname"`
Email *string `json:"email"`
}
if err := c.ShouldBindJSON(&req); err != nil {
@@ -169,12 +169,12 @@ func (a *AdminController) CreateUser(c *gin.Context) {
}
user, err := a.userService.CreateUser(service.CreateUserInput{
Username: req.Username,
Password: req.Password,
Role: req.Role,
Username: req.Username,
Password: req.Password,
Role: req.Role,
Permissions: req.Permissions,
Nickname: req.Nickname,
Email: req.Email,
Nickname: req.Nickname,
Email: req.Email,
})
if err != nil {
switch err {
@@ -211,11 +211,11 @@ func (a *AdminController) UpdateUser(c *gin.Context) {
}
var req struct {
Role *string `json:"role"`
Role *string `json:"role"`
Permissions *[]string `json:"permissions"`
Nickname *string `json:"nickname"`
Email *string `json:"email"`
ReceiveAIConversations *bool `json:"receive_ai_conversations"`
Nickname *string `json:"nickname"`
Email *string `json:"email"`
ReceiveAIConversations *bool `json:"receive_ai_conversations"`
}
if err := c.ShouldBindJSON(&req); err != nil {
@@ -263,7 +263,8 @@ func (a *AdminController) DeleteUser(c *gin.Context) {
return
}
if err := a.userService.DeleteUser(uint(id), currentUserID); err != nil {
transferred, err := a.userService.DeleteUser(uint(id), currentUserID)
if err != nil {
if err.Error() == "用户不存在" {
c.JSON(http.StatusNotFound, gin.H{"error": "用户不存在"})
} else {
@@ -273,7 +274,10 @@ func (a *AdminController) DeleteUser(c *gin.Context) {
return
}
c.JSON(http.StatusOK, gin.H{"message": "删除成功"})
c.JSON(http.StatusOK, gin.H{
"message": "删除成功",
"transferred_ai_configs": transferred,
})
}
// UpdateUserPassword 处理更新用户密码的请求。
+10
View File
@@ -2,8 +2,10 @@ package controller
import (
"net/http"
"time"
"github.com/2930134478/AI-CS/backend/service"
"github.com/2930134478/AI-CS/backend/utils"
"github.com/gin-gonic/gin"
)
@@ -41,11 +43,19 @@ func (a *AuthController) Login(c *gin.Context) {
return
}
wsToken, wsTokenExp, tokenErr := utils.GenerateWSToken(user.ID, 24*time.Hour)
if tokenErr != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "登录失败"})
return
}
c.JSON(http.StatusOK, gin.H{
"message": "登录成功",
"user_id": user.ID,
"username": user.Username,
"role": user.Role,
"ws_token": wsToken,
"ws_token_exp": wsTokenExp,
// permissions 用于前端侧边栏显示(后端强校验以 X-User-Id 为准)
"permissions": func() []string {
if user.Role == "admin" {
+108 -1
View File
@@ -18,14 +18,21 @@ import (
type MessageController struct {
messageService *service.MessageService
conversationService *service.ConversationService
userService *service.UserService
storageService infra.StorageService
}
// NewMessageController 创建 MessageController 实例。
func NewMessageController(messageService *service.MessageService, conversationService *service.ConversationService, storageService infra.StorageService) *MessageController {
func NewMessageController(
messageService *service.MessageService,
conversationService *service.ConversationService,
userService *service.UserService,
storageService infra.StorageService,
) *MessageController {
return &MessageController{
messageService: messageService,
conversationService: conversationService,
userService: userService,
storageService: storageService,
}
}
@@ -54,6 +61,48 @@ func (mc *MessageController) CreateMessage(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数错误"})
return
}
userID := getUserIDFromHeader(c)
// 若带了客服身份头,则必须按客服消息处理,禁止伪装成访客消息。
if userID > 0 && !req.SenderIsAgent {
c.JSON(http.StatusForbidden, gin.H{"error": "已登录客服不允许以访客身份发送消息"})
return
}
// 客服消息必须绑定当前登录用户(X-User-Id),并以服务端用户 ID 为准,避免伪造 sender_id。
if req.SenderIsAgent {
if userID == 0 {
c.JSON(http.StatusForbidden, gin.H{"error": "未授权访问,请提供 X-User-Id 请求头"})
return
}
req.SenderID = userID
if mc.userService != nil {
// 按会话类型进行权限校验:
// - visitor 会话:需要 chat 权限
// - internal 会话:需要 kb_test 权限,且仅会话创建者可发送
detail, err := mc.conversationService.GetConversationDetail(req.ConversationID, userID)
if err != nil {
c.JSON(http.StatusForbidden, gin.H{"error": "无权限访问该会话"})
return
}
if detail.ConversationType == "internal" {
if detail.AgentID != userID {
c.JSON(http.StatusForbidden, gin.H{"error": "仅内部会话创建者可发送消息"})
return
}
if err := mc.userService.CheckPermission(userID, string(service.PermKBTest)); err != nil {
c.JSON(http.StatusForbidden, gin.H{"error": err.Error()})
return
}
} else {
if err := mc.userService.CheckPermission(userID, string(service.PermChat)); err != nil {
c.JSON(http.StatusForbidden, gin.H{"error": err.Error()})
return
}
}
}
} else {
// 访客消息的 sender_id 统一由服务端置 0,避免前端注入。
req.SenderID = 0
}
// 验证:必须有内容或文件
if req.Content == "" && req.FileURL == nil {
@@ -109,6 +158,31 @@ func (mc *MessageController) ListMessages(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"error": "会话ID不合法"})
return
}
if mc.userService != nil {
userID := getUserIDFromHeader(c)
detail, detailErr := mc.conversationService.GetConversationDetail(uint(conversationID), userID)
if detailErr != nil && userID > 0 {
c.JSON(http.StatusForbidden, gin.H{"error": "无权限访问该会话"})
return
}
if detail != nil {
if detail.ConversationType == "internal" {
if userID == 0 || detail.AgentID != userID {
c.JSON(http.StatusForbidden, gin.H{"error": "无权限访问内部会话"})
return
}
if err := mc.userService.CheckPermission(userID, string(service.PermKBTest)); err != nil {
c.JSON(http.StatusForbidden, gin.H{"error": err.Error()})
return
}
} else if userID > 0 {
if err := mc.userService.CheckPermission(userID, string(service.PermChat)); err != nil {
c.JSON(http.StatusForbidden, gin.H{"error": err.Error()})
return
}
}
}
}
// 解析 include_ai_messages 参数(默认 false
includeAIMessages := c.DefaultQuery("include_ai_messages", "false") == "true"
@@ -134,6 +208,39 @@ func (mc *MessageController) MarkMessagesRead(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数错误"})
return
}
if mc.userService != nil {
userID := getUserIDFromHeader(c)
detail, detailErr := mc.conversationService.GetConversationDetail(req.ConversationID, userID)
if detailErr != nil && userID > 0 {
c.JSON(http.StatusForbidden, gin.H{"error": "无权限访问该会话"})
return
}
if detail != nil {
if detail.ConversationType == "internal" {
if userID == 0 || detail.AgentID != userID {
c.JSON(http.StatusForbidden, gin.H{"error": "无权限访问内部会话"})
return
}
}
if req.ReaderIsAgent {
if userID == 0 {
c.JSON(http.StatusForbidden, gin.H{"error": "未授权访问,请提供 X-User-Id 请求头"})
return
}
if detail.ConversationType == "internal" {
if err := mc.userService.CheckPermission(userID, string(service.PermKBTest)); err != nil {
c.JSON(http.StatusForbidden, gin.H{"error": err.Error()})
return
}
} else {
if err := mc.userService.CheckPermission(userID, string(service.PermChat)); err != nil {
c.JSON(http.StatusForbidden, gin.H{"error": err.Error()})
return
}
}
}
}
}
result, err := mc.messageService.MarkMessagesRead(req.ConversationID, req.ReaderIsAgent)
if err != nil {
+4
View File
@@ -21,10 +21,12 @@ require (
github.com/bytedance/gopkg v0.1.3 // indirect
github.com/bytedance/sonic v1.14.1 // indirect
github.com/bytedance/sonic/loader v0.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudwego/base64x v0.1.6 // indirect
github.com/cockroachdb/errors v1.9.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/gabriel-vasile/mimetype v1.4.10 // indirect
github.com/getsentry/sentry-go v0.12.0 // indirect
github.com/gin-contrib/sse v1.1.0 // indirect
@@ -50,6 +52,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/redis/go-redis/v9 v9.18.0 // indirect
github.com/rogpeppe/go-internal v1.8.1 // indirect
github.com/segmentio/asm v1.1.3 // indirect
github.com/segmentio/encoding v0.5.3 // indirect
@@ -59,6 +62,7 @@ require (
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.3.0 // indirect
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/arch v0.21.0 // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/oauth2 v0.34.0 // indirect
+8
View File
@@ -24,6 +24,8 @@ github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZw
github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M=
github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU=
@@ -51,6 +53,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -250,6 +254,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs=
github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg=
@@ -323,6 +329,8 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/arch v0.21.0 h1:iTC9o7+wP6cPWpDWkivCvQFGAHDQ59SrSxsLPcnkArw=
+18 -5
View File
@@ -344,7 +344,7 @@ func main() {
profileService := service.NewProfileService(userRepo, storageService)
aiConfigService := service.NewAIConfigService(aiConfigRepo, userRepo)
aiService := service.NewAIService(aiConfigRepo, messageRepo, conversationRepo, retrievalService, webSearchProvider, embeddingConfigService, promptConfigService, storageService, systemLogService)
userService := service.NewUserService(userRepo) // 用户管理服务
userService := service.NewUserService(userRepo, aiConfigRepo) // 用户管理服务
faqService := service.NewFAQService(faqRepo, retrievalService, documentEmbeddingService) // FAQ 管理服务
documentService := service.NewDocumentService(docRepo, kbRepo, documentEmbeddingService, retrievalService) // 文档管理服务
knowledgeBaseService := service.NewKnowledgeBaseService(kbRepo, docRepo) // 知识库管理服务
@@ -452,16 +452,29 @@ func main() {
}
// 创建 Hub(回调函数通过闭包访问 wsHub)
wsHub = websocket.NewHub(onConnect, onDisconnect)
// 可选启用 Redis Pub/Sub:配置 REDIS_URL 或 REDIS_ADDR 后自动开启跨实例广播。
wsBus, wsBusErr := websocket.NewRedisBusFromEnv()
if wsBusErr != nil {
log.Printf("⚠️ Redis Pub/Sub 初始化失败,将回退为单实例广播: %v", wsBusErr)
}
if wsBus != nil {
defer func() {
if err := wsBus.Close(); err != nil {
log.Printf("关闭 Redis Pub/Sub 失败: %v", err)
}
}()
log.Println("✅ 已启用 Redis Pub/Sub 跨实例广播")
}
wsHub = websocket.NewHub(onConnect, onDisconnect, wsBus)
go wsHub.Run() // 启动 Hub(在后台运行)
messageService := service.NewMessageService(conversationRepo, messageRepo, wsHub, aiService)
messageService := service.NewMessageService(db, conversationRepo, messageRepo, wsHub, aiService)
visitorService := service.NewVisitorService(userRepo, wsHub)
// 初始化控制器
authController := controller.NewAuthController(authService)
conversationController := controller.NewConversationController(conversationService, aiConfigService, userService)
messageController := controller.NewMessageController(messageService, conversationService, storageService)
messageController := controller.NewMessageController(messageService, conversationService, userService, storageService)
adminController := controller.NewAdminController(authService, userService)
profileController := controller.NewProfileController(profileService)
aiConfigController := controller.NewAIConfigController(aiConfigService, userService)
@@ -499,7 +512,7 @@ func main() {
Analytics: analyticsController,
SystemLog: systemLogController,
},
websocket.HandleWebSocket(wsHub),
websocket.HandleWebSocket(wsHub, userRepo),
)
// 配置静态文件服务(用于访问上传的头像等文件)
+20 -1
View File
@@ -51,6 +51,26 @@ func (r *AIConfigRepository) ListByUserID(userID uint) ([]models.AIConfig, error
return configs, nil
}
// CountByUserID 统计指定用户拥有的 AI 配置数量。
func (r *AIConfigRepository) CountByUserID(userID uint) (int64, error) {
var count int64
if err := r.db.Model(&models.AIConfig{}).Where("user_id = ?", userID).Count(&count).Error; err != nil {
return 0, err
}
return count, nil
}
// ReassignUser 将某用户名下的 AI 配置归属转移到另一位用户。
func (r *AIConfigRepository) ReassignUser(fromUserID, toUserID uint) (int64, error) {
res := r.db.Model(&models.AIConfig{}).
Where("user_id = ?", fromUserID).
Update("user_id", toUserID)
if res.Error != nil {
return 0, res.Error
}
return res.RowsAffected, nil
}
// UpdateFields 更新 AI 配置的指定字段。
func (r *AIConfigRepository) UpdateFields(id uint, values map[string]interface{}) error {
if len(values) == 0 {
@@ -76,4 +96,3 @@ func (r *AIConfigRepository) ListPublic(modelType string) ([]models.AIConfig, er
}
return configs, nil
}
+72 -49
View File
@@ -19,6 +19,7 @@ var (
// MessageService 负责消息领域的业务处理。
type MessageService struct {
db *gorm.DB
conversations *repository.ConversationRepository
messages *repository.MessageRepository
hub BroadcastHub
@@ -27,12 +28,14 @@ type MessageService struct {
// NewMessageService 创建 MessageService 实例。
func NewMessageService(
db *gorm.DB,
conversations *repository.ConversationRepository,
messages *repository.MessageRepository,
hub BroadcastHub,
aiService *AIService,
) *MessageService {
return &MessageService{
db: db,
conversations: conversations,
messages: messages,
hub: hub,
@@ -42,59 +45,79 @@ func NewMessageService(
// CreateMessage 创建消息并通过 WebSocket 广播。
func (s *MessageService) CreateMessage(input CreateMessageInput) (*models.Message, error) {
conv, err := s.conversations.GetByID(input.ConversationID)
if s.db == nil {
return nil, errors.New("db is not initialized")
}
var (
conv models.Conversation
message *models.Message
)
err := s.db.Transaction(func(tx *gorm.DB) error {
if err := tx.Where("id = ?", input.ConversationID).First(&conv).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return ErrConversationNotFound
}
return err
}
// B 方案:会话关闭后,如访客再次发消息则自动 reopen
if conv.Status == "closed" {
if input.SenderIsAgent {
return ErrConversationClosed
}
if err := tx.Model(&models.Conversation{}).Where("id = ?", conv.ID).Updates(map[string]interface{}{
"status": "open",
}).Error; err != nil {
return err
}
conv.Status = "open"
}
if input.SenderIsAgent && input.SenderID == 0 {
return errors.New("sender_id is required for agent messages")
}
message = &models.Message{
ConversationID: input.ConversationID,
SenderID: input.SenderID,
SenderIsAgent: input.SenderIsAgent,
Content: input.Content,
MessageType: "user_message",
ChatMode: conv.ChatMode,
IsRead: false,
FileURL: input.FileURL,
FileType: input.FileType,
FileName: input.FileName,
FileSize: input.FileSize,
MimeType: input.MimeType,
}
if err := tx.Create(message).Error; err != nil {
return err
}
// 如果客服发送消息,且会话的 agent_id 为 0,则更新为当前客服的 ID
updateFields := map[string]interface{}{
"updated_at": message.CreatedAt,
}
// 访客发送消息可视为在线心跳:同步刷新 last_seen_at,支撑客服端在线状态判定。
if !input.SenderIsAgent {
updateFields["last_seen_at"] = message.CreatedAt
}
if input.SenderIsAgent && input.SenderID > 0 && conv.AgentID == 0 {
updateFields["agent_id"] = input.SenderID
}
if err := tx.Model(&models.Conversation{}).Where("id = ?", conv.ID).Updates(updateFields).Error; err != nil {
return err
}
if agentID, ok := updateFields["agent_id"].(uint); ok {
conv.AgentID = agentID
}
return nil
})
if err != nil {
return nil, err
}
// B 方案:会话关闭后,如访客再次发消息则自动 reopen
if conv.Status == "closed" {
if input.SenderIsAgent {
return nil, ErrConversationClosed
}
if err := s.conversations.UpdateFields(conv.ID, map[string]interface{}{
"status": "open",
}); err != nil {
return nil, err
}
conv.Status = "open"
}
if input.SenderIsAgent && input.SenderID == 0 {
return nil, errors.New("sender_id is required for agent messages")
}
message := &models.Message{
ConversationID: input.ConversationID,
SenderID: input.SenderID,
SenderIsAgent: input.SenderIsAgent,
Content: input.Content,
MessageType: "user_message",
ChatMode: conv.ChatMode,
IsRead: false,
FileURL: input.FileURL,
FileType: input.FileType,
FileName: input.FileName,
FileSize: input.FileSize,
MimeType: input.MimeType,
}
if err := s.messages.Create(message); err != nil {
return nil, err
}
// 如果客服发送消息,且会话的 agent_id 为 0,则更新为当前客服的 ID
updateFields := map[string]interface{}{
"updated_at": message.CreatedAt,
}
if input.SenderIsAgent && input.SenderID > 0 && conv.AgentID == 0 {
updateFields["agent_id"] = input.SenderID
}
if err := s.conversations.UpdateFields(conv.ID, updateFields); err != nil {
return nil, err
}
if s.hub != nil {
// 1. 先广播到该对话房间内的客户端(访客 + 已按该 conversation_id 建连的客服)
s.hub.BroadcastMessage(message.ConversationID, "new_message", message)
+31 -11
View File
@@ -13,12 +13,16 @@ import (
// UserService 负责用户管理领域的业务编排。
type UserService struct {
users *repository.UserRepository
users *repository.UserRepository
aiConfigs *repository.AIConfigRepository
}
// NewUserService 创建 UserService 实例。
func NewUserService(users *repository.UserRepository) *UserService {
return &UserService{users: users}
func NewUserService(users *repository.UserRepository, aiConfigs *repository.AIConfigRepository) *UserService {
return &UserService{
users: users,
aiConfigs: aiConfigs,
}
}
// EffectivePermissions 计算用户“有效权限”。
@@ -256,32 +260,49 @@ func (s *UserService) UpdateUser(input UpdateUserInput) (*UserSummary, error) {
}
// DeleteUser 删除用户。
func (s *UserService) DeleteUser(id uint, currentUserID uint) error {
// 说明:为避免“孤儿配置”,删除前会将该用户名下 AI 配置自动转移给当前管理员。
func (s *UserService) DeleteUser(id uint, currentUserID uint) (int64, error) {
// 防止删除当前登录用户
if id == currentUserID {
return errors.New("不能删除当前登录用户")
return 0, errors.New("不能删除当前登录用户")
}
// 检查用户是否存在并获取用户信息
user, err := s.users.GetByID(id)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errors.New("用户不存在")
return 0, errors.New("用户不存在")
}
return err
return 0, err
}
// 演示站安全策略:管理员账号只能通过数据库维护,接口层禁止删除任何管理员。
if user.Role == "admin" {
return errors.New("管理员账号不允许通过前端删除,请使用数据库维护")
return 0, errors.New("管理员账号不允许通过前端删除,请使用数据库维护")
}
// 将被删除用户名下 AI 配置转移到当前管理员,避免配置成为“无人维护”的孤儿数据。
transferred := int64(0)
if s.aiConfigs != nil {
configCount, countErr := s.aiConfigs.CountByUserID(id)
if countErr != nil {
return 0, fmt.Errorf("统计用户关联 AI 配置失败: %w", countErr)
}
if configCount > 0 {
moved, moveErr := s.aiConfigs.ReassignUser(id, currentUserID)
if moveErr != nil {
return 0, fmt.Errorf("转移用户关联 AI 配置失败: %w", moveErr)
}
transferred = moved
}
}
// 执行删除
if err := s.users.Delete(id); err != nil {
return err
return 0, err
}
return nil
return transferred, nil
}
// UpdateUserPassword 更新用户密码。
@@ -329,4 +350,3 @@ func (s *UserService) UpdateUserPassword(input UpdatePasswordInput) error {
return nil
}
+77
View File
@@ -0,0 +1,77 @@
package utils
import (
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"fmt"
"os"
"strconv"
"strings"
"time"
)
func wsTokenSecret() []byte {
// 与现有系统保持一致:优先使用 ENCRYPTION_KEY;未设置时回退固定开发值。
secret := os.Getenv("ENCRYPTION_KEY")
if secret == "" {
secret = "abcdefghijklmnopqrstuvwxyz123456"
}
return []byte(secret)
}
// GenerateWSToken 生成客服 WebSocket 短期令牌。
func GenerateWSToken(userID uint, ttl time.Duration) (token string, expireAt int64, err error) {
if userID == 0 {
return "", 0, fmt.Errorf("invalid user id")
}
if ttl <= 0 {
ttl = 24 * time.Hour
}
expireAt = time.Now().Add(ttl).Unix()
payload := fmt.Sprintf("%d:%d", userID, expireAt)
payloadEnc := base64.RawURLEncoding.EncodeToString([]byte(payload))
mac := hmac.New(sha256.New, wsTokenSecret())
_, _ = mac.Write([]byte(payloadEnc))
signature := base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
return payloadEnc + "." + signature, expireAt, nil
}
// ValidateWSToken 校验客服 WebSocket 令牌是否与用户匹配且未过期。
func ValidateWSToken(token string, expectedUserID uint) bool {
if expectedUserID == 0 || token == "" {
return false
}
parts := strings.Split(token, ".")
if len(parts) != 2 {
return false
}
payloadEnc, signature := parts[0], parts[1]
mac := hmac.New(sha256.New, wsTokenSecret())
_, _ = mac.Write([]byte(payloadEnc))
expectedSig := base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
if !hmac.Equal([]byte(signature), []byte(expectedSig)) {
return false
}
payloadRaw, err := base64.RawURLEncoding.DecodeString(payloadEnc)
if err != nil {
return false
}
payloadParts := strings.Split(string(payloadRaw), ":")
if len(payloadParts) != 2 {
return false
}
uid64, err := strconv.ParseUint(payloadParts[0], 10, 64)
if err != nil || uint(uid64) != expectedUserID {
return false
}
expireAt, err := strconv.ParseInt(payloadParts[1], 10, 64)
if err != nil {
return false
}
return time.Now().Unix() <= expireAt
}
+27 -4
View File
@@ -5,6 +5,8 @@ import (
"net/http"
"strconv"
"github.com/2930134478/AI-CS/backend/repository"
"github.com/2930134478/AI-CS/backend/utils"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
@@ -19,7 +21,7 @@ var upgrader = websocket.Upgrader{
}
// HandleWebSocket 处理 WebSocket 连接
func HandleWebSocket(hub *Hub) gin.HandlerFunc {
func HandleWebSocket(hub *Hub, userRepo *repository.UserRepository) gin.HandlerFunc {
return func(c *gin.Context) {
// 从查询参数获取对话ID
conversationIDStr := c.Query("conversation_id")
@@ -42,9 +44,30 @@ func HandleWebSocket(hub *Hub) gin.HandlerFunc {
var agentID uint
if !isVisitor {
agentIDStr := c.Query("agent_id")
if agentIDStr != "" {
if parsed, err := strconv.ParseUint(agentIDStr, 10, 32); err == nil {
agentID = uint(parsed)
if agentIDStr == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "agent_id 不能为空"})
return
}
parsed, parseErr := strconv.ParseUint(agentIDStr, 10, 32)
if parseErr != nil || parsed == 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "无效的 agent_id"})
return
}
agentID = uint(parsed)
wsToken := c.Query("ws_token")
if !utils.ValidateWSToken(wsToken, agentID) {
c.JSON(http.StatusUnauthorized, gin.H{"error": "ws_token 无效或已过期"})
return
}
if userRepo != nil {
user, userErr := userRepo.GetByID(agentID)
if userErr != nil || user == nil {
c.JSON(http.StatusUnauthorized, gin.H{"error": "客服身份无效"})
return
}
if user.Role != "admin" && user.Role != "agent" {
c.JSON(http.StatusForbidden, gin.H{"error": "仅客服账号允许建立该连接"})
return
}
}
}
+112 -77
View File
@@ -42,25 +42,44 @@ type Hub struct {
// 回调函数
onConnect OnClientConnectCallback
onDisconnect OnClientDisconnectCallback
// 分布式事件总线(可选,启用后支持多实例广播一致性)
bus DistributedBus
}
// Message 是要广播的消息
type Message struct {
ConversationID uint `json:"conversation_id"`
Data interface{} `json:"data"` // 消息内容(可以是 Message 对象)
Type string `json:"type"` // 消息类型:new_message, conversation_update 等
Data interface{} `json:"data"` // 消息内容(可以是 Message 对象)
Type string `json:"type"` // 消息类型:new_message, conversation_update 等
Scope string `json:"scope,omitempty"` // conversation | all_agents
FromRemote bool `json:"-"`
}
// NewHub 创建一个新的 Hub
func NewHub(onConnect OnClientConnectCallback, onDisconnect OnClientDisconnectCallback) *Hub {
return &Hub{
func NewHub(onConnect OnClientConnectCallback, onDisconnect OnClientDisconnectCallback, bus DistributedBus) *Hub {
h := &Hub{
conversations: make(map[uint]map[*Client]bool),
register: make(chan *Client),
unregister: make(chan *Client),
broadcast: make(chan *Message, 256),
onConnect: onConnect,
onDisconnect: onDisconnect,
bus: bus,
}
if bus != nil {
bus.Subscribe(func(msg *Message) {
if msg == nil {
return
}
msg.FromRemote = true
select {
case h.broadcast <- msg:
default:
log.Printf("⚠️ 分布式消息队列拥塞,丢弃事件: 对话ID=%d, 类型=%s", msg.ConversationID, msg.Type)
}
})
}
return h
}
// Run 启动 Hub,处理所有事件
@@ -139,32 +158,24 @@ func (h *Hub) Run() {
// 广播消息
case message := <-h.broadcast:
h.mu.RLock()
// 找到这个对话的所有客户端
clients, ok := h.conversations[message.ConversationID]
if !ok {
h.mu.RUnlock()
log.Printf("⚠️ 广播消息失败: 对话ID=%d 没有客户端连接", message.ConversationID)
if message == nil {
continue
}
// 创建一个客户端列表的副本(避免在遍历时修改)
clientList := make([]*Client, 0, len(clients))
for client := range clients {
clientList = append(clientList, client)
if message.Scope == "all_agents" {
clients := h.snapshotAllAgents()
h.sendToClients(clients, message)
} else {
clients := h.snapshotConversationClients(message.ConversationID)
if len(clients) == 0 {
log.Printf("⚠️ 广播消息失败: 对话ID=%d 没有客户端连接", message.ConversationID)
} else {
h.sendToClients(clients, message)
}
}
h.mu.RUnlock()
// 给所有客户端发送消息
for _, client := range clientList {
select {
case client.send <- message:
default:
// 如果发送失败(客户端可能已经断开),关闭连接
log.Printf("⚠️ 发送消息失败: 对话ID=%d, 客户端断开", client.conversationID)
close(client.send)
h.mu.Lock()
delete(h.conversations[client.conversationID], client)
h.mu.Unlock()
// 仅本地源事件向分布式总线发布,远端同步过来的事件不再二次发布(避免回环)。
if h.bus != nil && !message.FromRemote {
if err := h.bus.Publish(message); err != nil {
log.Printf("⚠️ 分布式广播失败: 对话ID=%d, 类型=%s, 错误=%v", message.ConversationID, message.Type, err)
}
}
}
@@ -177,62 +188,18 @@ func (h *Hub) BroadcastMessage(conversationID uint, messageType string, data int
ConversationID: conversationID,
Type: messageType,
Data: data,
Scope: "conversation",
}
}
// BroadcastToAllAgents 广播消息到所有客服客户端(不管连接到哪个对话)
// 用于 visitor_status_update 等需要所有客服都收到的事件
func (h *Hub) BroadcastToAllAgents(messageType string, data interface{}) {
h.mu.RLock()
// 收集所有客服客户端(isVisitor == false
allAgents := make([]*Client, 0)
for _, clients := range h.conversations {
for client := range clients {
if !client.isVisitor {
allAgents = append(allAgents, client)
}
}
}
h.mu.RUnlock()
// 为每个客服客户端创建消息并发送
for _, client := range allAgents {
// 如果 data 是 Message 对象,使用消息的 conversation_id
// 否则使用客户端连接的对话ID
var conversationID uint
if msg, ok := data.(*models.Message); ok {
conversationID = msg.ConversationID
} else if convID, ok := data.(map[string]interface{})["conversation_id"]; ok {
if id, ok := convID.(uint); ok {
conversationID = id
} else if id, ok := convID.(float64); ok {
conversationID = uint(id)
} else {
conversationID = client.conversationID
}
} else {
conversationID = client.conversationID
}
message := &Message{
ConversationID: conversationID,
Type: messageType,
Data: data,
}
select {
case client.send <- message:
default:
// 如果发送失败(客户端可能已经断开),关闭连接
log.Printf("⚠️ 发送消息到客服失败: 对话ID=%d, 客户端断开", client.conversationID)
close(client.send)
h.mu.Lock()
if clients, ok := h.conversations[client.conversationID]; ok {
delete(clients, client)
if len(clients) == 0 {
delete(h.conversations, client.conversationID)
}
}
h.mu.Unlock()
}
h.broadcast <- &Message{
ConversationID: conversationIDFromData(data, 0),
Type: messageType,
Data: data,
Scope: "all_agents",
}
}
@@ -252,3 +219,71 @@ func (h *Hub) GetOnlineAgentIDs() map[uint]bool {
}
return agentIDs
}
func (h *Hub) snapshotConversationClients(conversationID uint) []*Client {
h.mu.RLock()
defer h.mu.RUnlock()
clients := h.conversations[conversationID]
out := make([]*Client, 0, len(clients))
for c := range clients {
out = append(out, c)
}
return out
}
func (h *Hub) snapshotAllAgents() []*Client {
h.mu.RLock()
defer h.mu.RUnlock()
out := make([]*Client, 0)
for _, clients := range h.conversations {
for c := range clients {
if !c.isVisitor {
out = append(out, c)
}
}
}
return out
}
func (h *Hub) sendToClients(clients []*Client, message *Message) {
for _, client := range clients {
select {
case client.send <- message:
default:
log.Printf("⚠️ 发送消息失败: 对话ID=%d, 客户端断开", client.conversationID)
h.mu.Lock()
if cc, ok := h.conversations[client.conversationID]; ok {
delete(cc, client)
if len(cc) == 0 {
delete(h.conversations, client.conversationID)
}
}
h.mu.Unlock()
safeClose(client.send)
}
}
}
func safeClose(ch chan *Message) {
defer func() {
_ = recover()
}()
close(ch)
}
func conversationIDFromData(data interface{}, fallback uint) uint {
if msg, ok := data.(*models.Message); ok {
return msg.ConversationID
}
if m, ok := data.(map[string]interface{}); ok {
if convID, ok2 := m["conversation_id"]; ok2 {
switch v := convID.(type) {
case uint:
return v
case float64:
return uint(v)
}
}
}
return fallback
}
+149
View File
@@ -0,0 +1,149 @@
package websocket
import (
"context"
"encoding/json"
"fmt"
"os"
"strconv"
"time"
"github.com/redis/go-redis/v9"
)
type DistributedBus interface {
Publish(msg *Message) error
Subscribe(handler func(msg *Message))
Close() error
}
type redisWireMessage struct {
ConversationID uint `json:"conversation_id"`
Type string `json:"type"`
Scope string `json:"scope,omitempty"`
Data json.RawMessage `json:"data"`
Source string `json:"source"`
}
type RedisBus struct {
ctx context.Context
client *redis.Client
channel string
nodeID string
pubsub *redis.PubSub
}
func NewRedisBusFromEnv() (DistributedBus, error) {
redisURL := os.Getenv("REDIS_URL")
redisAddr := os.Getenv("REDIS_ADDR")
if redisURL == "" && redisAddr == "" {
return nil, nil
}
var opts *redis.Options
var err error
if redisURL != "" {
opts, err = redis.ParseURL(redisURL)
if err != nil {
return nil, fmt.Errorf("parse REDIS_URL failed: %w", err)
}
} else {
opts = &redis.Options{
Addr: redisAddr,
Password: os.Getenv("REDIS_PASSWORD"),
DB: 0,
}
if dbRaw := os.Getenv("REDIS_DB"); dbRaw != "" {
if db, parseErr := strconv.Atoi(dbRaw); parseErr == nil {
opts.DB = db
}
}
}
client := redis.NewClient(opts)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if pingErr := client.Ping(ctx).Err(); pingErr != nil {
_ = client.Close()
return nil, fmt.Errorf("redis ping failed: %w", pingErr)
}
channel := os.Getenv("REDIS_WS_CHANNEL")
if channel == "" {
channel = "ai_cs:ws_events"
}
nodeID := fmt.Sprintf("%s-%d", hostnameOrDefault(), time.Now().UnixNano())
return &RedisBus{
ctx: context.Background(),
client: client,
channel: channel,
nodeID: nodeID,
}, nil
}
func (r *RedisBus) Publish(msg *Message) error {
if msg == nil {
return nil
}
dataBytes, err := json.Marshal(msg.Data)
if err != nil {
return err
}
wire := redisWireMessage{
ConversationID: msg.ConversationID,
Type: msg.Type,
Scope: msg.Scope,
Data: dataBytes,
Source: r.nodeID,
}
payload, err := json.Marshal(wire)
if err != nil {
return err
}
return r.client.Publish(r.ctx, r.channel, payload).Err()
}
func (r *RedisBus) Subscribe(handler func(msg *Message)) {
if handler == nil {
return
}
r.pubsub = r.client.Subscribe(r.ctx, r.channel)
ch := r.pubsub.Channel()
go func() {
for item := range ch {
var wire redisWireMessage
if err := json.Unmarshal([]byte(item.Payload), &wire); err != nil {
continue
}
if wire.Source == r.nodeID {
continue
}
var data interface{}
if err := json.Unmarshal(wire.Data, &data); err != nil {
continue
}
handler(&Message{
ConversationID: wire.ConversationID,
Type: wire.Type,
Scope: wire.Scope,
Data: data,
FromRemote: true,
})
}
}()
}
func (r *RedisBus) Close() error {
if r.pubsub != nil {
_ = r.pubsub.Close()
}
return r.client.Close()
}
func hostnameOrDefault() string {
name, err := os.Hostname()
if err != nil || name == "" {
return "node"
}
return name
}
@@ -24,6 +24,7 @@ import {
} from "@/features/agent/types";
import type { WSMessage } from "@/lib/websocket";
import { toast } from "@/hooks/useToast";
import { getAgentWSToken } from "@/utils/storage";
export default function AgentChatPage() {
const params = useParams();
@@ -49,6 +50,7 @@ export default function AgentChatPage() {
const [loadingMessages, setLoadingMessages] = useState(true);
const [sending, setSending] = useState(false);
const [highlightKeyword, setHighlightKeyword] = useState("");
const wsToken = getAgentWSToken() ?? undefined;
const handleMarkMessagesRead = useCallback(
async (
@@ -336,6 +338,9 @@ export default function AgentChatPage() {
useWebSocket<ChatWebSocketPayload>({
conversationId,
enabled: Boolean(conversationId),
isVisitor: false,
agentId: agent?.id ?? undefined,
wsToken,
onMessage: handleWebSocketMessage,
onError: (error) => {
// 静默处理错误,避免影响用户体验
+4
View File
@@ -4,6 +4,7 @@ import { useRouter } from "next/navigation";
import { apiUrl } from "@/lib/config";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
import { setAgentWSToken } from "@/utils/storage";
export default function AgentLoginPage() {
const [username, setUsername] = useState("");
@@ -42,6 +43,9 @@ export default function AgentLoginPage() {
"agent_permissions",
JSON.stringify(Array.isArray(data.permissions) ? data.permissions : [])
);
if (typeof data.ws_token === "string" && typeof data.ws_token_exp === "number") {
setAgentWSToken(data.ws_token, data.ws_token_exp);
}
// 跳转到客服工作台(三栏布局)
router.push("/agent/dashboard");
+9 -3
View File
@@ -260,11 +260,17 @@ export default function UsersPage(props: any = {}) {
}
setSubmitting(true);
try {
await deleteUser(selectedUser.id, agent.id);
const result = await deleteUser(selectedUser.id, agent.id);
setDeleteDialogOpen(false);
setSelectedUser(null);
await loadUsers();
toast.success("删除成功");
if (result.transferredAIConfigs > 0) {
toast.success(
`删除成功,已自动转移 ${result.transferredAIConfigs} 条 AI 配置到当前管理员`
);
} else {
toast.success("删除成功");
}
} catch (error) {
toast.error((error as Error).message || "删除用户失败");
} finally {
@@ -736,7 +742,7 @@ export default function UsersPage(props: any = {}) {
<strong>{selectedUser.username}</strong>
</p>
<p className="text-sm text-muted-foreground">
AI
</p>
<div className="flex justify-end gap-2">
<Button
+10 -2
View File
@@ -8,8 +8,12 @@ const BACKEND_BASE = `http://${BACKEND_HOST}:${BACKEND_PORT}`;
export async function GET(request: NextRequest) {
const { searchParams } = new URL(request.url);
const backendUrl = `${BACKEND_BASE}/agent/prompts?${searchParams.toString()}`;
const userID = request.headers.get("X-User-Id") || "";
try {
const res = await fetch(backendUrl, { cache: "no-store" });
const res = await fetch(backendUrl, {
cache: "no-store",
headers: userID ? { "X-User-Id": userID } : {},
});
const body = await res.text();
return new NextResponse(body, {
status: res.status,
@@ -25,11 +29,15 @@ export async function GET(request: NextRequest) {
export async function PUT(request: NextRequest) {
const backendUrl = `${BACKEND_BASE}/agent/prompts`;
const userID = request.headers.get("X-User-Id") || "";
try {
const body = await request.text();
const res = await fetch(backendUrl, {
method: "PUT",
headers: { "Content-Type": "application/json" },
headers: {
"Content-Type": "application/json",
...(userID ? { "X-User-Id": userID } : {}),
},
body,
});
const resBody = await res.text();
@@ -68,7 +68,7 @@ export function OnlineAgentsList({
))}
</div>
<p className="text-sm font-medium text-muted-foreground text-center pt-1">
</p>
</div>
);
@@ -8,10 +8,16 @@ import {
} from "../../agent/services/conversationApi";
import type { ConversationListType } from "../../agent/services/conversationApi";
import type { ConversationStatus } from "../../agent/services/conversationApi";
import { ConversationSummary, VisitorStatusUpdatePayload } from "../../agent/types";
import {
ConversationSummary,
MessageItem,
VisitorStatusUpdatePayload,
} from "../../agent/types";
import { useWebSocket } from "./useWebSocket";
import { WSMessage } from "@/lib/websocket";
import { ChatWebSocketPayload } from "../../agent/types";
import { buildMessagePreview } from "@/utils/format";
import { getAgentWSToken } from "@/utils/storage";
const sortByUpdatedAtDesc = (list: ConversationSummary[]) =>
[...list].sort(
@@ -44,6 +50,8 @@ export function useConversations(options?: UseConversationsOptions) {
const [isInitialLoad, setIsInitialLoad] = useState(true);
const searchRef = useRef("");
const refreshTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const wsToken = getAgentWSToken() ?? undefined;
// 根据 filter 过滤会话
const applyFilter = useCallback(
@@ -105,6 +113,17 @@ export function useConversations(options?: UseConversationsOptions) {
loadConversations();
}, [loadConversations]);
// 兜底定时刷新:防止 WebSocket 漏事件/无会话时无法建立全局 WS 导致列表长期不更新。
useEffect(() => {
if (!agentId) {
return;
}
const interval = setInterval(() => {
void loadConversations();
}, 15000);
return () => clearInterval(interval);
}, [agentId, loadConversations]);
// 当 filter / listType 改变时,重新应用过滤(不重新加载数据)
useEffect(() => {
if (isInitialLoad) {
@@ -203,12 +222,21 @@ export function useConversations(options?: UseConversationsOptions) {
[conversations]
);
const scheduleRefreshConversations = useCallback(() => {
if (refreshTimerRef.current) {
clearTimeout(refreshTimerRef.current);
}
refreshTimerRef.current = setTimeout(() => {
void loadConversations();
}, 500);
}, [loadConversations]);
// 建立全局 WebSocket 连接以接收 visitor_status_update 等全局事件
// 使用第一个对话的 ID(如果存在),否则不建立连接
const globalConversationId = conversations.length > 0 ? conversations[0].id : null;
// 处理 visitor_status_update 事件
const handleVisitorStatusUpdate = useCallback(
// 处理全局 WebSocket 事件:访客在线状态 + 新消息摘要
const handleGlobalWebSocketMessage = useCallback(
(event: WSMessage<ChatWebSocketPayload>) => {
if (event.type === "visitor_status_update" && event.data) {
const payload = event.data as VisitorStatusUpdatePayload;
@@ -223,18 +251,68 @@ export function useConversations(options?: UseConversationsOptions) {
// 离线时,last_seen_at 会在后端更新,这里不需要特殊处理
// 因为对话列表会定期刷新,或者通过其他方式更新
}
} else if (event.type === "new_message" && event.data) {
const message = event.data as MessageItem;
if (typeof message?.conversation_id !== "number") {
return;
}
const isConversationExists = hasConversation(message.conversation_id);
if (!isConversationExists) {
// 新会话(当前列表里还没有)时,延迟刷新把它拉进来
scheduleRefreshConversations();
return;
}
const isSystemMessage =
(message.message_type ?? "user_message") === "system_message";
const isVisitorMessage = !message.sender_is_agent && !isSystemMessage;
const preview = buildMessagePreview(message.content ?? "");
updateConversation(message.conversation_id, (conv) => ({
...conv,
updated_at: message.created_at,
last_seen_at: isVisitorMessage
? message.created_at
: conv.last_seen_at ?? null,
unread_count: isVisitorMessage
? message.conversation_id === selectedConversationId
? 0
: (conv.unread_count ?? 0) + 1
: conv.unread_count ?? 0,
last_message: {
id: message.id,
content: preview,
sender_is_agent: message.sender_is_agent,
message_type: message.message_type ?? "user_message",
is_read: Boolean(message.is_read),
read_at: message.read_at ?? null,
created_at: message.created_at,
},
}));
}
},
[updateConversation]
[
hasConversation,
scheduleRefreshConversations,
selectedConversationId,
updateConversation,
]
);
useEffect(() => {
return () => {
if (refreshTimerRef.current) {
clearTimeout(refreshTimerRef.current);
}
};
}, []);
// 建立全局 WebSocket 连接(用于接收全局事件)
useWebSocket<ChatWebSocketPayload>({
conversationId: globalConversationId,
enabled: Boolean(globalConversationId && agentId),
isVisitor: false,
agentId: agentId ?? undefined,
onMessage: handleVisitorStatusUpdate,
wsToken,
onMessage: handleGlobalWebSocketMessage,
onError: (error) => {
// 静默处理错误,避免影响用户体验
},
@@ -25,6 +25,7 @@ import { useWebSocket } from "./useWebSocket";
import { WSMessage } from "@/lib/websocket";
import { buildMessagePreview } from "@/utils/format";
import { playNotificationSound } from "@/utils/sound";
import { getAgentWSToken } from "@/utils/storage";
interface UseMessagesOptions {
conversationId: number | null;
@@ -60,6 +61,7 @@ export function useMessages({
const [aiThinking, setAiThinking] = useState(false);
/** 知识库测试:联网选项 */
const [needWebSearch, setNeedWebSearch] = useState(false);
const wsToken = getAgentWSToken() ?? undefined;
const refreshConversationDetail = useCallback(
async (id: number) => {
@@ -226,6 +228,10 @@ export function useMessages({
return {
...conversation,
updated_at: message.created_at,
// 访客发言视作在线心跳,刷新 last_seen_at,避免在线绿点快速闪断。
last_seen_at: isVisitorMessage
? message.created_at
: conversation.last_seen_at ?? null,
unread_count: nextUnread,
last_message: {
id: message.id,
@@ -513,6 +519,7 @@ export function useMessages({
enabled: Boolean(conversationId),
isVisitor: false, // 客服端设置为 false
agentId: agentId ?? undefined, // 传递客服ID,用于创建系统消息
wsToken,
onMessage: onWebSocketMessage,
onError: (error) => {
// 静默处理错误,避免影响用户体验
@@ -8,6 +8,7 @@ interface UseWebSocketOptions<T> {
enabled?: boolean;
isVisitor?: boolean; // 是否是访客(默认为 true
agentId?: number; // 客服ID(如果是客服连接,需要传递)
wsToken?: string; // 客服 WS 令牌(登录后下发)
onMessage: (payload: WSMessage<T>) => void;
onError?: (error: Event) => void;
onClose?: () => void;
@@ -18,6 +19,7 @@ export function useWebSocket<T>({
enabled = true,
isVisitor = true, // 默认是访客
agentId,
wsToken,
onMessage,
onError,
onClose,
@@ -43,6 +45,7 @@ export function useWebSocket<T>({
conversationId,
isVisitor,
agentId,
wsToken,
// 使用 ref 的 current 值,这样即使回调函数变化也不会导致重新连接
onMessage: (payload) => onMessageRef.current(payload),
onError: onErrorRef.current
@@ -58,6 +61,6 @@ export function useWebSocket<T>({
};
// 只依赖 conversationId、enabled、isVisitor 和 agentId,不依赖回调函数
// 回调函数通过 useRef 存储,不会导致重新连接
}, [conversationId, enabled, isVisitor, agentId]);
}, [conversationId, enabled, isVisitor, agentId, wsToken]);
}
+8 -4
View File
@@ -1,4 +1,4 @@
import { apiUrl } from "@/lib/config";
import { apiUrl, getAgentHeaders } from "@/lib/config";
// FAQ 摘要信息
export interface FAQSummary {
@@ -35,10 +35,12 @@ export async function fetchFAQs(query?: string): Promise<FAQSummary[]> {
const res = await fetch(url, {
cache: "no-store",
headers: getAgentHeaders(),
});
if (!res.ok) {
throw new Error("获取 FAQ 列表失败");
const error = await res.json().catch(() => ({}));
throw new Error((error as { error?: string }).error || "获取 FAQ 列表失败");
}
const data = await res.json();
@@ -49,6 +51,7 @@ export async function fetchFAQs(query?: string): Promise<FAQSummary[]> {
export async function fetchFAQ(id: number): Promise<FAQSummary> {
const res = await fetch(apiUrl(`/faqs/${id}`), {
cache: "no-store",
headers: getAgentHeaders(),
});
if (!res.ok) {
@@ -65,7 +68,7 @@ export async function fetchFAQ(id: number): Promise<FAQSummary> {
export async function createFAQ(data: CreateFAQRequest): Promise<FAQSummary> {
const res = await fetch(apiUrl("/faqs"), {
method: "POST",
headers: { "Content-Type": "application/json" },
headers: { "Content-Type": "application/json", ...getAgentHeaders() },
body: JSON.stringify(data),
});
@@ -84,7 +87,7 @@ export async function updateFAQ(
): Promise<FAQSummary> {
const res = await fetch(apiUrl(`/faqs/${id}`), {
method: "PUT",
headers: { "Content-Type": "application/json" },
headers: { "Content-Type": "application/json", ...getAgentHeaders() },
body: JSON.stringify(data),
});
@@ -103,6 +106,7 @@ export async function updateFAQ(
export async function deleteFAQ(id: number): Promise<void> {
const res = await fetch(apiUrl(`/faqs/${id}`), {
method: "DELETE",
headers: getAgentHeaders(),
});
if (!res.ok) {
@@ -115,6 +115,7 @@ export async function updateKnowledgeBase(
export async function deleteKnowledgeBase(id: number): Promise<void> {
const res = await fetch(apiUrl(`/knowledge-bases/${id}`), {
method: "DELETE",
headers: getAgentHeaders(),
});
if (!res.ok) {
@@ -1,4 +1,4 @@
import { apiUrl } from "@/lib/config";
import { apiUrl, getAgentHeaders } from "@/lib/config";
import { MessageItem } from "../types";
import { reportFrontendLog } from "./systemLogApi";
@@ -172,7 +172,7 @@ export async function sendMessage({
const res = await fetch(apiUrl("/messages"), {
method: "POST",
headers: { "Content-Type": "application/json" },
headers: { "Content-Type": "application/json", ...getAgentHeaders() },
body: JSON.stringify(payload),
});
if (!res.ok) {
@@ -18,7 +18,8 @@ export async function fetchPrompts(userId: number): Promise<PromptItem[]> {
headers: getAgentHeaders(),
});
if (!res.ok) {
throw new Error("获取提示词配置失败");
const err = await res.json().catch(() => ({}));
throw new Error((err as { error?: string }).error || "获取提示词配置失败");
}
const contentType = res.headers.get("content-type") ?? "";
if (!contentType.includes("application/json")) {
+8 -1
View File
@@ -145,7 +145,7 @@ export async function updateUser(
export async function deleteUser(
id: number,
currentUserId: number
): Promise<void> {
): Promise<{ transferredAIConfigs: number }> {
const res = await fetch(
`${apiUrl(`/admin/users/${id}`)}?current_user_id=${currentUserId}`,
{
@@ -162,6 +162,13 @@ export async function deleteUser(
}
throw new Error(error.error || "删除用户失败");
}
const data = await res.json().catch(() => ({}));
return {
transferredAIConfigs:
typeof data.transferred_ai_configs === "number"
? data.transferred_ai_configs
: 0,
};
}
// 更新用户密码
+22 -14
View File
@@ -13,6 +13,7 @@ export interface WSOptions<T = unknown> {
conversationId: number; // 对话ID
isVisitor?: boolean; // 是否是访客(默认为 true
agentId?: number; // 客服ID(如果是客服连接,需要传递)
wsToken?: string; // 客服 WS 令牌(登录后下发)
onMessage?: (message: WSMessage<T>) => void; // 收到消息时的回调
onError?: (error: Event) => void; // 连接错误时的回调
onClose?: () => void; // 连接关闭时的回调
@@ -24,18 +25,22 @@ export class WSClient<T = unknown> {
private conversationId: number;
private isVisitor: boolean;
private agentId?: number; // 客服ID
private wsToken?: string; // 客服 WS 令牌
private onMessage?: (message: WSMessage<T>) => void;
private onError?: (error: Event) => void;
private onClose?: () => void;
private reconnectTimer: NodeJS.Timeout | null = null;
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
private reconnectDelay = 3000; // 3
private reconnectDelay = 3000; // 初始 3 秒
private maxReconnectDelay = 30000; // 最长 30
private manualDisconnect = false;
private logPrefix = "❌ WebSocket 错误";
constructor(options: WSOptions<T>) {
this.conversationId = options.conversationId;
this.isVisitor = options.isVisitor !== undefined ? options.isVisitor : true;
this.agentId = options.agentId;
this.wsToken = options.wsToken;
this.onMessage = options.onMessage;
this.onError = options.onError;
this.onClose = options.onClose;
@@ -43,6 +48,7 @@ export class WSClient<T = unknown> {
// 连接 WebSocket
connect() {
this.manualDisconnect = false;
// 如果已经连接,先断开
if (this.ws && this.ws.readyState !== WebSocket.CLOSED) {
this.ws.close();
@@ -57,6 +63,9 @@ export class WSClient<T = unknown> {
// 如果是客服连接,添加 agent_id 参数
if (!this.isVisitor && this.agentId) {
wsUrl += `&agent_id=${this.agentId}`;
if (this.wsToken) {
wsUrl += `&ws_token=${encodeURIComponent(this.wsToken)}`;
}
}
try {
@@ -64,6 +73,7 @@ export class WSClient<T = unknown> {
this.ws.onopen = () => {
this.reconnectAttempts = 0; // 重置重连次数
this.reconnectDelay = 3000; // 连接恢复后重置退避时间
};
this.ws.onmessage = (event) => {
@@ -82,6 +92,10 @@ export class WSClient<T = unknown> {
this.ws.onerror = (error) => {
const state = this.ws?.readyState;
// 主动断开或连接已进入关闭态时,浏览器仍可能触发 onerror,这属于预期行为,避免误报。
if (this.manualDisconnect || state === WebSocket.CLOSING || state === WebSocket.CLOSED) {
return;
}
const stateText =
state === WebSocket.CONNECTING
? "连接中"
@@ -94,7 +108,7 @@ export class WSClient<T = unknown> {
: "未知";
const url = this.ws?.url || wsUrl;
console.error(
`❌ WebSocket 错误: 对话ID=${this.conversationId}, 状态=${stateText}, URL=${url}`,
`${this.logPrefix}: 对话ID=${this.conversationId}, 状态=${stateText}, URL=${url}`,
error
);
if (this.onError) {
@@ -107,10 +121,8 @@ export class WSClient<T = unknown> {
if (this.onClose) {
this.onClose();
}
// 只有在非正常关闭时才尝试重连(避免在开发模式下频繁重连)
const code = event.code;
const wasClean = event.wasClean;
if (!wasClean && code !== 1000) {
// 除主动断开外,任意关闭都重连(代理空闲断开通常是 clean close)。
if (!this.manualDisconnect) {
this.attemptReconnect();
}
};
@@ -129,19 +141,17 @@ export class WSClient<T = unknown> {
// 尝试重连
private attemptReconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error(`❌ WebSocket 重连次数已达上限,停止重连: 对话ID=${this.conversationId}`);
return;
}
this.reconnectAttempts++;
this.reconnectTimer = setTimeout(() => {
this.connect();
}, this.reconnectDelay);
// 指数退避,避免网络波动时过于频繁重连
this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay);
}
// 断开连接
disconnect() {
this.manualDisconnect = true;
// 取消重连
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
@@ -150,8 +160,6 @@ export class WSClient<T = unknown> {
// 关闭 WebSocket 连接
if (this.ws) {
// 设置标志,避免重连
this.reconnectAttempts = this.maxReconnectAttempts;
// 关闭连接
if (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING) {
this.ws.close();
+4 -3
View File
@@ -61,7 +61,8 @@ export function buildMessagePreview(content: string, maxLength = 50): string {
}
// 判断访客是否在线(根据 last_seen_at 字段)
// 如果 last_seen_at 在最近 10 秒内,则认为在线
// 说明:10 秒阈值在公网环境(代理、弱网、移动端切后台)容易抖动,体验上会“刚说完就离线”。
// 这里放宽到 90 秒,减少误判闪断。
export function isVisitorOnline(lastSeenAt: string | null | undefined): boolean {
if (!lastSeenAt) {
return false;
@@ -72,7 +73,7 @@ export function isVisitorOnline(lastSeenAt: string | null | undefined): boolean
}
const now = new Date();
const diff = now.getTime() - lastSeen.getTime();
// 10 秒内认为在线
return diff < 10 * 1000;
// 90 秒内认为在线
return diff < 90 * 1000;
}
+23
View File
@@ -3,6 +3,8 @@ import { AgentUser } from "@/features/agent/types";
const AGENT_ID_KEY = "agent_user_id";
const AGENT_USERNAME_KEY = "agent_username";
const AGENT_ROLE_KEY = "agent_role";
const AGENT_WS_TOKEN_KEY = "agent_ws_token";
const AGENT_WS_TOKEN_EXP_KEY = "agent_ws_token_exp";
const isBrowser = () => typeof window !== "undefined";
@@ -61,5 +63,26 @@ export function clearAgentUser(): void {
window.localStorage.removeItem(AGENT_ID_KEY);
window.localStorage.removeItem(AGENT_USERNAME_KEY);
window.localStorage.removeItem(AGENT_ROLE_KEY);
window.localStorage.removeItem("agent_permissions");
window.localStorage.removeItem(AGENT_WS_TOKEN_KEY);
window.localStorage.removeItem(AGENT_WS_TOKEN_EXP_KEY);
}
export function getAgentWSToken(): string | null {
if (!isBrowser()) return null;
const token = window.localStorage.getItem(AGENT_WS_TOKEN_KEY);
const expRaw = window.localStorage.getItem(AGENT_WS_TOKEN_EXP_KEY);
if (!token || !expRaw) return null;
const exp = Number.parseInt(expRaw, 10);
if (Number.isNaN(exp)) return null;
const nowSec = Math.floor(Date.now() / 1000);
if (exp <= nowSec) return null;
return token;
}
export function setAgentWSToken(token: string, expireAtUnix: number): void {
if (!isBrowser()) return;
window.localStorage.setItem(AGENT_WS_TOKEN_KEY, token);
window.localStorage.setItem(AGENT_WS_TOKEN_EXP_KEY, String(expireAtUnix));
}