submit_api_chat_logic.go 7.2 KB


  1. package chatrecords
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "github.com/gofrs/uuid/v5"
  9. "io"
  10. "net/http"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "wechat-api/ent/wxcarduser"
  15. "wechat-api/hook/dify"
  16. "wechat-api/hook/fastgpt"
  17. "wechat-api/internal/utils/jwt"
  18. "wechat-api/internal/svc"
  19. "wechat-api/internal/types"
  20. "github.com/zeromicro/go-zero/core/logx"
  21. )
  // ChatMessage is the JSON payload written to the client for each relayed
  // answer chunk, one per server-sent event.
  type ChatMessage struct {
  	// Id is copied from the upstream response chunk.
  	Id string `json:"id"`
  	// MessageId identifies this message: a freshly generated UUID on the
  	// FastGPT path, the upstream message_id on the Dify path.
  	MessageId string `json:"message_id"`
  	// SessionId is the local chat session the answer belongs to.
  	SessionId uint64 `json:"session_id"`
  	// ConversationId is filled only on the Dify path.
  	// NOTE(review): "optional" is not an encoding/json tag option, so the
  	// field is always emitted, even when empty — presumably the tag was
  	// meant for go-zero's request parsing; confirm.
  	ConversationId string `json:"conversation_id,optional"`
  	// Answer is the text fragment carried by this chunk.
  	Answer string `json:"answer"`
  	// Finish is true on the last chunk of an answer.
  	Finish bool `json:"finish"`
  }
  30. type SubmitApiChatLogic struct {
  31. logx.Logger
  32. ctx context.Context
  33. svcCtx *svc.ServiceContext
  34. }
  35. func NewSubmitApiChatLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SubmitApiChatLogic {
  36. return &SubmitApiChatLogic{
  37. Logger: logx.WithContext(ctx),
  38. ctx: ctx,
  39. svcCtx: svcCtx}
  40. }
  41. func (l *SubmitApiChatLogic) SubmitApiChat(req *types.ChatRecordsInfo, w http.ResponseWriter) {
  42. userId := uint64(1) //l.ctx.Value("userId").(uint64)
  43. // session_id 字段确保必须有
  44. var sessionId uint64
  45. if req.BotId == nil || *req.BotId == 0 {
  46. return
  47. }
  48. if req.SessionId != nil && *req.SessionId > 0 {
  49. sessionId = *req.SessionId
  50. } else {
  51. newSession, err := l.svcCtx.DB.ChatSession.Create().
  52. SetName(*req.Content).
  53. SetUserID(userId).
  54. SetBotID(*req.BotId).
  55. SetBotType(*req.BotType).
  56. Save(l.ctx)
  57. if err != nil {
  58. return
  59. }
  60. sessionId = newSession.ID
  61. }
  62. // 记录下问题
  63. _, err := l.svcCtx.DB.ChatRecords.Create().
  64. SetUserID(userId).
  65. SetSessionID(sessionId).
  66. SetBotType(*req.BotType).
  67. SetBotID(*req.BotId).
  68. SetContentType(1).
  69. SetContent(*req.Content).
  70. Save(l.ctx)
  71. if err != nil {
  72. return
  73. }
  74. if *req.BotType == 2 { // 从FastGPT里获取回答
  75. fastgptSendChat(l, w, *req.Content, sessionId, userId)
  76. } else if *req.BotType == 3 { // 从数字员工里获取回答
  77. difySendChat(l, w, *req.Content, sessionId, userId)
  78. }
  79. }
  80. // fastgptSendChat 往 FastGPT 发送内容并获得响应信息
  81. func fastgptSendChat(l *SubmitApiChatLogic, w http.ResponseWriter, content string, sessionId, userId uint64) {
  82. userInfo, _ := l.svcCtx.DB.WxCardUser.Query().Where(wxcarduser.ID(userId)).First(l.ctx)
  83. var chatReq fastgpt.ChatReq
  84. chatReq.Stream = true
  85. message := make([]fastgpt.Message, 0, 1)
  86. message = append(message, fastgpt.Message{
  87. Content: content,
  88. Role: "user",
  89. })
  90. chatReq.Messages = message
  91. chatReq.ChatId = jwt.HashidsEncode(int(sessionId))
  92. chatReq.Variables = fastgpt.Variables{
  93. Uid: jwt.HashidsEncode(int(userId)),
  94. Name: userInfo.Nickname,
  95. }
  96. // 設置請求體 (這裡是一個簡單的範例,你可以根據需要調整)
  97. jsonData, _ := json.Marshal(chatReq)
  98. fmt.Printf("request data:%v\n", string(jsonData))
  99. // 建立HTTP請求
  100. url := fastgpt.GetBaseUrl() + fastgpt.GetChatUrl() // 替換為正確的FastGPT API端點
  101. request, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
  102. if err != nil {
  103. fmt.Printf("Error creating request:%v", err)
  104. return
  105. }
  106. // 設置請求頭
  107. request.Header.Set("Content-Type", "application/json")
  108. request.Header.Set("Authorization", "Bearer "+fastgpt.GetChatToken())
  109. request.Header.Set("Transfer-Encoding", "chunked")
  110. // 發送請求
  111. client := &http.Client{
  112. Timeout: time.Second * 60,
  113. }
  114. response, err := client.Do(request)
  115. defer response.Body.Close()
  116. // 讀取響應
  117. reader := bufio.NewReader(response.Body)
  118. w.Header().Set("Content-Type", "text/event-stream;charset=utf-8")
  119. w.Header().Set("Connection", "keep-alive")
  120. w.Header().Set("Cache-Control", "no-cache")
  121. for {
  122. line, err := reader.ReadString('\n')
  123. line = strings.Trim(line, " \n")
  124. fmt.Printf("line = %v\n", line)
  125. if err == io.EOF {
  126. break
  127. }
  128. if len(line) > 6 {
  129. line = line[6:]
  130. chatData := fastgpt.ChatResp{}
  131. err = json.Unmarshal([]byte(line), &chatData)
  132. if err != nil {
  133. fmt.Printf("json unmarshall error:%v\n", err)
  134. break
  135. }
  136. var finish bool
  137. if len(chatData.Choices) == 1 && chatData.Choices[0].FinishReason == "stop" {
  138. finish = true
  139. }
  140. uuidV4, _ := uuid.NewV4() //唯一ID
  141. jsonData := ChatMessage{}
  142. jsonData.Id = chatData.Id
  143. jsonData.SessionId = sessionId
  144. jsonData.Answer = chatData.Choices[0].Delta.Content
  145. jsonData.MessageId = uuidV4.String()
  146. jsonData.Finish = finish
  147. lineData, _ := json.Marshal(jsonData)
  148. _, _ = fmt.Fprintln(w, "data: "+string(lineData)+"\n")
  149. fmt.Printf("response=%v\n", string(lineData))
  150. if finish {
  151. break
  152. }
  153. }
  154. }
  155. }
  156. // difySendChat 往 Dify 发送内容并获得响应信息
  157. func difySendChat(l *SubmitApiChatLogic, w http.ResponseWriter, content string, sessionId, userId uint64) {
  158. userInfo, _ := l.svcCtx.DB.WxCardUser.Query().Where(wxcarduser.ID(userId)).First(l.ctx)
  159. var chatReq dify.ChatReq
  160. chatReq.ResponseMode = "streaming"
  161. chatReq.Query = content
  162. chatReq.User = fmt.Sprintf("%d:%s", userId, userInfo.Nickname)
  163. // 这里 sessionId 要与某个 conversation_id 关联,否则查询结果不准
  164. rdsKeySessionId := strconv.Itoa(int(sessionId))
  165. rdsValue := l.svcCtx.Rds.HGet(l.ctx, "miniprogram_dify_conversation_keys", rdsKeySessionId).Val()
  166. chatReq.ConversationId = rdsValue
  167. // 設置請求體 (這裡是一個簡單的範例,你可以根據需要調整)
  168. jsonData, _ := json.Marshal(chatReq)
  169. fmt.Printf("request data:%v\n", string(jsonData))
  170. // 建立HTTP請求
  171. url := dify.GetBaseUrl() + dify.GetChatUrl() // 替換為正確的FastGPT API端點
  172. request, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
  173. if err != nil {
  174. fmt.Printf("Error creating request:%v", err)
  175. return
  176. }
  177. // 設置請求頭
  178. request.Header.Set("Content-Type", "application/json")
  179. request.Header.Set("Authorization", "Bearer "+dify.GetChatToken())
  180. request.Header.Set("Transfer-Encoding", "chunked")
  181. // 發送請求
  182. client := &http.Client{
  183. Timeout: time.Second * 60,
  184. }
  185. response, err := client.Do(request)
  186. defer response.Body.Close()
  187. // 讀取響應
  188. reader := bufio.NewReader(response.Body)
  189. w.Header().Set("Content-Type", "text/event-stream;charset=utf-8")
  190. w.Header().Set("Connection", "keep-alive")
  191. w.Header().Set("Cache-Control", "no-cache")
  192. for {
  193. line, err := reader.ReadString('\n')
  194. line = strings.Trim(line, " \n")
  195. fmt.Printf("line = %v\n", line)
  196. if err == io.EOF {
  197. break
  198. }
  199. if len(line) > 6 {
  200. line = line[6:]
  201. chatData := dify.ChatResp{}
  202. err = json.Unmarshal([]byte(line), &chatData)
  203. if err != nil {
  204. fmt.Printf("json unmarshall error:%v\n", err)
  205. fmt.Printf("line:%v\n", line)
  206. break
  207. }
  208. var finish bool
  209. if chatData.Event == "message_end" {
  210. finish = true
  211. }
  212. // 将 ConversationId 与 sessionId 建立关联关系
  213. if chatData.ConversationId != "" {
  214. l.svcCtx.Rds.HSet(l.ctx, "miniprogram_dify_conversation_keys", rdsKeySessionId, chatData.ConversationId)
  215. }
  216. jsonData := ChatMessage{}
  217. jsonData.Id = chatData.Id
  218. jsonData.SessionId = sessionId
  219. jsonData.Answer = chatData.Answer
  220. jsonData.MessageId = chatData.MessageId
  221. jsonData.Finish = finish
  222. jsonData.ConversationId = chatData.ConversationId
  223. lineData, _ := json.Marshal(jsonData)
  224. _, _ = fmt.Fprintln(w, "data: "+string(lineData)+"\n")
  225. fmt.Printf("response=%v\n", string(lineData))
  226. if finish {
  227. break
  228. }
  229. }
  230. }
  231. }