submit_api_chat_logic.go 7.6 KB


  1. package chatrecords
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "github.com/gofrs/uuid/v5"
  9. "io"
  10. "net/http"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "wechat-api/ent/employee"
  15. "wechat-api/ent/wxcard"
  16. "wechat-api/ent/wxcarduser"
  17. "wechat-api/hook/dify"
  18. "wechat-api/hook/fastgpt"
  19. "wechat-api/internal/utils/jwt"
  20. "wechat-api/internal/svc"
  21. "wechat-api/internal/types"
  22. "github.com/zeromicro/go-zero/core/logx"
  23. )
  24. type ChatMessage struct {
  25. Id string `json:"id"`
  26. MessageId string `json:"message_id"`
  27. SessionId uint64 `json:"session_id"`
  28. ConversationId string `json:"conversation_id,optional"`
  29. Answer string `json:"answer"`
  30. Finish bool `json:"finish"`
  31. }
  32. type SubmitApiChatLogic struct {
  33. logx.Logger
  34. ctx context.Context
  35. svcCtx *svc.ServiceContext
  36. }
  37. func NewSubmitApiChatLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SubmitApiChatLogic {
  38. return &SubmitApiChatLogic{
  39. Logger: logx.WithContext(ctx),
  40. ctx: ctx,
  41. svcCtx: svcCtx}
  42. }
  43. func (l *SubmitApiChatLogic) SubmitApiChat(req *types.ChatRecordsInfo, w http.ResponseWriter) {
  44. userId := l.ctx.Value("userId").(uint64)
  45. // session_id 字段确保必须有
  46. var sessionId uint64
  47. if req.BotId == nil || *req.BotId == 0 {
  48. return
  49. }
  50. if req.SessionId != nil && *req.SessionId > 0 {
  51. sessionId = *req.SessionId
  52. } else {
  53. newSession, err := l.svcCtx.DB.ChatSession.Create().
  54. SetName(*req.Content).
  55. SetUserID(userId).
  56. SetBotID(*req.BotId).
  57. SetBotType(*req.BotType).
  58. Save(l.ctx)
  59. if err != nil {
  60. return
  61. }
  62. sessionId = newSession.ID
  63. }
  64. // 记录下问题
  65. _, err := l.svcCtx.DB.ChatRecords.Create().
  66. SetUserID(userId).
  67. SetSessionID(sessionId).
  68. SetBotType(*req.BotType).
  69. SetBotID(*req.BotId).
  70. SetContentType(1).
  71. SetContent(*req.Content).
  72. Save(l.ctx)
  73. if err != nil {
  74. return
  75. }
  76. if *req.BotType == 2 { // 从FastGPT里获取回答
  77. card, err := l.svcCtx.DB.WxCard.Query().Where(wxcard.ID(*req.BotId)).Only(l.ctx)
  78. if err != nil {
  79. return //TODO 这里应该报错
  80. }
  81. fastgptSendChat(l, w, *req.Content, card.APIBase, card.APIKey, sessionId, userId)
  82. } else if *req.BotType == 3 { // 从数字员工里获取回答
  83. employee, err := l.svcCtx.DB.Employee.Query().Where(employee.ID(*req.BotId)).Only(l.ctx)
  84. if err != nil {
  85. return //TODO 这里应该报错
  86. }
  87. difySendChat(l, w, *req.Content, employee.APIBase, employee.APIKey, sessionId, userId)
  88. }
  89. }
  90. // fastgptSendChat 往 FastGPT 发送内容并获得响应信息
  91. func fastgptSendChat(l *SubmitApiChatLogic, w http.ResponseWriter, content, apiBase, apiKey string, sessionId, userId uint64) {
  92. userInfo, _ := l.svcCtx.DB.WxCardUser.Query().Where(wxcarduser.ID(userId)).First(l.ctx)
  93. var chatReq fastgpt.ChatReq
  94. chatReq.Stream = true
  95. message := make([]fastgpt.Message, 0, 1)
  96. message = append(message, fastgpt.Message{
  97. Content: content,
  98. Role: "user",
  99. })
  100. chatReq.Messages = message
  101. chatReq.ChatId = jwt.HashidsEncode(int(sessionId))
  102. chatReq.Variables = fastgpt.Variables{
  103. Uid: jwt.HashidsEncode(int(userId)),
  104. Name: userInfo.Nickname,
  105. }
  106. // 設置請求體 (這裡是一個簡單的範例,你可以根據需要調整)
  107. jsonData, _ := json.Marshal(chatReq)
  108. fmt.Printf("request data:%v\n", string(jsonData))
  109. // 建立HTTP請求
  110. url := apiBase + fastgpt.GetChatUrl()
  111. fmt.Printf("url=%v token=%v\n", url, apiKey)
  112. request, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
  113. if err != nil {
  114. fmt.Printf("Error creating request:%v", err)
  115. return
  116. }
  117. // 設置請求頭
  118. request.Header.Set("Content-Type", "application/json")
  119. request.Header.Set("Authorization", "Bearer "+apiKey)
  120. request.Header.Set("Transfer-Encoding", "chunked")
  121. // 發送請求
  122. client := &http.Client{
  123. Timeout: time.Second * 60,
  124. }
  125. response, err := client.Do(request)
  126. defer response.Body.Close()
  127. // 讀取響應
  128. reader := bufio.NewReader(response.Body)
  129. w.Header().Set("Content-Type", "text/event-stream;charset=utf-8")
  130. w.Header().Set("Connection", "keep-alive")
  131. w.Header().Set("Cache-Control", "no-cache")
  132. for {
  133. line, err := reader.ReadString('\n')
  134. line = strings.Trim(line, " \n")
  135. fmt.Printf("line = %v\n", line)
  136. if err == io.EOF {
  137. break
  138. }
  139. if len(line) > 6 {
  140. line = line[6:]
  141. chatData := fastgpt.ChatResp{}
  142. err = json.Unmarshal([]byte(line), &chatData)
  143. if err != nil {
  144. fmt.Printf("json unmarshall error:%v\n", err)
  145. break
  146. }
  147. var finish bool
  148. if len(chatData.Choices) == 1 && chatData.Choices[0].FinishReason == "stop" {
  149. finish = true
  150. }
  151. uuidV4, _ := uuid.NewV4() //唯一ID
  152. jsonData := ChatMessage{}
  153. jsonData.Id = chatData.Id
  154. jsonData.SessionId = sessionId
  155. jsonData.Answer = chatData.Choices[0].Delta.Content
  156. jsonData.MessageId = uuidV4.String()
  157. jsonData.Finish = finish
  158. lineData, _ := json.Marshal(jsonData)
  159. _, _ = fmt.Fprintln(w, "data: "+string(lineData)+"\n")
  160. fmt.Printf("response=%v\n", string(lineData))
  161. if finish {
  162. break
  163. }
  164. }
  165. }
  166. }
  167. // difySendChat 往 Dify 发送内容并获得响应信息
  168. func difySendChat(l *SubmitApiChatLogic, w http.ResponseWriter, content, apiBase, apiKey string, sessionId, userId uint64) {
  169. userInfo, _ := l.svcCtx.DB.WxCardUser.Query().Where(wxcarduser.ID(userId)).First(l.ctx)
  170. var chatReq dify.ChatReq
  171. chatReq.ResponseMode = "streaming"
  172. chatReq.Query = content
  173. chatReq.User = fmt.Sprintf("%d:%s", userId, userInfo.Nickname)
  174. // 这里 sessionId 要与某个 conversation_id 关联,否则查询结果不准
  175. rdsKeySessionId := strconv.Itoa(int(sessionId))
  176. rdsValue := l.svcCtx.Rds.HGet(l.ctx, "miniprogram_dify_conversation_keys", rdsKeySessionId).Val()
  177. chatReq.ConversationId = rdsValue
  178. // 設置請求體 (這裡是一個簡單的範例,你可以根據需要調整)
  179. jsonData, _ := json.Marshal(chatReq)
  180. fmt.Printf("request data:%v\n", string(jsonData))
  181. // 建立HTTP請求
  182. url := apiBase + dify.GetChatUrl() // 替換為正確的FastGPT API端點
  183. request, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
  184. if err != nil {
  185. fmt.Printf("Error creating request:%v", err)
  186. return
  187. }
  188. // 設置請求頭
  189. request.Header.Set("Content-Type", "application/json")
  190. request.Header.Set("Authorization", "Bearer "+apiKey)
  191. request.Header.Set("Transfer-Encoding", "chunked")
  192. // 發送請求
  193. client := &http.Client{
  194. Timeout: time.Second * 60,
  195. }
  196. response, err := client.Do(request)
  197. defer response.Body.Close()
  198. // 讀取響應
  199. reader := bufio.NewReader(response.Body)
  200. w.Header().Set("Content-Type", "text/event-stream;charset=utf-8")
  201. w.Header().Set("Connection", "keep-alive")
  202. w.Header().Set("Cache-Control", "no-cache")
  203. for {
  204. line, err := reader.ReadString('\n')
  205. line = strings.Trim(line, " \n")
  206. fmt.Printf("line = %v\n", line)
  207. if err == io.EOF {
  208. break
  209. }
  210. if len(line) > 6 {
  211. line = line[6:]
  212. chatData := dify.ChatResp{}
  213. err = json.Unmarshal([]byte(line), &chatData)
  214. if err != nil {
  215. fmt.Printf("json unmarshall error:%v\n", err)
  216. fmt.Printf("line:%v\n", line)
  217. break
  218. }
  219. var finish bool
  220. if chatData.Event == "message_end" {
  221. finish = true
  222. }
  223. // 将 ConversationId 与 sessionId 建立关联关系
  224. if chatData.ConversationId != "" {
  225. l.svcCtx.Rds.HSet(l.ctx, "miniprogram_dify_conversation_keys", rdsKeySessionId, chatData.ConversationId)
  226. }
  227. jsonData := ChatMessage{}
  228. jsonData.Id = chatData.Id
  229. jsonData.SessionId = sessionId
  230. jsonData.Answer = chatData.Answer
  231. jsonData.MessageId = chatData.MessageId
  232. jsonData.Finish = finish
  233. jsonData.ConversationId = chatData.ConversationId
  234. lineData, _ := json.Marshal(jsonData)
  235. _, _ = fmt.Fprintln(w, "data: "+string(lineData)+"\n")
  236. fmt.Printf("response=%v\n", string(lineData))
  237. if finish {
  238. break
  239. }
  240. }
  241. }
  242. }