wechat.go 6.8 KB


  1. package channel
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "errors"
  6. "github.com/zeromicro/go-zero/core/logx"
  7. "google.golang.org/protobuf/encoding/protojson"
  8. "regexp"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "wechat-api/database/dao/wechat/model"
  13. "wechat-api/internal/pkg/customer_of_im/proto"
  14. "wechat-api/internal/pkg/wechat_ws"
  15. "wechat-api/internal/svc"
  16. "wechat-api/workphone"
  17. )
  18. type WechatChannel struct {
  19. *Channel
  20. wsClient *wechat_ws.WechatWsClient
  21. svcCtx *svc.ServiceContext
  22. }
  23. func NewWechatChannel(ws *wechat_ws.WechatWsClient, svcCtx *svc.ServiceContext) IChannel {
  24. wechatChannel := &WechatChannel{
  25. wsClient: ws,
  26. svcCtx: svcCtx,
  27. Channel: new(Channel),
  28. }
  29. wechatChannel.InitChannel(wechatChannel)
  30. return wechatChannel
  31. }
  32. func (w *WechatChannel) GetWxsCache() map[string]*model.Wx {
  33. cacheKey := "WechatChannel_WxList"
  34. wxs := make(map[string]*model.Wx)
  35. v, exist := w.svcCtx.Cache.Get(cacheKey)
  36. if exist {
  37. value, ok := v.(map[string]*model.Wx)
  38. if ok {
  39. wxs = value
  40. return wxs
  41. }
  42. }
  43. wxDao := w.svcCtx.WechatQ.Wx
  44. wxlist, err := wxDao.Where(wxDao.Ctype.Eq(1)).Find()
  45. if err != nil {
  46. logx.Error("获取微信列表失败:", err)
  47. } else {
  48. for _, wx := range wxlist {
  49. if wx.AgentID != 0 {
  50. wx.APIKey = w.svcCtx.Config.OpenAI.ApiKey
  51. wx.APIBase = w.svcCtx.Config.OpenAI.BaseUrl
  52. }
  53. wxs[wx.Wxid] = wx
  54. }
  55. w.svcCtx.Cache.SetWithExpire(cacheKey, wxs, time.Minute*10)
  56. }
  57. return wxs
  58. }
  59. func (w *WechatChannel) GetContactCache(wxid string) *model.Contact {
  60. cacheKey := "WechatChannel_WxContact_" + wxid
  61. v, exist := w.svcCtx.Cache.Get(cacheKey)
  62. if exist {
  63. value, ok := v.(*model.Contact)
  64. if ok {
  65. return value
  66. }
  67. }
  68. contactDao := w.svcCtx.WechatQ.Contact
  69. contact, err := contactDao.Where(contactDao.Wxid.Eq(wxid)).First()
  70. if err != nil {
  71. logx.Error("获取微信联系人失败:", err)
  72. return nil
  73. } else {
  74. w.svcCtx.Cache.SetWithExpire(cacheKey, contact, time.Hour*24)
  75. return contact
  76. }
  77. }
  78. func (w *WechatChannel) OnMessage(msg *wechat_ws.MsgJsonObject) error {
  79. switch msg.MsgType {
  80. case "FriendTalkNotice":
  81. // 别人发过来的消息
  82. _ = w.FriendTalkHandle(msg.Message)
  83. case "WeChatTalkToFriendNotice":
  84. // 发给别人的消息
  85. _ = w.WeChatTalkToFriendHandle(msg.Message)
  86. default:
  87. //logx.Info("未知消息类型:", msg.MsgType, msg)
  88. }
  89. return nil
  90. }
  91. func (w *WechatChannel) Send(message *ReplyMessage) error {
  92. switch message.Type {
  93. case proto.EnumContentType_Text:
  94. err := w.SendText(message)
  95. if err != nil {
  96. logx.Error("发送文本消息失败:", err)
  97. return err
  98. }
  99. default:
  100. return errors.New("不支持的消息类型")
  101. }
  102. return nil
  103. }
  104. func (w *WechatChannel) SendText(message *ReplyMessage) error {
  105. friendId := message.ToUserId
  106. Remark := ""
  107. content := message.Content
  108. if message.IsGroup {
  109. friendId = message.GroupId
  110. Remark = message.ToUserId
  111. if message.IsAt && message.ToUserNickname != "" {
  112. content = "@" + message.ToUserNickname + " " + content
  113. }
  114. }
  115. encodedString := base64.StdEncoding.EncodeToString([]byte(content))
  116. msg := map[string]interface{}{
  117. "MsgType": "TalkToFriendTask",
  118. "Content": map[string]interface{}{
  119. "WeChatId": message.UserId,
  120. "FriendId": friendId,
  121. "ContentType": workphone.EnumContentType_Text,
  122. "Content": encodedString,
  123. "Remark": Remark,
  124. },
  125. }
  126. transportMessageJSON, err := json.Marshal(msg)
  127. if err != nil {
  128. logx.Error("failed to marshal message:", err)
  129. return err
  130. }
  131. w.wsClient.SendMsgByChan(transportMessageJSON)
  132. return nil
  133. }
  134. func (w *WechatChannel) GenerateRelevantContext(msg *ChatMessage) (*RelevantContext, error) {
  135. wxs := w.GetWxsCache()
  136. wx, exists := wxs[msg.UserId]
  137. if !exists {
  138. return nil, errors.New("未查找对应微信,不予处理")
  139. }
  140. msg.ToUserNickname = wx.Nickname
  141. from := w.GetContactCache(msg.FromUserId)
  142. if from != nil {
  143. msg.FromUserNickname = from.Nickname
  144. }
  145. if msg.GroupId != "" {
  146. group := w.GetContactCache(msg.GroupId)
  147. if group != nil {
  148. msg.GroupName = group.Nickname
  149. }
  150. }
  151. rctx := &RelevantContext{
  152. APIKey: wx.APIKey,
  153. BaseURL: wx.APIBase,
  154. }
  155. if strings.HasPrefix(rctx.APIKey, "fastgpt") {
  156. rctx.IsFastGPT = true
  157. } else {
  158. rctx.IsFastGPT = false
  159. }
  160. return rctx, nil
  161. }
  162. func (w *WechatChannel) TextToChatMessage(msgStr string) (*ChatMessage, error) {
  163. message := &workphone.FriendTalkNoticeMessage{}
  164. err := protojson.Unmarshal([]byte(msgStr), message)
  165. if err != nil {
  166. return nil, err
  167. }
  168. logx.Info("文本消息结构体:", message)
  169. fromUserId := message.GetFriendId()
  170. toUserId := message.GetWeChatId()
  171. groupId := ""
  172. content := string(message.GetContent())
  173. isAt := false
  174. isGroup := false
  175. if strings.HasSuffix(message.GetFriendId(), "@chatroom") {
  176. groupId = message.GetFriendId()
  177. parts := strings.SplitN(content, ":\n", 2)
  178. if len(parts) > 0 {
  179. fromUserId = strings.TrimSpace(parts[0])
  180. if len(parts) > 1 {
  181. content = strings.TrimSpace(parts[1])
  182. // 使用正则表达式清理内容
  183. re := regexp.MustCompile(`@\S+ `)
  184. content = re.ReplaceAllString(content, "")
  185. }
  186. }
  187. isAt = strings.Contains(message.GetExt(), toUserId)
  188. if !isAt {
  189. //logx.Info("群聊消息不是发给当前用户的,无需处理")
  190. return nil, nil
  191. }
  192. isGroup = true
  193. } else {
  194. fromUserId = message.GetFriendId()
  195. toUserId = message.GetWeChatId()
  196. }
  197. msg := &ChatMessage{
  198. UserId: toUserId,
  199. MsgId: strconv.FormatInt(message.GetMsgSvrId(), 10),
  200. CreateTime: time.Unix(message.GetCreateTime(), 0),
  201. Type: proto.EnumContentType_Text,
  202. Content: content,
  203. FromUserId: fromUserId,
  204. FromUserNickname: "",
  205. ToUserId: toUserId,
  206. ToUserNickname: "",
  207. GroupId: groupId,
  208. GroupName: "",
  209. IsGroup: isGroup,
  210. IsAt: isAt,
  211. }
  212. return msg, nil
  213. }
  214. func (w *WechatChannel) FriendTalkHandle(msgStr string) error {
  215. message := &workphone.FriendTalkNoticeMessage{}
  216. err := protojson.Unmarshal([]byte(msgStr), message)
  217. if err != nil {
  218. logx.Error("FriendTalkNotice protojson.Unmarshal err:", err)
  219. return err
  220. }
  221. logx.Info("收到消息:", message)
  222. chatMsg := &ChatMessage{}
  223. switch message.ContentType {
  224. case workphone.EnumContentType_Text:
  225. logx.Info("这是本文消息")
  226. chatMsg, err = w.TextToChatMessage(msgStr)
  227. if err != nil {
  228. logx.Error("FriendTalkNotice TextToChatMessage err:", err)
  229. return err
  230. }
  231. default:
  232. //logx.Info("不支持的消息类型")
  233. return nil
  234. }
  235. if chatMsg == nil {
  236. //logx.Info("chatMsg is nil,无需处理")
  237. return nil
  238. }
  239. chatCtx, err := w.GenerateRelevantContext(chatMsg)
  240. if err != nil {
  241. logx.Info("FriendTalkNotice GenerateRelevantContext err:", err)
  242. return err
  243. }
  244. logx.Info("处理后的 chatMsg:", chatMsg)
  245. w.Produce(w.svcCtx, chatCtx, chatMsg)
  246. return nil
  247. }
  248. func (w *WechatChannel) WeChatTalkToFriendHandle(msgStr string) error {
  249. logx.Info("收到发送的消息:", msgStr)
  250. return nil
  251. }