package chatrecords import ( "bufio" "bytes" "context" "encoding/json" "fmt" "github.com/gofrs/uuid/v5" "io" "net/http" "strconv" "strings" "time" "wechat-api/ent/employee" "wechat-api/ent/wxcard" "wechat-api/ent/wxcarduser" "wechat-api/hook/dify" "wechat-api/hook/fastgpt" "wechat-api/internal/utils/jwt" "wechat-api/internal/svc" "wechat-api/internal/types" "github.com/zeromicro/go-zero/core/logx" ) type ChatMessage struct { Id string `json:"id"` MessageId string `json:"message_id"` SessionId uint64 `json:"session_id"` ConversationId string `json:"conversation_id,optional"` Answer string `json:"answer"` Finish bool `json:"finish"` NeedPay bool `json:"need_pay"` NeedLogin bool `json:"need_login"` } type SubmitApiChatLogic struct { logx.Logger ctx context.Context svcCtx *svc.ServiceContext } func NewSubmitApiChatLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SubmitApiChatLogic { return &SubmitApiChatLogic{ Logger: logx.WithContext(ctx), ctx: ctx, svcCtx: svcCtx} } func (l *SubmitApiChatLogic) SubmitApiChat(req *types.ChatRecordsInfo, w http.ResponseWriter) { userId := l.ctx.Value("userId").(uint64) userInfo, err := l.svcCtx.DB.WxCardUser.Query().Where(wxcarduser.ID(userId)).Only(l.ctx) if err != nil { return } // session_id 字段确保必须有 var sessionId uint64 if req.BotId == nil || *req.BotId == 0 { return } if req.Content == nil || *req.Content == "" { return } if req.SessionId != nil && *req.SessionId > 0 { sessionId = *req.SessionId } else { newSession, err := l.svcCtx.DB.ChatSession.Create(). SetName(*req.Content). SetUserID(userId). SetBotID(*req.BotId). SetBotType(*req.BotType). Save(l.ctx) if err != nil { return } sessionId = newSession.ID } // 记录下问题 _, err = l.svcCtx.DB.ChatRecords.Create(). SetUserID(userId). SetSessionID(sessionId). SetBotType(*req.BotType). SetBotID(*req.BotId). SetContentType(1). SetContent(*req.Content). Save(l.ctx) if err != nil { return } if *req.BotType == 2 { // 从FastGPT里获取回答 card, err := l.svcCtx.DB.WxCard.Query().Where(wxcard.ID(*req.BotId)).Only(l.ctx) if err != nil { return } fastgptSendChat(l, w, *req.Content, card.APIBase, card.APIKey, sessionId, userId, *req.BotId, *req.BotType) } else if *req.BotType == 3 { // 从数字员工里获取回答 employeeRow, err := l.svcCtx.DB.Employee.Query().Where(employee.ID(*req.BotId)).Only(l.ctx) if err != nil { return } //TODO 判断用户是否能与智能体聊天(智能体VIP, 要求用户也必须是VIP) if employeeRow.IsVip > 0 && userInfo.IsVip == 0 { return } difySendChat(l, w, *req.Content, employeeRow.APIBase, employeeRow.APIKey, sessionId, userId, *req.BotId, *req.BotType) } } // fastgptSendChat 往 FastGPT 发送内容并获得响应信息 func fastgptSendChat(l *SubmitApiChatLogic, w http.ResponseWriter, content, apiBase, apiKey string, sessionId, userId, botId uint64, botType uint8) { userInfo, _ := l.svcCtx.DB.WxCardUser.Query().Where(wxcarduser.ID(userId)).First(l.ctx) var chatReq fastgpt.ChatReq chatReq.Stream = true message := make([]fastgpt.Message, 0, 1) message = append(message, fastgpt.Message{ Content: content, Role: "user", }) chatReq.Messages = message chatReq.ChatId = jwt.HashidsEncode(int(sessionId)) chatReq.Variables = fastgpt.Variables{ Uid: jwt.HashidsEncode(int(userId)), Name: userInfo.Nickname, } // 設置請求體 (這裡是一個簡單的範例,你可以根據需要調整) jsonData, _ := json.Marshal(chatReq) fmt.Printf("request data:%v\n", string(jsonData)) // 建立HTTP請求 url := apiBase + fastgpt.GetChatUrl() request, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) if err != nil { fmt.Printf("Error creating request:%v", err) return } // 設置請求頭 request.Header.Set("Content-Type", "application/json") request.Header.Set("Authorization", "Bearer "+apiKey) request.Header.Set("Transfer-Encoding", "chunked") // 發送請求 client := &http.Client{ Timeout: time.Second * 60, } response, err := client.Do(request) defer response.Body.Close() // 讀取響應 reader := bufio.NewReader(response.Body) w.Header().Set("Content-Type", "text/event-stream;charset=utf-8") w.Header().Set("Connection", "keep-alive") w.Header().Set("Cache-Control", "no-cache") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) return } var answer string for { line, err := reader.ReadString('\n') line = strings.Trim(line, " \n") //fmt.Printf("line = %v\n", line) if err == io.EOF { break } if len(line) > 6 { line = line[6:] chatData := fastgpt.ChatResp{} err = json.Unmarshal([]byte(line), &chatData) if err != nil { fmt.Printf("json unmarshall error:%v\n", err) break } var finish bool if len(chatData.Choices) == 1 && chatData.Choices[0].FinishReason == "stop" { finish = true } uuidV4, _ := uuid.NewV4() //唯一ID jsonData := ChatMessage{} jsonData.Id = chatData.Id jsonData.SessionId = sessionId jsonData.Answer = chatData.Choices[0].Delta.Content jsonData.MessageId = uuidV4.String() jsonData.Finish = finish lineData, _ := json.Marshal(jsonData) // 拼接回答 answer = answer + jsonData.Answer _, err = fmt.Fprintf(w, "%s", "data: "+string(lineData)+"\r\n") //fmt.Printf("response=%v\n", string(lineData)) if err != nil { logAnswer(l, userId, sessionId, botId, botType, answer) fmt.Printf("Error writing to client:%v \n", err) break } flusher.Flush() if finish { logAnswer(l, userId, sessionId, botId, botType, answer) break } } } } // difySendChat 往 Dify 发送内容并获得响应信息 func difySendChat(l *SubmitApiChatLogic, w http.ResponseWriter, content, apiBase, apiKey string, sessionId, userId, botId uint64, botType uint8) { userInfo, _ := l.svcCtx.DB.WxCardUser.Query().Where(wxcarduser.ID(userId)).First(l.ctx) var chatReq dify.ChatReq chatReq.ResponseMode = "streaming" chatReq.Query = content chatReq.User = fmt.Sprintf("%d:%s", userId, userInfo.Nickname) // 这里 sessionId 要与某个 conversation_id 关联,否则查询结果不准 rdsKeySessionId := strconv.Itoa(int(sessionId)) rdsValue := l.svcCtx.Rds.HGet(l.ctx, "miniprogram_dify_conversation_keys", rdsKeySessionId).Val() chatReq.ConversationId = rdsValue // 設置請求體 (這裡是一個簡單的範例,你可以根據需要調整) jsonData, _ := json.Marshal(chatReq) fmt.Printf("request data:%v\n", string(jsonData)) // 建立HTTP請求 url := apiBase + dify.GetChatUrl() // 替換為正確的FastGPT API端點 request, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) if err != nil { fmt.Printf("Error creating request:%v", err) return } // 設置請求頭 request.Header.Set("Content-Type", "application/json") request.Header.Set("Authorization", "Bearer "+apiKey) request.Header.Set("Transfer-Encoding", "chunked") // 發送請求 client := &http.Client{ Timeout: time.Second * 60, } response, err := client.Do(request) defer response.Body.Close() // 讀取響應 reader := bufio.NewReader(response.Body) w.Header().Set("Content-Type", "text/event-stream;charset=utf-8") w.Header().Set("Connection", "keep-alive") w.Header().Set("Cache-Control", "no-cache") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) return } var answer string for { line, err := reader.ReadString('\n') line = strings.Trim(line, " \n") //fmt.Printf("line = %v\n", line) if err == io.EOF { break } if len(line) > 6 { line = line[6:] chatData := dify.ChatResp{} err = json.Unmarshal([]byte(line), &chatData) if err != nil { fmt.Printf("json unmarshall error:%v\n", err) fmt.Printf("line:%v\n", line) break } var finish bool if chatData.Event == "message_end" { finish = true } // 将 ConversationId 与 sessionId 建立关联关系 if chatData.ConversationId != "" { l.svcCtx.Rds.HSet(l.ctx, "miniprogram_dify_conversation_keys", rdsKeySessionId, chatData.ConversationId) } jsonData := ChatMessage{} jsonData.Id = chatData.Id jsonData.SessionId = sessionId jsonData.Answer = chatData.Answer jsonData.MessageId = chatData.MessageId jsonData.Finish = finish jsonData.ConversationId = chatData.ConversationId lineData, _ := json.Marshal(jsonData) // 拼接回答 answer = answer + jsonData.Answer _, err = fmt.Fprintf(w, "%s", "data: "+string(lineData)+"\r\n") //fmt.Printf("response=%v\n", string(lineData)) if err != nil { logAnswer(l, userId, sessionId, botId, botType, answer) fmt.Printf("Error writing to client:%v \n", err) break } flusher.Flush() if finish { logAnswer(l, userId, sessionId, botId, botType, answer) break } } } } // logAnswer 保存Ai回答的内容 func logAnswer(l *SubmitApiChatLogic, userId, sessionId, botId uint64, botType uint8, answer string) { _, err := l.svcCtx.DB.ChatRecords.Create(). SetUserID(userId). SetSessionID(sessionId). SetBotType(botType). SetBotID(botId). SetContentType(2). SetContent(answer). Save(l.ctx) if err != nil { fmt.Printf("save answer failed: %v", err) } }