submit_api_chat_logic.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. package chatrecords
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "github.com/gofrs/uuid/v5"
  9. "io"
  10. "net/http"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "wechat-api/ent/employee"
  15. "wechat-api/ent/wxcard"
  16. "wechat-api/ent/wxcarduser"
  17. "wechat-api/hook/dify"
  18. "wechat-api/hook/fastgpt"
  19. "wechat-api/internal/utils/jwt"
  20. "wechat-api/internal/svc"
  21. "wechat-api/internal/types"
  22. "github.com/zeromicro/go-zero/core/logx"
  23. )
  // ChatMessage is the JSON payload written to the client for every streamed
  // chunk of a bot answer.
  type ChatMessage struct {
  	// Id is the chunk id reported by the upstream chat service.
  	Id string `json:"id"`
  	// MessageId identifies the message this chunk belongs to.
  	MessageId string `json:"message_id"`
  	// SessionId is the local chat session the answer is part of.
  	SessionId uint64 `json:"session_id"`
  	// ConversationId is the upstream conversation id, when the upstream has one.
  	ConversationId string `json:"conversation_id,optional"`
  	// Answer is the piece of answer text carried by this chunk.
  	Answer string `json:"answer"`
  	// Finish is true on the last chunk of an answer.
  	Finish bool `json:"finish"`
  	// NeedPay tells the client that payment is required.
  	// NOTE(review): nothing in this file sets it — confirm how clients use it.
  	NeedPay bool `json:"need_pay"`
  	// NeedLogin tells the client that a login is required.
  	// NOTE(review): nothing in this file sets it — confirm how clients use it.
  	NeedLogin bool `json:"need_login"`
  }
  34. type SubmitApiChatLogic struct {
  35. logx.Logger
  36. ctx context.Context
  37. svcCtx *svc.ServiceContext
  38. }
  39. func NewSubmitApiChatLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SubmitApiChatLogic {
  40. return &SubmitApiChatLogic{
  41. Logger: logx.WithContext(ctx),
  42. ctx: ctx,
  43. svcCtx: svcCtx}
  44. }
  45. func (l *SubmitApiChatLogic) SubmitApiChat(req *types.ChatRecordsInfo, w http.ResponseWriter) {
  46. userId := l.ctx.Value("userId").(uint64)
  47. userInfo, err := l.svcCtx.DB.WxCardUser.Query().Where(wxcarduser.ID(userId)).Only(l.ctx)
  48. if err != nil {
  49. return
  50. }
  51. // session_id 字段确保必须有
  52. var sessionId uint64
  53. if req.BotId == nil || *req.BotId == 0 {
  54. return
  55. }
  56. if req.Content == nil || *req.Content == "" {
  57. return
  58. }
  59. if req.SessionId != nil && *req.SessionId > 0 {
  60. sessionId = *req.SessionId
  61. } else {
  62. newSession, err := l.svcCtx.DB.ChatSession.Create().
  63. SetName(*req.Content).
  64. SetUserID(userId).
  65. SetBotID(*req.BotId).
  66. SetBotType(*req.BotType).
  67. Save(l.ctx)
  68. if err != nil {
  69. return
  70. }
  71. sessionId = newSession.ID
  72. }
  73. // 记录下问题
  74. _, err = l.svcCtx.DB.ChatRecords.Create().
  75. SetUserID(userId).
  76. SetSessionID(sessionId).
  77. SetBotType(*req.BotType).
  78. SetBotID(*req.BotId).
  79. SetContentType(1).
  80. SetContent(*req.Content).
  81. Save(l.ctx)
  82. if err != nil {
  83. return
  84. }
  85. if *req.BotType == 2 { // 从FastGPT里获取回答
  86. card, err := l.svcCtx.DB.WxCard.Query().Where(wxcard.ID(*req.BotId)).Only(l.ctx)
  87. if err != nil {
  88. return
  89. }
  90. fastgptSendChat(l, w, *req.Content, card.APIBase, card.APIKey, sessionId, userId, *req.BotId, *req.BotType)
  91. } else if *req.BotType == 3 { // 从数字员工里获取回答
  92. employeeRow, err := l.svcCtx.DB.Employee.Query().Where(employee.ID(*req.BotId)).Only(l.ctx)
  93. if err != nil {
  94. return
  95. }
  96. //TODO 判断用户是否能与智能体聊天(智能体VIP, 要求用户也必须是VIP)
  97. if employeeRow.IsVip > 0 && userInfo.IsVip == 0 {
  98. return
  99. }
  100. difySendChat(l, w, *req.Content, employeeRow.APIBase, employeeRow.APIKey, sessionId, userId, *req.BotId, *req.BotType)
  101. }
  102. }
  103. // fastgptSendChat 往 FastGPT 发送内容并获得响应信息
  104. func fastgptSendChat(l *SubmitApiChatLogic, w http.ResponseWriter, content, apiBase, apiKey string, sessionId, userId, botId uint64, botType uint8) {
  105. userInfo, _ := l.svcCtx.DB.WxCardUser.Query().Where(wxcarduser.ID(userId)).First(l.ctx)
  106. var chatReq fastgpt.ChatReq
  107. chatReq.Stream = true
  108. message := make([]fastgpt.Message, 0, 1)
  109. message = append(message, fastgpt.Message{
  110. Content: content,
  111. Role: "user",
  112. })
  113. chatReq.Messages = message
  114. chatReq.ChatId = jwt.HashidsEncode(int(sessionId))
  115. chatReq.Variables = fastgpt.Variables{
  116. Uid: jwt.HashidsEncode(int(userId)),
  117. Name: userInfo.Nickname,
  118. }
  119. // 設置請求體 (這裡是一個簡單的範例,你可以根據需要調整)
  120. jsonData, _ := json.Marshal(chatReq)
  121. fmt.Printf("request data:%v\n", string(jsonData))
  122. // 建立HTTP請求
  123. url := apiBase + fastgpt.GetChatUrl()
  124. request, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
  125. if err != nil {
  126. fmt.Printf("Error creating request:%v", err)
  127. return
  128. }
  129. // 設置請求頭
  130. request.Header.Set("Content-Type", "application/json")
  131. request.Header.Set("Authorization", "Bearer "+apiKey)
  132. request.Header.Set("Transfer-Encoding", "chunked")
  133. // 發送請求
  134. client := &http.Client{
  135. Timeout: time.Second * 60,
  136. }
  137. response, err := client.Do(request)
  138. defer response.Body.Close()
  139. // 讀取響應
  140. reader := bufio.NewReader(response.Body)
  141. w.Header().Set("Content-Type", "text/event-stream;charset=utf-8")
  142. w.Header().Set("Connection", "keep-alive")
  143. w.Header().Set("Cache-Control", "no-cache")
  144. flusher, ok := w.(http.Flusher)
  145. if !ok {
  146. http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
  147. return
  148. }
  149. var answer string
  150. for {
  151. line, err := reader.ReadString('\n')
  152. line = strings.Trim(line, " \n")
  153. //fmt.Printf("line = %v\n", line)
  154. if err == io.EOF {
  155. break
  156. }
  157. if len(line) > 6 {
  158. line = line[6:]
  159. chatData := fastgpt.ChatResp{}
  160. err = json.Unmarshal([]byte(line), &chatData)
  161. if err != nil {
  162. fmt.Printf("json unmarshall error:%v\n", err)
  163. break
  164. }
  165. var finish bool
  166. if len(chatData.Choices) == 1 && chatData.Choices[0].FinishReason == "stop" {
  167. finish = true
  168. }
  169. uuidV4, _ := uuid.NewV4() //唯一ID
  170. jsonData := ChatMessage{}
  171. jsonData.Id = chatData.Id
  172. jsonData.SessionId = sessionId
  173. jsonData.Answer = chatData.Choices[0].Delta.Content
  174. jsonData.MessageId = uuidV4.String()
  175. jsonData.Finish = finish
  176. lineData, _ := json.Marshal(jsonData)
  177. // 拼接回答
  178. answer = answer + jsonData.Answer
  179. _, err = fmt.Fprintf(w, "%s", "data: "+string(lineData)+"\r\n")
  180. //fmt.Printf("response=%v\n", string(lineData))
  181. if err != nil {
  182. logAnswer(l, userId, sessionId, botId, botType, answer)
  183. fmt.Printf("Error writing to client:%v \n", err)
  184. break
  185. }
  186. flusher.Flush()
  187. if finish {
  188. logAnswer(l, userId, sessionId, botId, botType, answer)
  189. break
  190. }
  191. }
  192. }
  193. }
  194. // difySendChat 往 Dify 发送内容并获得响应信息
  195. func difySendChat(l *SubmitApiChatLogic, w http.ResponseWriter, content, apiBase, apiKey string, sessionId, userId, botId uint64, botType uint8) {
  196. userInfo, _ := l.svcCtx.DB.WxCardUser.Query().Where(wxcarduser.ID(userId)).First(l.ctx)
  197. var chatReq dify.ChatReq
  198. chatReq.ResponseMode = "streaming"
  199. chatReq.Query = content
  200. chatReq.User = fmt.Sprintf("%d:%s", userId, userInfo.Nickname)
  201. // 这里 sessionId 要与某个 conversation_id 关联,否则查询结果不准
  202. rdsKeySessionId := strconv.Itoa(int(sessionId))
  203. rdsValue := l.svcCtx.Rds.HGet(l.ctx, "miniprogram_dify_conversation_keys", rdsKeySessionId).Val()
  204. chatReq.ConversationId = rdsValue
  205. // 設置請求體 (這裡是一個簡單的範例,你可以根據需要調整)
  206. jsonData, _ := json.Marshal(chatReq)
  207. fmt.Printf("request data:%v\n", string(jsonData))
  208. // 建立HTTP請求
  209. url := apiBase + dify.GetChatUrl() // 替換為正確的FastGPT API端點
  210. request, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
  211. if err != nil {
  212. fmt.Printf("Error creating request:%v", err)
  213. return
  214. }
  215. // 設置請求頭
  216. request.Header.Set("Content-Type", "application/json")
  217. request.Header.Set("Authorization", "Bearer "+apiKey)
  218. request.Header.Set("Transfer-Encoding", "chunked")
  219. // 發送請求
  220. client := &http.Client{
  221. Timeout: time.Second * 60,
  222. }
  223. response, err := client.Do(request)
  224. defer response.Body.Close()
  225. // 讀取響應
  226. reader := bufio.NewReader(response.Body)
  227. w.Header().Set("Content-Type", "text/event-stream;charset=utf-8")
  228. w.Header().Set("Connection", "keep-alive")
  229. w.Header().Set("Cache-Control", "no-cache")
  230. flusher, ok := w.(http.Flusher)
  231. if !ok {
  232. http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
  233. return
  234. }
  235. var answer string
  236. for {
  237. line, err := reader.ReadString('\n')
  238. line = strings.Trim(line, " \n")
  239. //fmt.Printf("line = %v\n", line)
  240. if err == io.EOF {
  241. break
  242. }
  243. if len(line) > 6 {
  244. line = line[6:]
  245. chatData := dify.ChatResp{}
  246. err = json.Unmarshal([]byte(line), &chatData)
  247. if err != nil {
  248. fmt.Printf("json unmarshall error:%v\n", err)
  249. fmt.Printf("line:%v\n", line)
  250. break
  251. }
  252. var finish bool
  253. if chatData.Event == "message_end" {
  254. finish = true
  255. }
  256. // 将 ConversationId 与 sessionId 建立关联关系
  257. if chatData.ConversationId != "" {
  258. l.svcCtx.Rds.HSet(l.ctx, "miniprogram_dify_conversation_keys", rdsKeySessionId, chatData.ConversationId)
  259. }
  260. jsonData := ChatMessage{}
  261. jsonData.Id = chatData.Id
  262. jsonData.SessionId = sessionId
  263. jsonData.Answer = chatData.Answer
  264. jsonData.MessageId = chatData.MessageId
  265. jsonData.Finish = finish
  266. jsonData.ConversationId = chatData.ConversationId
  267. lineData, _ := json.Marshal(jsonData)
  268. // 拼接回答
  269. answer = answer + jsonData.Answer
  270. _, err = fmt.Fprintf(w, "%s", "data: "+string(lineData)+"\r\n")
  271. //fmt.Printf("response=%v\n", string(lineData))
  272. if err != nil {
  273. logAnswer(l, userId, sessionId, botId, botType, answer)
  274. fmt.Printf("Error writing to client:%v \n", err)
  275. break
  276. }
  277. flusher.Flush()
  278. if finish {
  279. logAnswer(l, userId, sessionId, botId, botType, answer)
  280. break
  281. }
  282. }
  283. }
  284. }
  285. // logAnswer 保存Ai回答的内容
  286. func logAnswer(l *SubmitApiChatLogic, userId, sessionId, botId uint64, botType uint8, answer string) {
  287. _, err := l.svcCtx.DB.ChatRecords.Create().
  288. SetUserID(userId).
  289. SetSessionID(sessionId).
  290. SetBotType(botType).
  291. SetBotID(botId).
  292. SetContentType(2).
  293. SetContent(answer).
  294. Save(l.ctx)
  295. if err != nil {
  296. fmt.Printf("save answer failed: %v", err)
  297. }
  298. }