wechat.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. package channel
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "errors"
  6. "github.com/zeromicro/go-zero/core/collection"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. "google.golang.org/protobuf/encoding/protojson"
  9. "regexp"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "wechat-api/database/dao/wechat/model"
  14. "wechat-api/internal/pkg/customer_of_im/proto"
  15. "wechat-api/internal/pkg/wechat_ws"
  16. "wechat-api/internal/svc"
  17. "wechat-api/workphone"
  18. )
  19. type WechatChannel struct {
  20. *Channel
  21. wsClient *wechat_ws.WechatWsClient
  22. svcCtx *svc.ServiceContext
  23. }
  24. func NewWechatChannel(ws *wechat_ws.WechatWsClient, svcCtx *svc.ServiceContext) IChannel {
  25. wechatChannel := &WechatChannel{
  26. wsClient: ws,
  27. svcCtx: svcCtx,
  28. Channel: new(Channel),
  29. }
  30. wechatChannel.InitChannel(wechatChannel)
  31. return wechatChannel
  32. }
  33. func (w *WechatChannel) GetWxsCache() *collection.SafeMap {
  34. cacheKey := "WechatChannel_WxList"
  35. wxs := collection.NewSafeMap()
  36. v, exist := w.svcCtx.Cache.Get(cacheKey)
  37. if exist {
  38. value, ok := v.(*collection.SafeMap)
  39. if ok {
  40. wxs = value
  41. return wxs
  42. }
  43. }
  44. wxDao := w.svcCtx.WechatQ.Wx
  45. wxlist, err := wxDao.Where(wxDao.Ctype.Eq(1)).Find()
  46. if err != nil {
  47. logx.Error("获取微信列表失败:", err)
  48. } else {
  49. for _, wx := range wxlist {
  50. if wx.AgentID != 0 {
  51. wx.APIKey = w.svcCtx.Config.OpenAI.ApiKey
  52. wx.APIBase = w.svcCtx.Config.OpenAI.BaseUrl
  53. }
  54. wxs.Set(wx.Wxid, wx)
  55. }
  56. w.svcCtx.Cache.SetWithExpire(cacheKey, wxs, time.Minute*10)
  57. }
  58. return wxs
  59. }
  60. func (w *WechatChannel) GetWxInfo(wxid string) (*model.Wx, error) {
  61. wxs := w.GetWxsCache()
  62. v, exists := wxs.Get(wxid)
  63. if !exists {
  64. return nil, errors.New("未查找对应微信,不予处理")
  65. }
  66. wx, ok := v.(*model.Wx)
  67. if !ok {
  68. return nil, errors.New("微信信息转换失败")
  69. }
  70. return wx, nil
  71. }
  72. func (w *WechatChannel) GetContactCache(wxid string) (*model.Contact, error) {
  73. cacheKey := "WechatChannel_WxContact_" + wxid
  74. v, exist := w.svcCtx.Cache.Get(cacheKey)
  75. if exist {
  76. value, ok := v.(*model.Contact)
  77. if ok {
  78. return value, nil
  79. }
  80. }
  81. contactDao := w.svcCtx.WechatQ.Contact
  82. contact, err := contactDao.Where(contactDao.Wxid.Eq(wxid)).First()
  83. if err != nil {
  84. logx.Error("获取微信联系人失败:", err)
  85. return nil, err
  86. } else {
  87. w.svcCtx.Cache.SetWithExpire(cacheKey, contact, time.Hour*24)
  88. return contact, nil
  89. }
  90. }
  91. func (w *WechatChannel) OnMessage(msg *wechat_ws.MsgJsonObject) error {
  92. switch msg.MsgType {
  93. case "FriendTalkNotice":
  94. // 别人发过来的消息
  95. _ = w.FriendTalkHandle(msg.Message)
  96. case "WeChatTalkToFriendNotice":
  97. // 发给别人的消息
  98. _ = w.WeChatTalkToFriendHandle(msg.Message)
  99. default:
  100. //logx.Info("未知消息类型:", msg.MsgType, msg)
  101. }
  102. return nil
  103. }
  104. func (w *WechatChannel) Send(message *ReplyMessage) error {
  105. switch message.Type {
  106. case proto.EnumContentType_Text:
  107. err := w.SendText(message)
  108. if err != nil {
  109. logx.Error("发送文本消息失败:", err)
  110. return err
  111. }
  112. default:
  113. return errors.New("不支持的消息类型")
  114. }
  115. return nil
  116. }
  117. func (w *WechatChannel) SendText(message *ReplyMessage) error {
  118. friendId := message.ToUserId
  119. Remark := ""
  120. content := message.Content
  121. if message.IsGroup {
  122. friendId = message.GroupId
  123. Remark = message.ToUserId
  124. if message.IsAt && message.ToUserNickname != "" {
  125. content = "@" + message.ToUserNickname + " " + content
  126. }
  127. }
  128. encodedString := base64.StdEncoding.EncodeToString([]byte(content))
  129. msg := map[string]interface{}{
  130. "MsgType": "TalkToFriendTask",
  131. "Content": map[string]interface{}{
  132. "WeChatId": message.UserId,
  133. "FriendId": friendId,
  134. "ContentType": workphone.EnumContentType_Text,
  135. "Content": encodedString,
  136. "Remark": Remark,
  137. },
  138. }
  139. transportMessageJSON, err := json.Marshal(msg)
  140. if err != nil {
  141. logx.Error("failed to marshal message:", err)
  142. return err
  143. }
  144. w.wsClient.SendMsgByChan(transportMessageJSON)
  145. return nil
  146. }
  147. func (w *WechatChannel) GenerateRelevantContext(msg *ChatMessage) (*RelevantContext, error) {
  148. wx, err := w.GetWxInfo(msg.UserId)
  149. if err != nil {
  150. logx.Error("获取微信信息失败:", err)
  151. return nil, err
  152. }
  153. msg.ToUserNickname = wx.Nickname
  154. from, err := w.GetContactCache(msg.FromUserId)
  155. if err == nil {
  156. msg.FromUserNickname = from.Nickname
  157. }
  158. if msg.GroupId != "" {
  159. group, err := w.GetContactCache(msg.GroupId)
  160. if err == nil {
  161. msg.GroupName = group.Nickname
  162. }
  163. }
  164. rctx := &RelevantContext{
  165. APIKey: wx.APIKey,
  166. BaseURL: wx.APIBase,
  167. }
  168. if strings.HasPrefix(rctx.APIKey, "fastgpt") {
  169. rctx.IsFastGPT = true
  170. } else {
  171. rctx.IsFastGPT = false
  172. }
  173. return rctx, nil
  174. }
  175. func (w *WechatChannel) TextToChatMessage(msgStr string) (*ChatMessage, error) {
  176. message := &workphone.FriendTalkNoticeMessage{}
  177. err := protojson.Unmarshal([]byte(msgStr), message)
  178. if err != nil {
  179. return nil, err
  180. }
  181. logx.Info("文本消息结构体:", message)
  182. fromUserId := message.GetFriendId()
  183. toUserId := message.GetWeChatId()
  184. groupId := ""
  185. content := string(message.GetContent())
  186. isAt := false
  187. isGroup := false
  188. if strings.HasSuffix(message.GetFriendId(), "@chatroom") {
  189. groupId = message.GetFriendId()
  190. parts := strings.SplitN(content, ":\n", 2)
  191. if len(parts) > 0 {
  192. fromUserId = strings.TrimSpace(parts[0])
  193. if len(parts) > 1 {
  194. content = strings.TrimSpace(parts[1])
  195. // 使用正则表达式清理内容
  196. re := regexp.MustCompile(`@\S+ `)
  197. content = re.ReplaceAllString(content, "")
  198. }
  199. }
  200. isAt = strings.Contains(message.GetExt(), toUserId)
  201. if !isAt {
  202. //logx.Info("群聊消息不是发给当前用户的,无需处理")
  203. return nil, nil
  204. }
  205. isGroup = true
  206. } else {
  207. fromUserId = message.GetFriendId()
  208. toUserId = message.GetWeChatId()
  209. }
  210. msg := &ChatMessage{
  211. UserId: toUserId,
  212. MsgId: strconv.FormatInt(message.GetMsgSvrId(), 10),
  213. CreateTime: time.Unix(message.GetCreateTime(), 0),
  214. Type: proto.EnumContentType_Text,
  215. Content: content,
  216. FromUserId: fromUserId,
  217. FromUserNickname: "",
  218. ToUserId: toUserId,
  219. ToUserNickname: "",
  220. GroupId: groupId,
  221. GroupName: "",
  222. IsGroup: isGroup,
  223. IsAt: isAt,
  224. }
  225. return msg, nil
  226. }
  227. func (w *WechatChannel) FriendTalkHandle(msgStr string) error {
  228. message := &workphone.FriendTalkNoticeMessage{}
  229. err := protojson.Unmarshal([]byte(msgStr), message)
  230. if err != nil {
  231. logx.Error("FriendTalkNotice protojson.Unmarshal err:", err)
  232. return err
  233. }
  234. logx.Info("收到消息:", message)
  235. chatMsg := &ChatMessage{}
  236. switch message.ContentType {
  237. case workphone.EnumContentType_Text:
  238. logx.Info("这是本文消息")
  239. chatMsg, err = w.TextToChatMessage(msgStr)
  240. if err != nil {
  241. logx.Error("FriendTalkNotice TextToChatMessage err:", err)
  242. return err
  243. }
  244. default:
  245. //logx.Info("不支持的消息类型")
  246. return nil
  247. }
  248. if chatMsg == nil {
  249. //logx.Info("chatMsg is nil,无需处理")
  250. return nil
  251. }
  252. wxInfo, err := w.GetWxInfo(chatMsg.UserId)
  253. if err != nil {
  254. logx.Error("FriendTalkNotice GetWxInfo err:", err)
  255. return err
  256. }
  257. //白名单检查
  258. if !w.CheckAllowOrBlockList(wxInfo, chatMsg) {
  259. return errors.New("不在白名单中")
  260. }
  261. chatCtx, err := w.GenerateRelevantContext(chatMsg)
  262. if err != nil {
  263. logx.Info("FriendTalkNotice GenerateRelevantContext err:", err)
  264. return err
  265. }
  266. logx.Info("处理后的 chatMsg:", chatMsg)
  267. w.Produce(w.svcCtx, chatCtx, chatMsg)
  268. return nil
  269. }
  270. func (w *WechatChannel) WeChatTalkToFriendHandle(msgStr string) error {
  271. //logx.Info("收到发送的消息:", msgStr)
  272. return nil
  273. }