123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
- package chatrecords
- import (
- "bufio"
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "github.com/gofrs/uuid/v5"
- "io"
- "net/http"
- "strconv"
- "strings"
- "time"
- "wechat-api/ent/employee"
- "wechat-api/ent/wxcard"
- "wechat-api/ent/wxcarduser"
- "wechat-api/hook/dify"
- "wechat-api/hook/fastgpt"
- "wechat-api/internal/utils/jwt"
- "wechat-api/internal/svc"
- "wechat-api/internal/types"
- "github.com/zeromicro/go-zero/core/logx"
- )
- type ChatMessage struct {
- Id string `json:"id"`
- MessageId string `json:"message_id"`
- SessionId uint64 `json:"session_id"`
- ConversationId string `json:"conversation_id,optional"`
- Answer string `json:"answer"`
- Finish bool `json:"finish"`
- NeedPay bool `json:"need_pay"`
- NeedLogin bool `json:"need_login"`
- }
- type SubmitApiChatLogic struct {
- logx.Logger
- ctx context.Context
- svcCtx *svc.ServiceContext
- }
- func NewSubmitApiChatLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SubmitApiChatLogic {
- return &SubmitApiChatLogic{
- Logger: logx.WithContext(ctx),
- ctx: ctx,
- svcCtx: svcCtx}
- }
- func (l *SubmitApiChatLogic) SubmitApiChat(req *types.ChatRecordsInfo, w http.ResponseWriter) {
- userId := l.ctx.Value("userId").(uint64)
- userInfo, err := l.svcCtx.DB.WxCardUser.Query().Where(wxcarduser.ID(userId)).Only(l.ctx)
- if err != nil {
- return
- }
- // session_id 字段确保必须有
- var sessionId uint64
- if req.BotId == nil || *req.BotId == 0 {
- return
- }
- if req.Content == nil || *req.Content == "" {
- return
- }
- if req.SessionId != nil && *req.SessionId > 0 {
- sessionId = *req.SessionId
- } else {
- newSession, err := l.svcCtx.DB.ChatSession.Create().
- SetName(*req.Content).
- SetUserID(userId).
- SetBotID(*req.BotId).
- SetBotType(*req.BotType).
- Save(l.ctx)
- if err != nil {
- return
- }
- sessionId = newSession.ID
- }
- // 记录下问题
- _, err = l.svcCtx.DB.ChatRecords.Create().
- SetUserID(userId).
- SetSessionID(sessionId).
- SetBotType(*req.BotType).
- SetBotID(*req.BotId).
- SetContentType(1).
- SetContent(*req.Content).
- Save(l.ctx)
- if err != nil {
- return
- }
- if *req.BotType == 2 { // 从FastGPT里获取回答
- card, err := l.svcCtx.DB.WxCard.Query().Where(wxcard.ID(*req.BotId)).Only(l.ctx)
- if err != nil {
- return
- }
- fastgptSendChat(l, w, *req.Content, card.APIBase, card.APIKey, sessionId, userId, *req.BotId, *req.BotType)
- } else if *req.BotType == 3 { // 从数字员工里获取回答
- employeeRow, err := l.svcCtx.DB.Employee.Query().Where(employee.ID(*req.BotId)).Only(l.ctx)
- if err != nil {
- return
- }
- //TODO 判断用户是否能与智能体聊天(智能体VIP, 要求用户也必须是VIP)
- if employeeRow.IsVip > 0 && userInfo.IsVip == 0 {
- return
- }
- difySendChat(l, w, *req.Content, employeeRow.APIBase, employeeRow.APIKey, sessionId, userId, *req.BotId, *req.BotType)
- }
- }
- // fastgptSendChat 往 FastGPT 发送内容并获得响应信息
- func fastgptSendChat(l *SubmitApiChatLogic, w http.ResponseWriter, content, apiBase, apiKey string, sessionId, userId, botId uint64, botType uint8) {
- userInfo, _ := l.svcCtx.DB.WxCardUser.Query().Where(wxcarduser.ID(userId)).First(l.ctx)
- var chatReq fastgpt.ChatReq
- chatReq.Stream = true
- message := make([]fastgpt.Message, 0, 1)
- message = append(message, fastgpt.Message{
- Content: content,
- Role: "user",
- })
- chatReq.Messages = message
- chatReq.ChatId = jwt.HashidsEncode(int(sessionId))
- chatReq.Variables = fastgpt.Variables{
- Uid: jwt.HashidsEncode(int(userId)),
- Name: userInfo.Nickname,
- }
- // 設置請求體 (這裡是一個簡單的範例,你可以根據需要調整)
- jsonData, _ := json.Marshal(chatReq)
- fmt.Printf("request data:%v\n", string(jsonData))
- // 建立HTTP請求
- url := apiBase + fastgpt.GetChatUrl()
- request, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
- if err != nil {
- fmt.Printf("Error creating request:%v", err)
- return
- }
- // 設置請求頭
- request.Header.Set("Content-Type", "application/json")
- request.Header.Set("Authorization", "Bearer "+apiKey)
- request.Header.Set("Transfer-Encoding", "chunked")
- // 發送請求
- client := &http.Client{
- Timeout: time.Second * 60,
- }
- response, err := client.Do(request)
- defer response.Body.Close()
- // 讀取響應
- reader := bufio.NewReader(response.Body)
- w.Header().Set("Content-Type", "text/event-stream;charset=utf-8")
- w.Header().Set("Connection", "keep-alive")
- w.Header().Set("Cache-Control", "no-cache")
- flusher, ok := w.(http.Flusher)
- if !ok {
- http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
- return
- }
- var answer string
- for {
- line, err := reader.ReadString('\n')
- line = strings.Trim(line, " \n")
- //fmt.Printf("line = %v\n", line)
- if err == io.EOF {
- break
- }
- if len(line) > 6 {
- line = line[6:]
- chatData := fastgpt.ChatResp{}
- err = json.Unmarshal([]byte(line), &chatData)
- if err != nil {
- fmt.Printf("json unmarshall error:%v\n", err)
- break
- }
- var finish bool
- if len(chatData.Choices) == 1 && chatData.Choices[0].FinishReason == "stop" {
- finish = true
- }
- uuidV4, _ := uuid.NewV4() //唯一ID
- jsonData := ChatMessage{}
- jsonData.Id = chatData.Id
- jsonData.SessionId = sessionId
- jsonData.Answer = chatData.Choices[0].Delta.Content
- jsonData.MessageId = uuidV4.String()
- jsonData.Finish = finish
- lineData, _ := json.Marshal(jsonData)
- // 拼接回答
- answer = answer + jsonData.Answer
- _, err = fmt.Fprintf(w, "%s", "data: "+string(lineData)+"\r\n")
- //fmt.Printf("response=%v\n", string(lineData))
- if err != nil {
- logAnswer(l, userId, sessionId, botId, botType, answer)
- fmt.Printf("Error writing to client:%v \n", err)
- break
- }
- flusher.Flush()
- if finish {
- logAnswer(l, userId, sessionId, botId, botType, answer)
- break
- }
- }
- }
- }
- // difySendChat 往 Dify 发送内容并获得响应信息
- func difySendChat(l *SubmitApiChatLogic, w http.ResponseWriter, content, apiBase, apiKey string, sessionId, userId, botId uint64, botType uint8) {
- userInfo, _ := l.svcCtx.DB.WxCardUser.Query().Where(wxcarduser.ID(userId)).First(l.ctx)
- var chatReq dify.ChatReq
- chatReq.ResponseMode = "streaming"
- chatReq.Query = content
- chatReq.User = fmt.Sprintf("%d:%s", userId, userInfo.Nickname)
- // 这里 sessionId 要与某个 conversation_id 关联,否则查询结果不准
- rdsKeySessionId := strconv.Itoa(int(sessionId))
- rdsValue := l.svcCtx.Rds.HGet(l.ctx, "miniprogram_dify_conversation_keys", rdsKeySessionId).Val()
- chatReq.ConversationId = rdsValue
- // 設置請求體 (這裡是一個簡單的範例,你可以根據需要調整)
- jsonData, _ := json.Marshal(chatReq)
- fmt.Printf("request data:%v\n", string(jsonData))
- // 建立HTTP請求
- url := apiBase + dify.GetChatUrl() // 替換為正確的FastGPT API端點
- request, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
- if err != nil {
- fmt.Printf("Error creating request:%v", err)
- return
- }
- // 設置請求頭
- request.Header.Set("Content-Type", "application/json")
- request.Header.Set("Authorization", "Bearer "+apiKey)
- request.Header.Set("Transfer-Encoding", "chunked")
- // 發送請求
- client := &http.Client{
- Timeout: time.Second * 60,
- }
- response, err := client.Do(request)
- defer response.Body.Close()
- // 讀取響應
- reader := bufio.NewReader(response.Body)
- w.Header().Set("Content-Type", "text/event-stream;charset=utf-8")
- w.Header().Set("Connection", "keep-alive")
- w.Header().Set("Cache-Control", "no-cache")
- flusher, ok := w.(http.Flusher)
- if !ok {
- http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
- return
- }
- var answer string
- for {
- line, err := reader.ReadString('\n')
- line = strings.Trim(line, " \n")
- //fmt.Printf("line = %v\n", line)
- if err == io.EOF {
- break
- }
- if len(line) > 6 {
- line = line[6:]
- chatData := dify.ChatResp{}
- err = json.Unmarshal([]byte(line), &chatData)
- if err != nil {
- fmt.Printf("json unmarshall error:%v\n", err)
- fmt.Printf("line:%v\n", line)
- break
- }
- var finish bool
- if chatData.Event == "message_end" {
- finish = true
- }
- // 将 ConversationId 与 sessionId 建立关联关系
- if chatData.ConversationId != "" {
- l.svcCtx.Rds.HSet(l.ctx, "miniprogram_dify_conversation_keys", rdsKeySessionId, chatData.ConversationId)
- }
- jsonData := ChatMessage{}
- jsonData.Id = chatData.Id
- jsonData.SessionId = sessionId
- jsonData.Answer = chatData.Answer
- jsonData.MessageId = chatData.MessageId
- jsonData.Finish = finish
- jsonData.ConversationId = chatData.ConversationId
- lineData, _ := json.Marshal(jsonData)
- // 拼接回答
- answer = answer + jsonData.Answer
- _, err = fmt.Fprintf(w, "%s", "data: "+string(lineData)+"\r\n")
- //fmt.Printf("response=%v\n", string(lineData))
- if err != nil {
- logAnswer(l, userId, sessionId, botId, botType, answer)
- fmt.Printf("Error writing to client:%v \n", err)
- break
- }
- flusher.Flush()
- if finish {
- logAnswer(l, userId, sessionId, botId, botType, answer)
- break
- }
- }
- }
- }
- // logAnswer 保存Ai回答的内容
- func logAnswer(l *SubmitApiChatLogic, userId, sessionId, botId uint64, botType uint8, answer string) {
- _, err := l.svcCtx.DB.ChatRecords.Create().
- SetUserID(userId).
- SetSessionID(sessionId).
- SetBotType(botType).
- SetBotID(botId).
- SetContentType(2).
- SetContent(answer).
- Save(l.ctx)
- if err != nil {
- fmt.Printf("save answer failed: %v", err)
- }
- }
|